Commit 122577a: Merge branch 'main' into main

tchakib committed Apr 29, 2024
2 parents 3e4fdcc + 28a240a
Showing 29 changed files with 999 additions and 294 deletions.
4 changes: 2 additions & 2 deletions README.md
@@ -100,9 +100,9 @@ Apache Airflow is tested with:

| | Main version (dev) | Stable version (2.9.0) |
|-------------|----------------------------|-----------------------------|
| Python | 3.8, 3.9, 3.10, 3.11, 3.12 | 3.8, 3.9, 3.10, 3.11 |
| Python | 3.8, 3.9, 3.10, 3.11, 3.12 | 3.8, 3.9, 3.10, 3.11, 3.12 |
| Platform | AMD64/ARM64(\*) | AMD64/ARM64(\*) |
| Kubernetes | 1.26, 1.27, 1.28, 1.29 | 1.25, 1.26, 1.27, 1.28, 1.29|
| Kubernetes | 1.26, 1.27, 1.28, 1.29 | 1.26, 1.27, 1.28, 1.29 |
| PostgreSQL | 12, 13, 14, 15, 16 | 12, 13, 14, 15, 16 |
| MySQL | 8.0, Innovation | 8.0, Innovation |
| SQLite | 3.15.0+ | 3.15.0+ |
8 changes: 8 additions & 0 deletions RELEASE_NOTES.rst
@@ -111,6 +111,14 @@ Xcom table column ``value`` type has changed from ``blob`` to ``longblob``. This

To downgrade from revision: ``b4078ac230a1``, ensure that you don't have Xcom values larger than 65,535 bytes. Otherwise, you'll need to clean those rows or run ``airflow db clean xcom`` to clean the Xcom table.
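
If you want to verify that a downgrade is safe before running it, you can count oversized rows directly. A minimal sketch, assuming direct read access to the Airflow metadata database and the standard ``xcom`` table layout (the connection string is a placeholder):

```python
from sqlalchemy import create_engine, text

# Placeholder connection string; substitute your own sql_alchemy_conn value.
engine = create_engine("mysql+mysqldb://user:pass@localhost/airflow")

with engine.connect() as conn:
    # Rows larger than 65,535 bytes will not fit back into a `blob` column.
    oversized = conn.execute(
        text("SELECT COUNT(*) FROM xcom WHERE LENGTH(value) > 65535")
    ).scalar()

print(f"XCom rows too large to downgrade: {oversized}")
```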

Stronger validation for key parameter defaults in taskflow context variables (#38015)
"""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""""

Because the TaskFlow implementation could generate invalid parameter orders when context
variables were combined with parameter defaults, TaskFlow functions may no longer be defined
with context key parameter defaults other than ``None``; this is now validated. If you have
done this before, you will most likely see a broken DAG and an error message like
``Error message: Context key parameter my_param can't have a default other than None``.
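
A minimal sketch of the pattern that is now rejected, assuming Airflow 2.9+; ``ds`` is a context key, so parsing a file containing ``bad`` below fails with the error quoted above:

```python
from airflow.decorators import dag, task

@dag(schedule=None)
def validation_example():
    @task
    def bad(ds="2024-01-01"):  # rejected: context key parameter with a non-None default
        print(ds)

    @task
    def good(ds=None):  # accepted: the context value is injected at runtime
        print(ds)

    bad()
    good()

validation_example()
```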

New Features
""""""""""""
- Allow users to write dag_id and task_id in their national characters, added display name for dag / task (v2) (#38446)
10 changes: 4 additions & 6 deletions airflow/auth/managers/base_auth_manager.py
@@ -398,12 +398,10 @@ def filter_permitted_menu_items(self, menu_items: list[MenuItem]) -> list[MenuIt
accessible_items = []
for menu_item in items:
menu_item_copy = MenuItem(
name=menu_item.name,
icon=menu_item.icon,
label=menu_item.label,
childs=[],
baseview=menu_item.baseview,
cond=menu_item.cond,
**{
**menu_item.__dict__,
"childs": [],
}
)
if menu_item.childs:
accessible_children = []
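
The replacement above copies every attribute of the original ``MenuItem`` and overrides only ``childs``, so newly added fields are picked up automatically instead of being listed by hand. A standalone sketch of the copy-with-override idiom, using a hypothetical dataclass in place of the real Flask-AppBuilder ``MenuItem``:

```python
from dataclasses import dataclass, field

@dataclass
class Item:  # hypothetical stand-in for MenuItem
    name: str
    childs: list = field(default_factory=list)

original = Item(name="Browse", childs=[Item(name="DAG Runs")])

# Unpack all existing attributes, then let the explicit "childs" key win.
shallow_copy = Item(**{**original.__dict__, "childs": []})
print(shallow_copy)  # Item(name='Browse', childs=[])
```
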
6 changes: 4 additions & 2 deletions airflow/models/taskinstance.py
@@ -1250,8 +1250,9 @@ def _log_state(*, task_instance: TaskInstance | TaskInstancePydantic, lead_msg:
str(task_instance.state).upper(),
task_instance.dag_id,
task_instance.task_id,
task_instance.run_id,
]
message = "%sMarking task as %s. dag_id=%s, task_id=%s, "
message = "%sMarking task as %s. dag_id=%s, task_id=%s, run_id=%s, "
if task_instance.map_index >= 0:
params.append(task_instance.map_index)
message += "map_index=%d, "
@@ -2558,9 +2559,10 @@ def _run_raw_task(
raise
self.defer_task(defer=defer, session=session)
self.log.info(
"Pausing task as DEFERRED. dag_id=%s, task_id=%s, execution_date=%s, start_date=%s",
"Pausing task as DEFERRED. dag_id=%s, task_id=%s, run_id=%s, execution_date=%s, start_date=%s",
self.dag_id,
self.task_id,
self.run_id,
_date_or_empty(task_instance=self, attr="execution_date"),
_date_or_empty(task_instance=self, attr="start_date"),
)
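
Both hunks add ``run_id`` to existing ``%``-style log messages. A small sketch of the incremental message-building pattern used in ``_log_state``, with illustrative values:

```python
params = ["", "SUCCESS", "example_dag", "example_task", "manual__2024-04-29T00:00:00"]
message = "%sMarking task as %s. dag_id=%s, task_id=%s, run_id=%s, "

map_index = 2  # only mapped task instances (map_index >= 0) add this field
if map_index >= 0:
    params.append(map_index)
    message += "map_index=%d, "

print(message % tuple(params))
```
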
62 changes: 37 additions & 25 deletions airflow/operators/trigger_dagrun.py
@@ -20,14 +20,15 @@
import datetime
import json
import time
import warnings
from typing import TYPE_CHECKING, Any, Sequence, cast

from sqlalchemy import select
from sqlalchemy.orm.exc import NoResultFound

from airflow.api.common.trigger_dag import trigger_dag
from airflow.configuration import conf
from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists
from airflow.exceptions import AirflowException, DagNotFound, DagRunAlreadyExists, RemovedInAirflow3Warning
from airflow.models.baseoperator import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink
from airflow.models.dag import DagModel
@@ -41,7 +42,7 @@
from airflow.utils.state import DagRunState
from airflow.utils.types import DagRunType

XCOM_EXECUTION_DATE_ISO = "trigger_execution_date_iso"
XCOM_LOGICAL_DATE_ISO = "trigger_logical_date_iso"
XCOM_RUN_ID = "trigger_run_id"


@@ -64,7 +65,7 @@ class TriggerDagRunLink(BaseOperatorLink):
def get_link(self, operator: BaseOperator, *, ti_key: TaskInstanceKey) -> str:
# Fetch the correct execution date for the triggerED dag which is
# stored in xcom during execution of the triggerING task.
when = XCom.get_value(ti_key=ti_key, key=XCOM_EXECUTION_DATE_ISO)
when = XCom.get_value(ti_key=ti_key, key=XCOM_LOGICAL_DATE_ISO)
query = {"dag_id": cast(TriggerDagRunOperator, operator).trigger_dag_id, "base_date": when}
return build_airflow_url_with_query(query)

@@ -77,7 +78,7 @@ class TriggerDagRunOperator(BaseOperator):
:param trigger_run_id: The run ID to use for the triggered DAG run (templated).
If not provided, a run ID will be automatically generated.
:param conf: Configuration for the DAG run (templated).
:param execution_date: Execution date for the dag (templated).
:param logical_date: Logical date for the dag (templated).
:param reset_dag_run: Whether to clear an existing dag run if one already exists.
This is useful when backfilling or rerunning an existing dag run.
This only resets (not recreates) the dag run.
@@ -91,12 +92,13 @@
:param failed_states: List of failed or disallowed states, default is ``None``.
:param deferrable: If waiting for completion, whether or not to defer the task until done,
default is ``False``.
:param execution_date: Deprecated parameter; same as ``logical_date``.
"""

template_fields: Sequence[str] = (
"trigger_dag_id",
"trigger_run_id",
"execution_date",
"logical_date",
"conf",
"wait_for_completion",
)
@@ -110,13 +112,14 @@ def __init__(
trigger_dag_id: str,
trigger_run_id: str | None = None,
conf: dict | None = None,
execution_date: str | datetime.datetime | None = None,
logical_date: str | datetime.datetime | None = None,
reset_dag_run: bool = False,
wait_for_completion: bool = False,
poke_interval: int = 60,
allowed_states: list[str] | None = None,
failed_states: list[str] | None = None,
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
execution_date: str | datetime.datetime | None = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
@@ -136,20 +139,29 @@ def __init__(
self.failed_states = [DagRunState.FAILED]
self._defer = deferrable

if execution_date is not None and not isinstance(execution_date, (str, datetime.datetime)):
if execution_date is not None:
warnings.warn(
"Parameter 'execution_date' is deprecated. Use 'logical_date' instead.",
RemovedInAirflow3Warning,
stacklevel=2,
)
logical_date = execution_date

if logical_date is not None and not isinstance(logical_date, (str, datetime.datetime)):
type_name = type(logical_date).__name__
raise TypeError(
f"Expected str or datetime.datetime type for execution_date.Got {type(execution_date)}"
f"Expected str or datetime.datetime type for parameter 'logical_date'. Got {type_name}"
)

self.execution_date = execution_date
self.logical_date = logical_date

def execute(self, context: Context):
if isinstance(self.execution_date, datetime.datetime):
parsed_execution_date = self.execution_date
elif isinstance(self.execution_date, str):
parsed_execution_date = timezone.parse(self.execution_date)
if isinstance(self.logical_date, datetime.datetime):
parsed_logical_date = self.logical_date
elif isinstance(self.logical_date, str):
parsed_logical_date = timezone.parse(self.logical_date)
else:
parsed_execution_date = timezone.utcnow()
parsed_logical_date = timezone.utcnow()

try:
json.dumps(self.conf)
@@ -159,20 +171,20 @@ def execute(self, context: Context):
if self.trigger_run_id:
run_id = str(self.trigger_run_id)
else:
run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_execution_date)
run_id = DagRun.generate_run_id(DagRunType.MANUAL, parsed_logical_date)

try:
dag_run = trigger_dag(
dag_id=self.trigger_dag_id,
run_id=run_id,
conf=self.conf,
execution_date=parsed_execution_date,
execution_date=parsed_logical_date,
replace_microseconds=False,
)

except DagRunAlreadyExists as e:
if self.reset_dag_run:
self.log.info("Clearing %s on %s", self.trigger_dag_id, parsed_execution_date)
self.log.info("Clearing %s on %s", self.trigger_dag_id, parsed_logical_date)

# Get target dag object and call clear()
dag_model = DagModel.get_current(self.trigger_dag_id)
@@ -182,15 +194,15 @@
dag_bag = DagBag(dag_folder=dag_model.fileloc, read_dags_from_db=True)
dag = dag_bag.get_dag(self.trigger_dag_id)
dag_run = e.dag_run
dag.clear(start_date=dag_run.execution_date, end_date=dag_run.execution_date)
dag.clear(start_date=dag_run.logical_date, end_date=dag_run.logical_date)
else:
raise e
if dag_run is None:
raise RuntimeError("The dag_run should be set here!")
# Store the execution date from the dag run (either created or found above) to
# be used when creating the extra link on the webserver.
ti = context["task_instance"]
ti.xcom_push(key=XCOM_EXECUTION_DATE_ISO, value=dag_run.execution_date.isoformat())
ti.xcom_push(key=XCOM_LOGICAL_DATE_ISO, value=dag_run.logical_date.isoformat())
ti.xcom_push(key=XCOM_RUN_ID, value=dag_run.run_id)

if self.wait_for_completion:
@@ -200,7 +212,7 @@
trigger=DagStateTrigger(
dag_id=self.trigger_dag_id,
states=self.allowed_states + self.failed_states,
execution_dates=[parsed_execution_date],
execution_dates=[parsed_logical_date],
poll_interval=self.poke_interval,
),
method_name="execute_complete",
@@ -210,7 +222,7 @@
self.log.info(
"Waiting for %s on %s to become allowed state %s ...",
self.trigger_dag_id,
dag_run.execution_date,
dag_run.logical_date,
self.allowed_states,
)
time.sleep(self.poke_interval)
@@ -225,17 +237,17 @@

@provide_session
def execute_complete(self, context: Context, session: Session, event: tuple[str, dict[str, Any]]):
# This execution date is parsed from the return trigger event
provided_execution_date = event[1]["execution_dates"][0]
# This logical_date is parsed from the return trigger event
provided_logical_date = event[1]["execution_dates"][0]
try:
dag_run = session.execute(
select(DagRun).where(
DagRun.dag_id == self.trigger_dag_id, DagRun.execution_date == provided_execution_date
DagRun.dag_id == self.trigger_dag_id, DagRun.execution_date == provided_logical_date
)
).scalar_one()
except NoResultFound:
raise AirflowException(
f"No DAG run found for DAG {self.trigger_dag_id} and execution date {self.execution_date}"
f"No DAG run found for DAG {self.trigger_dag_id} and logical date {self.logical_date}"
)

state = dag_run.state
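
From a caller's perspective the rename only changes a keyword argument; ``execution_date`` still works but now emits ``RemovedInAirflow3Warning``. A minimal usage sketch with hypothetical DAG ids:

```python
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

trigger = TriggerDagRunOperator(
    task_id="trigger_downstream",
    trigger_dag_id="downstream_dag",    # hypothetical target DAG
    logical_date="{{ logical_date }}",  # templated; replaces execution_date
    conf={"source": "upstream_dag"},
    wait_for_completion=False,
)
```
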
10 changes: 8 additions & 2 deletions airflow/providers/amazon/aws/sensors/emr.py
@@ -24,7 +24,11 @@
from deprecated import deprecated

from airflow.configuration import conf
from airflow.exceptions import AirflowException, AirflowProviderDeprecationWarning, AirflowSkipException
from airflow.exceptions import (
AirflowException,
AirflowProviderDeprecationWarning,
AirflowSkipException,
)
from airflow.providers.amazon.aws.hooks.emr import EmrContainerHook, EmrHook, EmrServerlessHook
from airflow.providers.amazon.aws.links.emr import EmrClusterLink, EmrLogsLink, get_log_uri
from airflow.providers.amazon.aws.triggers.emr import (
@@ -231,7 +235,9 @@ def poke(self, context: Context) -> bool:

if state in EmrServerlessHook.APPLICATION_FAILURE_STATES:
# TODO: remove this if check when min_airflow_version is set to higher than 2.7.1
failure_message = f"EMR Serverless job failed: {self.failure_message_from_response(response)}"
failure_message = (
f"EMR Serverless application failed: {self.failure_message_from_response(response)}"
)
if self.soft_fail:
raise AirflowSkipException(failure_message)
raise AirflowException(failure_message)
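
The ``soft_fail`` branch above follows the usual Airflow sensor convention: skip the task instead of failing it. A standalone sketch of that convention:

```python
from airflow.exceptions import AirflowException, AirflowSkipException

def handle_failure(message: str, soft_fail: bool) -> None:
    if soft_fail:
        # AirflowSkipException marks the task as skipped rather than failed.
        raise AirflowSkipException(message)
    raise AirflowException(message)
```
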
2 changes: 1 addition & 1 deletion airflow/providers/apache/livy/operators/livy.py
@@ -61,8 +61,8 @@ class LivyOperator(BaseOperator):
depends on the option that's being modified.
:param extra_headers: A dictionary of headers passed to the HTTP request to livy.
:param retry_args: Arguments which define the retry behaviour.
See Tenacity documentation at https://github.com/jd/tenacity
:param deferrable: Run operator in the deferrable mode
See Tenacity documentation at https://github.com/jd/tenacity
"""

template_fields: Sequence[str] = ("spark_params",)
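
For reference, a hedged sketch of plausible ``retry_args`` values, assuming the dictionary is ultimately passed as keyword arguments to ``tenacity.Retrying`` (the values are illustrative, not recommendations):

```python
from tenacity import stop_after_attempt, wait_exponential

retry_args = {
    "stop": stop_after_attempt(3),
    "wait": wait_exponential(multiplier=1, max=10),
}
```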
