Params Fail to Evaluate in DAG Body #39692
-
**Apache Airflow version**: Other Airflow 2 version: 2.7.2

**What happened?**
When providing Airflow params, the parameter is not available in the body of the Airflow DAG.

**What do you think should happen instead?**
The value should be available in the Airflow DAG.

**How to reproduce**

```python
from airflow import DAG
from airflow.models.param import Param
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG("generate_password_dag", params={"test": Param("default", type="string")}) as dag:
    sql_operator = SQLExecuteQueryOperator(
        task_id="sql_operator",
        sql="sql/{{ params.test }}.sql",
        database="default",
    )
```

**Operating System**: NA
**Versions of Apache Airflow Providers**: NA
**Deployment**: Official Apache Airflow Helm Chart
**Deployment details**: NA
**Anything else?**: NA
**Are you willing to submit PR?**
Code of Conduct
Replies: 7 comments
-
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue please do so, no need to wait for approval.
-
This rendered because you have …
-
Thanks for your response @Taragolis! For my use case, I'd like to determine the SQL file with a param provided in the UI.
-
You can use Jinja's `include` tag to render a file's contents. For example, using this SQL file:

```sql
-- dags/sql/default.sql
SELECT 'from a templated file';
```

and this task definition:

```python
from airflow.models.dag import DAG
from airflow.models.param import Param
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator

with DAG(
    "generate_password_dag",
    params={"test": Param("default", type="string")},
):
    sql_operator = SQLExecuteQueryOperator(
        task_id="sql_operator",
        sql="{% include 'sql/' ~ params.test ~ '.sql' %}",
        conn_id="postgres",
    )
```

In the above task definition, `~` is string concatenation in Jinja.

@oliver-helix Can you try that approach and see if it works please?
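The `include` and `~` behavior above can be demonstrated outside Airflow with plain Jinja. This is a minimal sketch, not Airflow itself: the `DictLoader` mapping stands in for the `dags/sql/` directory on disk, and the template names are hypothetical.

```python
from jinja2 import DictLoader, Environment

# Stand-in for dags/sql/default.sql on disk.
env = Environment(
    loader=DictLoader({"sql/default.sql": "SELECT 'from a templated file';"})
)

# `~` concatenates strings, so the include target is built from the param
# value before the file is looked up and its contents rendered in place.
tmpl = env.from_string("{% include 'sql/' ~ params.test ~ '.sql' %}")
print(tmpl.render(params={"test": "default"}))
# → SELECT 'from a templated file';
```

Airflow configures its own Jinja environment per DAG, but the `include` semantics are the same.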
-
Thank you @josh-fell! This indeed works. Curious why …
-
It doesn't work by design, see: #39651 (comment)
It is just a Jinja statement which is rendered …
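The "by design" part is that Airflow only passes through Jinja the attributes an operator declares in `template_fields`, and only at task runtime; a plain Python string built in the DAG body is never rendered. Below is a hedged sketch of that mechanism (the `FakeOperator` class is an illustration I made up, not Airflow's real implementation):

```python
from jinja2 import Environment


class FakeOperator:
    # Mimics how SQLExecuteQueryOperator declares `sql` as templated.
    template_fields = ("sql",)

    def __init__(self, sql, note):
        self.sql = sql
        self.note = note  # not in template_fields, so never rendered

    def render_template_fields(self, context):
        # Only attributes named in template_fields go through Jinja.
        env = Environment()
        for field in self.template_fields:
            rendered = env.from_string(getattr(self, field)).render(**context)
            setattr(self, field, rendered)


op = FakeOperator(sql="sql/{{ params.test }}.sql", note="{{ params.test }}")
op.render_template_fields({"params": {"test": "default"}})
print(op.sql)   # → sql/default.sql
print(op.note)  # → {{ params.test }}  (left untouched)
```

So `{{ params.test }}` inside a templated field resolves, while the same string assigned to an ordinary variable in the DAG body stays literal.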
-
Thank you two for helping earlier. I've reached another roadblock and was hoping you two could help again 😅 I'm struggling to render joined params. In the following example, I've tried two ways, using your earlier suggestions.

Error:

or

Code to reproduce the problem:

```python
from datetime import datetime

from airflow.models import DAG, Param
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="my_dag",
    start_date=datetime(2022, 6, 1),
    params={
        "site": Param("mysite", type="string"),
        "submission": Param("mysubmission", type="string"),
    },
) as dag:
    name = "{{ params.site }}_{{ params.submission }}"
    # name = "{% include params.site ~ '_' ~ params.submission %}"
    task = PostgresOperator(
        task_id="rename_previous_submission_schemas",
        postgres_conn_id="my-conn-id",
        sql="{% include 'sql/' ~ params.site ~ '.sql' %}",
        params={"name": name},
    )
```
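One likely culprit (a hedged sketch, not a statement about this exact DAG): Jinja performs a single rendering pass, so a param whose *value* is itself a template string is substituted literally and never re-rendered. Joining the params inside the template itself lets everything resolve in one pass. The `ALTER SCHEMA` statement below is a hypothetical stand-in for the referenced SQL file:

```python
from jinja2 import Environment

env = Environment()

# Built in the DAG body, exactly as in the reproduction above.
name = "{{ params.site }}_{{ params.submission }}"

# Passing the template string as a param value: one pass, literal output.
rendered = env.from_string("ALTER SCHEMA {{ params.name }} RENAME TO old;").render(
    params={"site": "mysite", "submission": "mysubmission", "name": name}
)
print(rendered)
# → ALTER SCHEMA {{ params.site }}_{{ params.submission }} RENAME TO old;

# Joining inside the template instead: resolves in the same pass.
rendered_ok = env.from_string(
    "ALTER SCHEMA {{ params.site }}_{{ params.submission }} RENAME TO old;"
).render(params={"site": "mysite", "submission": "mysubmission"})
print(rendered_ok)
# → ALTER SCHEMA mysite_mysubmission RENAME TO old;
```

Note also that passing `params` to the operator merges with (and can shadow) the DAG-level `params`, which is worth checking when a key unexpectedly goes missing.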