added a new condition before launching the self._scan_stale_dags() fu… #39159

Open
wants to merge 46 commits into
base: main
598667a
added a new condition before launching the self._scan_stale_dags() fu…
tchakib Apr 22, 2024
d361f9a
Merge branch 'main' into main
tchakib Apr 22, 2024
08ad4d9
Update unit_tests.cfg
tchakib Apr 22, 2024
1b6d59b
Update config.yml
tchakib Apr 22, 2024
32ef994
Update dags.rst
tchakib Apr 22, 2024
b8df88e
Update dags.rst
tchakib Apr 22, 2024
deee1de
Update airflow/config_templates/config.yml
tchakib Apr 22, 2024
0a0af37
Update airflow/dag_processing/manager.py
tchakib Apr 22, 2024
c4d49a3
Update airflow/config_templates/config.yml
tchakib Apr 23, 2024
8bb60da
Update config.yml
tchakib Apr 23, 2024
511252b
Update unit_tests.cfg
tchakib Apr 23, 2024
ff816a0
Update manager.py
tchakib Apr 23, 2024
ea5113f
Update dags.rst
tchakib Apr 23, 2024
f6fa572
Update config.yml
tchakib Apr 23, 2024
24327ab
Update config.yml
tchakib Apr 23, 2024
e0b338b
Update unit_tests.cfg
tchakib Apr 23, 2024
78d6a58
Update manager.py
tchakib Apr 23, 2024
dc33dcd
Update dags.rst
tchakib Apr 23, 2024
727f452
Update dags.rst
tchakib Apr 23, 2024
db2b496
Merge branch 'main' into main
tchakib Apr 23, 2024
e5c86ed
Merge branch 'main' into main
tchakib Apr 24, 2024
2ce0c3a
Merge branch 'main' into main
tchakib Apr 24, 2024
493fba5
Update config.yml
tchakib Apr 24, 2024
2b76552
Update docs/apache-airflow/core-concepts/dags.rst
tchakib Apr 25, 2024
8a1ab56
Update manager.py
tchakib Apr 25, 2024
7ae8830
Update unit_tests.cfg
tchakib Apr 25, 2024
cf3cacd
rename scan_stale_dags
Apr 25, 2024
31dd4a5
Merge pull request #1 from tchakib/scan_stale_dags
tchakib Apr 25, 2024
61d09d2
add unitest for enable_purging_stale_dags condition
Apr 25, 2024
6573bf1
Merge pull request #2 from tchakib/scan_stale_dags
tchakib Apr 25, 2024
4d79865
Merge branch 'main' into main
tchakib Apr 25, 2024
fcf86c9
Update airflow/config_templates/config.yml
tchakib Apr 26, 2024
b6e5640
fix doc and variables
Apr 26, 2024
c70da66
Merge pull request #3 from tchakib/scan_stale_dags
tchakib Apr 26, 2024
ce411fb
Merge branch 'apache:main' into main
tchakib Apr 26, 2024
3e4fdcc
Merge branch 'main' into main
tchakib Apr 26, 2024
122577a
Merge branch 'main' into main
tchakib Apr 29, 2024
60f9b2f
Update dags.rst
tchakib Apr 29, 2024
dfd9707
Merge branch 'main' into main
tchakib May 30, 2024
866a7d2
Merge branch 'main' into main
tchakib May 30, 2024
f34b42e
Merge branch 'main' into main
tchakib May 30, 2024
a8994fa
fix pep8 test_job_runner.py
tchakib Jun 1, 2024
793381c
Merge branch 'main' into main
tchakib Jun 3, 2024
969ab81
Update dags.rst
tchakib Jun 3, 2024
6484f55
Merge branch 'main' into main
tchakib Jun 3, 2024
520e660
Update test_job_runner.py
tchakib Jun 3, 2024
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
@@ -530,6 +530,14 @@ core:
type: integer
example: ~
default: "4096"
purge_stale_dags:
description: |
Stale DAGs are deleted by default. Set this to ``False``
if you want to keep them.
version_added: 2.10.0
type: boolean
example: ~
default: "True"
database:
description: ~
options:
8 changes: 5 additions & 3 deletions airflow/dag_processing/manager.py
@@ -476,8 +476,8 @@ def start(self):

return self._run_parsing_loop()

def _scan_stale_dags(self):
"""Scan at fix internal DAGs which are no longer present in files."""
def _purge_stale_dags(self):
"""Purge, at a fixed interval, DAGs which are no longer present in files."""
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.parsing_cleanup_interval:
@@ -600,7 +600,9 @@ def _run_parsing_loop(self):

if self.standalone_dag_processor:
self._fetch_callbacks(max_callbacks_per_loop)
self._scan_stale_dags()

if conf.getboolean("core", "purge_stale_dags", fallback=True):
self._purge_stale_dags()
DagWarning.purge_inactive_dag_warnings()
refreshed_dag_dir = self._refresh_dag_dir()
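
The effect of the new condition can be sketched in isolation. The block below is a minimal toy model, not Airflow's `DagFileProcessorManager`: the class `ParsingLoopSketch`, its attributes, and the `purge_count` counter are hypothetical names introduced for illustration, and the reset of the last-purge timestamp is an assumption about what happens after a purge. It only shows that the purge runs when the flag is on and the cleanup interval has elapsed, and is skipped entirely when the flag is off.

```python
from datetime import datetime, timedelta, timezone


class ParsingLoopSketch:
    """Toy stand-in for the manager; NOT Airflow's DagFileProcessorManager."""

    def __init__(self, purge_enabled: bool, cleanup_interval: float):
        self.purge_enabled = purge_enabled  # mirrors [core] purge_stale_dags
        self.cleanup_interval = cleanup_interval  # seconds between purges
        self.last_purge_time = datetime.now(timezone.utc)
        self.purge_count = 0  # hypothetical counter, for illustration only

    def _purge_stale_dags(self, now: datetime) -> None:
        # Same shape as the diff: act only once the interval has elapsed.
        elapsed = (now - self.last_purge_time).total_seconds()
        if elapsed > self.cleanup_interval:
            self.purge_count += 1  # real code would deactivate stale DAGs here
            self.last_purge_time = now  # assumed reset after a purge

    def run_once(self, now: datetime) -> None:
        # The new condition: skip the purge entirely when the flag is off.
        if self.purge_enabled:
            self._purge_stale_dags(now)


start = datetime.now(timezone.utc)
later = start + timedelta(seconds=120)

enabled = ParsingLoopSketch(purge_enabled=True, cleanup_interval=60)
enabled.last_purge_time = start
enabled.run_once(later)

disabled = ParsingLoopSketch(purge_enabled=False, cleanup_interval=60)
disabled.last_purge_time = start
disabled.run_once(later)
```

With the flag off, `disabled.purge_count` stays at zero no matter how much time passes, which is the behaviour the new tests in this PR check against the database.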

18 changes: 18 additions & 0 deletions docs/apache-airflow/core-concepts/dags.rst
@@ -924,3 +924,21 @@ if it fails for ``N`` number of times consecutively.
we can also provide and override these configuration from DAG argument:

- ``max_consecutive_failed_dag_runs``: Overrides :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.

Disable deletion of stale DAGs
------------------------------

A DAG becomes stale once it is deleted from its DAG file. At that point,
the ``_purge_stale_dags`` method purges the DAG from the database.

Keeping stale DAGs can be useful in a versioned DAG context:

- It may be necessary to run two versions of a DAG in parallel on two versions of workers.

- Stale DAGs may still be in use on version (n-1) of the worker.

However, because the scheduler does not have the DAGs of version (n-1), it automatically purges the DAGs that no longer exist in the DAG file.

To keep stale DAGs, set the ``AIRFLOW__CORE__PURGE_STALE_DAGS`` environment variable to ``False``.

By default, it is set to ``True`` in ``config.yml``.
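
As an illustration of how the setting resolves, the following sketch parses the environment variable into a boolean with a default of ``True``. The helper ``purge_stale_dags_enabled`` is hypothetical and uses only the standard library; the accepted spellings are an assumption, and Airflow itself reads the value with ``conf.getboolean("core", "purge_stale_dags", fallback=True)``.

```python
import os

ENV_VAR = "AIRFLOW__CORE__PURGE_STALE_DAGS"


def purge_stale_dags_enabled(environ=None, default=True):
    """Hypothetical helper: resolve the flag from an environment mapping."""
    environ = os.environ if environ is None else environ
    raw = environ.get(ENV_VAR)
    if raw is None:
        # Variable not set: fall back to the documented default (True).
        return default
    value = raw.strip().lower()
    # Accepted spellings are an assumption made for this sketch.
    if value in ("true", "t", "1"):
        return True
    if value in ("false", "f", "0"):
        return False
    raise ValueError(f"{ENV_VAR} must be a boolean, got {raw!r}")


keep_stale = not purge_stale_dags_enabled({ENV_VAR: "False"})
```

Here ``keep_stale`` is ``True`` because the variable is explicitly set to ``False``; with the variable unset, the helper returns the default and stale DAGs are purged.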
149 changes: 145 additions & 4 deletions tests/dag_processing/test_job_runner.py
@@ -616,7 +616,7 @@ def test_file_paths_in_queue_sorted_by_priority(
)
assert session_delete.call_args[0][0].fileloc == parsing_request.fileloc

def test_scan_stale_dags(self):
def test_purge_stale_dags(self):
"""
Ensure that DAGs are marked inactive when the file is parsed but the
DagModel.last_parsed_time is not updated.
@@ -669,7 +669,7 @@ def test_scan_stale_dags(self):
)
assert serialized_dag_count == 1

manager.processor._scan_stale_dags()
manager.processor._purge_stale_dags()

active_dag_count = (
session.query(func.count(DagModel.dag_id))
@@ -692,7 +692,7 @@ def test_scan_stale_dags(self):
("scheduler", "stale_dag_threshold"): "50",
}
)
def test_scan_stale_dags_standalone_mode(self):
def test_purge_stale_dags_standalone_mode(self):
"""
Ensure only dags from current dag_directory are updated
"""
@@ -740,7 +740,7 @@ def test_scan_stale_dags_standalone_mode(self):
active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 2

manager.processor._scan_stale_dags()
manager.processor._purge_stale_dags()

active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 1
@@ -1410,6 +1410,147 @@ def test_callback_queue(self, tmp_path):
dag1_req2,
]

@conf_vars({("core", "purge_stale_dags"): "False"})
@conf_vars({("core", "load_examples"): "False"})
def test_disable_purge_stale_dags(self):
"""
Ensure that stale DAGs are kept when purge_stale_dags is set to False.
"""
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=TEST_DAG_FOLDER,
max_runs=1,
processor_timeout=timedelta(minutes=10),
dag_ids=[],
pickle_dags=False,
async_mode=True,
),
)

test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
dagbag = DagBag(test_dag_path, read_dags_from_db=False, include_examples=False)

assert not conf.getboolean("core", "purge_stale_dags")

with create_session() as session:
# Add stale DAG to the DB
dag = dagbag.get_dag("test_example_bash_operator")
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db()
SerializedDagModel.write_dag(dag)

# Add DAG to the file_parsing_stats
stat = DagFileStat(
num_dags=1,
import_errors=0,
last_finish_time=timezone.utcnow() + timedelta(hours=1),
last_duration=timedelta(seconds=1),
run_count=1,
)
manager.processor._file_paths = [test_dag_path]
manager.processor._file_stats[test_dag_path] = stat

active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert active_dag_count == 1

serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 1

manager.processor._run_parsing_loop()

active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert active_dag_count == 1

serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 1

@conf_vars({("core", "load_examples"): "False"})
def test_enable_purge_stale_dags(self):
"""
Ensure that stale DAGs are purged when purge_stale_dags keeps its default value (True).
"""
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=TEST_DAG_FOLDER,
max_runs=1,
processor_timeout=timedelta(minutes=10),
dag_ids=[],
pickle_dags=False,
async_mode=True,
),
)

test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
dagbag = DagBag(test_dag_path, read_dags_from_db=False, include_examples=False)

assert conf.getboolean("core", "purge_stale_dags")

with create_session() as session:
# Add stale DAG to the DB
dag = dagbag.get_dag("test_example_bash_operator")
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db()
SerializedDagModel.write_dag(dag)

# Add DAG to the file_parsing_stats
stat = DagFileStat(
num_dags=1,
import_errors=0,
last_finish_time=timezone.utcnow() + timedelta(hours=1),
last_duration=timedelta(seconds=1),
run_count=1,
)
manager.processor._file_paths = [test_dag_path]
manager.processor._file_stats[test_dag_path] = stat

active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert active_dag_count == 1

serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 1

manager.processor._run_parsing_loop()

active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert active_dag_count == 0

serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 0


def _wait_for_processor_agent_to_complete_in_async_mode(processor_agent: DagFileProcessorAgent):
start_timer = time.monotonic()