added a new condition before launching the self._scan_stale_dags() fu… #39159

Open
wants to merge 46 commits into base: main
Changes from 31 commits
Commits (46)
598667a
added a new condition before launching the self._scan_stale_dags() fu…
tchakib Apr 22, 2024
d361f9a
Merge branch 'main' into main
tchakib Apr 22, 2024
08ad4d9
Update unit_tests.cfg
tchakib Apr 22, 2024
1b6d59b
Update config.yml
tchakib Apr 22, 2024
32ef994
Update dags.rst
tchakib Apr 22, 2024
b8df88e
Update dags.rst
tchakib Apr 22, 2024
deee1de
Update airflow/config_templates/config.yml
tchakib Apr 22, 2024
0a0af37
Update airflow/dag_processing/manager.py
tchakib Apr 22, 2024
c4d49a3
Update airflow/config_templates/config.yml
tchakib Apr 23, 2024
8bb60da
Update config.yml
tchakib Apr 23, 2024
511252b
Update unit_tests.cfg
tchakib Apr 23, 2024
ff816a0
Update manager.py
tchakib Apr 23, 2024
ea5113f
Update dags.rst
tchakib Apr 23, 2024
f6fa572
Update config.yml
tchakib Apr 23, 2024
24327ab
Update config.yml
tchakib Apr 23, 2024
e0b338b
Update unit_tests.cfg
tchakib Apr 23, 2024
78d6a58
Update manager.py
tchakib Apr 23, 2024
dc33dcd
Update dags.rst
tchakib Apr 23, 2024
727f452
Update dags.rst
tchakib Apr 23, 2024
db2b496
Merge branch 'main' into main
tchakib Apr 23, 2024
e5c86ed
Merge branch 'main' into main
tchakib Apr 24, 2024
2ce0c3a
Merge branch 'main' into main
tchakib Apr 24, 2024
493fba5
Update config.yml
tchakib Apr 24, 2024
2b76552
Update docs/apache-airflow/core-concepts/dags.rst
tchakib Apr 25, 2024
8a1ab56
Update manager.py
tchakib Apr 25, 2024
7ae8830
Update unit_tests.cfg
tchakib Apr 25, 2024
cf3cacd
rename scan_stale_dags
Apr 25, 2024
31dd4a5
Merge pull request #1 from tchakib/scan_stale_dags
tchakib Apr 25, 2024
61d09d2
add unit test for enable_purging_stale_dags condition
Apr 25, 2024
6573bf1
Merge pull request #2 from tchakib/scan_stale_dags
tchakib Apr 25, 2024
4d79865
Merge branch 'main' into main
tchakib Apr 25, 2024
fcf86c9
Update airflow/config_templates/config.yml
tchakib Apr 26, 2024
b6e5640
fix doc and variables
Apr 26, 2024
c70da66
Merge pull request #3 from tchakib/scan_stale_dags
tchakib Apr 26, 2024
ce411fb
Merge branch 'apache:main' into main
tchakib Apr 26, 2024
3e4fdcc
Merge branch 'main' into main
tchakib Apr 26, 2024
122577a
Merge branch 'main' into main
tchakib Apr 29, 2024
60f9b2f
Update dags.rst
tchakib Apr 29, 2024
dfd9707
Merge branch 'main' into main
tchakib May 30, 2024
866a7d2
Merge branch 'main' into main
tchakib May 30, 2024
f34b42e
Merge branch 'main' into main
tchakib May 30, 2024
a8994fa
fix pep8 test_job_runner.py
tchakib Jun 1, 2024
793381c
Merge branch 'main' into main
tchakib Jun 3, 2024
969ab81
Update dags.rst
tchakib Jun 3, 2024
6484f55
Merge branch 'main' into main
tchakib Jun 3, 2024
520e660
Update test_job_runner.py
tchakib Jun 3, 2024
8 changes: 8 additions & 0 deletions airflow/config_templates/config.yml
@@ -500,6 +500,14 @@ core:
type: integer
example: ~
default: "4096"
enable_purging_stale_dags:
description: |
Stale DAGs are purged by default. Set [core] enable_purging_stale_dags
to False if you want to keep them.
version_added: 2.10.0
type: boolean
example: ~
default: "True"
database:
description: ~
options:
8 changes: 5 additions & 3 deletions airflow/dag_processing/manager.py
@@ -475,8 +475,8 @@ def start(self):

return self._run_parsing_loop()

def _scan_stale_dags(self):
"""Scan at fix internal DAGs which are no longer present in files."""
def _purge_stale_dags(self):
"""Scan at purge internal DAGs which are no longer present in files."""
now = timezone.utcnow()
elapsed_time_since_refresh = (now - self.last_deactivate_stale_dags_time).total_seconds()
if elapsed_time_since_refresh > self.parsing_cleanup_interval:
@@ -599,7 +599,9 @@ def _run_parsing_loop(self):

if self.standalone_dag_processor:
self._fetch_callbacks(max_callbacks_per_loop)
self._scan_stale_dags()

if conf.getboolean("core", "enable_purging_stale_dags", fallback=True):
self._purge_stale_dags()
DagWarning.purge_inactive_dag_warnings()
refreshed_dag_dir = self._refresh_dag_dir()

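For context, the body of the purge helper is collapsed in the diff above. The sketch below is a loose illustration of what purging stale DAGs involves, not the actual Airflow implementation; the function name, the known_file_stats and stale_dag_threshold parameters, and the query shape are assumptions made for illustration only.

from datetime import timedelta

from airflow.models import DagModel
from airflow.models.serialized_dag import SerializedDagModel
from airflow.utils.session import create_session


def purge_stale_dags_sketch(known_file_stats, stale_dag_threshold):
    """Illustrative sketch only: deactivate DAGs whose file was parsed recently
    but whose DagModel.last_parsed_time was never refreshed, i.e. the DAG
    definition no longer exists in that file."""
    with create_session() as session:
        for fileloc, stat in known_file_stats.items():
            # Treat a DAG as stale if its file finished parsing well after the
            # DAG row was last touched (threshold is in seconds).
            limit = stat.last_finish_time - timedelta(seconds=stale_dag_threshold)
            stale_dags = (
                session.query(DagModel)
                .filter(
                    DagModel.is_active,
                    DagModel.fileloc == fileloc,
                    DagModel.last_parsed_time < limit,
                )
                .all()
            )
            for dag in stale_dags:
                dag.is_active = False  # hide the DAG from the scheduler and UI
                SerializedDagModel.remove_dag(dag.dag_id, session=session)  # drop its serialized copy

With the change in this PR, a purge like this only runs when [core] enable_purging_stale_dags evaluates to True.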
7 changes: 7 additions & 0 deletions docs/apache-airflow/core-concepts/dags.rst
@@ -924,3 +924,10 @@ if it fails for ``N`` number of times consecutively.
we can also provide and override these configurations from DAG arguments:

- ``max_consecutive_failed_dag_runs``: Overrides :ref:`config:core__max_consecutive_failed_dag_runs_per_dag`.

Disable deletion of stale DAGs
------------------------------

In a versioned DAG context, you may need to run two versions of a DAG in parallel on two versions of workers.
It can also be useful to keep stale DAGs while their runs are still in progress on the previous (n-1) worker version.
To keep stale DAGs, set the environment variable ``AIRFLOW__CORE__ENABLE_PURGING_STALE_DAGS`` to ``False``. By default, it is set to ``True`` in ``config.yml``.
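As a quick follow-up to the documentation above, here is an illustrative way to confirm the setting took effect on a scheduler host. This snippet is not part of the PR and assumes a configured Airflow installation; conf.getboolean is Airflow's standard configuration accessor.

from airflow.configuration import conf

# Set AIRFLOW__CORE__ENABLE_PURGING_STALE_DAGS=False in the environment
# (or enable_purging_stale_dags = False under [core] in airflow.cfg) before checking.
purging_enabled = conf.getboolean("core", "enable_purging_stale_dags", fallback=True)
print(f"Stale-DAG purging enabled: {purging_enabled}")  # expect False once disabled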
150 changes: 145 additions & 5 deletions tests/dag_processing/test_job_runner.py
@@ -576,7 +576,7 @@ def test_recently_modified_file_is_parsed_with_mtime_mode(
> (freezed_base_time - manager.processor.get_last_finish_time("file_1.py")).total_seconds()
)

def test_scan_stale_dags(self):
def test_purge_stale_dags(self):
"""
Ensure that DAGs are marked inactive when the file is parsed but the
DagModel.last_parsed_time is not updated.
@@ -629,7 +629,7 @@ def test_scan_stale_dags(self):
)
assert serialized_dag_count == 1

manager.processor._scan_stale_dags()
manager.processor._purge_stale_dags()

active_dag_count = (
session.query(func.count(DagModel.dag_id))
@@ -652,7 +652,7 @@
("scheduler", "stale_dag_threshold"): "50",
}
)
def test_scan_stale_dags_standalone_mode(self):
def test_purge_stale_dags_standalone_mode(self):
"""
Ensure only dags from current dag_directory are updated
"""
@@ -700,7 +700,7 @@ def test_scan_stale_dags_standalone_mode(self):
active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 2

manager.processor._scan_stale_dags()
manager.processor._purge_stale_dags()

active_dag_count = session.query(func.count(DagModel.dag_id)).filter(DagModel.is_active).scalar()
assert active_dag_count == 1
@@ -1370,6 +1370,146 @@ def test_callback_queue(self, tmp_path):
dag1_req2,
]

@conf_vars({("core", "enable_purging_stale_dags"): "False"})
@conf_vars({("core", "load_examples"): "False"})
def test_disable_purge_stale_dags(self):
"""
Ensure stale DAGs are not purged when enable_purging_stale_dags is set to False.
"""
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=TEST_DAG_FOLDER,
max_runs=1,
processor_timeout=timedelta(minutes=10),
dag_ids=[],
pickle_dags=False,
async_mode=True,
),
)

test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
dagbag = DagBag(test_dag_path, read_dags_from_db=False, include_examples=False)

assert conf.getboolean("core", "enable_purging_stale_dags") is False

with create_session() as session:
# Add stale DAG to the DB
dag = dagbag.get_dag("test_example_bash_operator")
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db()
SerializedDagModel.write_dag(dag)

# Add DAG to the file_parsing_stats
stat = DagFileStat(
num_dags=1,
import_errors=0,
last_finish_time=timezone.utcnow() + timedelta(hours=1),
last_duration=timedelta(seconds=1),
run_count=1,
)
manager.processor._file_paths = [test_dag_path]
manager.processor._file_stats[test_dag_path] = stat

active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert active_dag_count == 1

serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 1

manager.processor._run_parsing_loop()

active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert active_dag_count == 1

serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 1

@conf_vars({("core", "load_examples"): "False"})
def test_enable_purge_stale_dags(self):
"""
Ensure stale DAGs are purged when enable_purging_stale_dags is left at its default value (True).
"""
manager = DagProcessorJobRunner(
job=Job(),
processor=DagFileProcessorManager(
dag_directory=TEST_DAG_FOLDER,
max_runs=1,
processor_timeout=timedelta(minutes=10),
dag_ids=[],
pickle_dags=False,
async_mode=True,
),
)

test_dag_path = str(TEST_DAG_FOLDER / "test_example_bash_operator.py")
dagbag = DagBag(test_dag_path, read_dags_from_db=False, include_examples=False)

assert conf.getboolean("core", "enable_purging_stale_dags") is True

with create_session() as session:
# Add stale DAG to the DB
dag = dagbag.get_dag("test_example_bash_operator")
dag.last_parsed_time = timezone.utcnow()
dag.sync_to_db()
SerializedDagModel.write_dag(dag)

# Add DAG to the file_parsing_stats
stat = DagFileStat(
num_dags=1,
import_errors=0,
last_finish_time=timezone.utcnow() + timedelta(hours=1),
last_duration=timedelta(seconds=1),
run_count=1,
)
manager.processor._file_paths = [test_dag_path]
manager.processor._file_stats[test_dag_path] = stat

active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert active_dag_count == 1

serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 1

manager.processor._run_parsing_loop()

active_dag_count = (
session.query(func.count(DagModel.dag_id))
.filter(DagModel.is_active, DagModel.fileloc == test_dag_path)
.scalar()
)
assert active_dag_count == 0

serialized_dag_count = (
session.query(func.count(SerializedDagModel.dag_id))
.filter(SerializedDagModel.fileloc == test_dag_path)
.scalar()
)
assert serialized_dag_count == 0

def _wait_for_processor_agent_to_complete_in_async_mode(processor_agent: DagFileProcessorAgent):
start_timer = time.monotonic()
@@ -1680,4 +1820,4 @@ def test_not_log_to_stdout(self, capfd):

# Capture the stdout and stderr
out, _ = capfd.readouterr()
assert "DAG File Processing Stats" not in out
assert "DAG File Processing Stats" not in out