
Cache TaskGroup/DAG regardless of the load_method #926

Open
5 tasks
tatiana opened this issue Apr 29, 2024 · 4 comments · May be fixed by #992
Labels
  • area:parsing: Related to parsing DAG/DBT improvement, issues, or fixes
  • area:performance: Related to performance, like memory usage, CPU usage, speed, etc
  • parsing:custom: Related to custom parsing, like custom DAG parsing, custom DBT parsing, etc
  • parsing:dbt_manifest: Issues, questions, or features related to dbt_manifest parsing
Comments


tatiana commented Apr 29, 2024

Context

PR #904 introduced the first caching implementation in Cosmos on top of the previously implemented support for partial parsing (#800).

Although these changes contributed significantly to the performance of Cosmos, this approach does not improve the DAG parsing performance for methods other than dbt ls. In those cases, Cosmos will still:

  • parse the project, manifest, or dbt ls command
  • select nodes based on selectors given by the user

Both steps happen during DAG parsing, which runs in the Airflow DAG Processor and is executed every time a task is scheduled/run.

This task aims to extend the cache support to allow caching the complete DAG/TaskGroup, removing the need for the two steps described above.

Acceptance criteria

  • Create a function to evaluate if there were no changes to the dbt project or to the Airflow DAG
  • Allow users to define their own caching function
  • Support caching the DAG/TaskGroup into a pickle file
  • Use the caching function to decide if Cosmos should load the DAG/TaskGroup from the cache or if the DAG/TaskGroup should be recreated (saving the new cache on disk)
  • Run performance test when using load_method=LoadMode.DBT_MANIFEST
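A minimal sketch of how these criteria could fit together. All names here (`calculate_current_version`, `load_or_rebuild`, the cache file layout) are illustrative, not the Cosmos API; the fingerprint mirrors the mtime-based approach benchmarked later in this thread:

```
import pickle
from pathlib import Path


def calculate_current_version(project_dir: Path) -> str:
    """Cheap fingerprint: sum of mtimes of all files in the dbt project."""
    return str(sum(p.stat().st_mtime for p in project_dir.glob("**/*") if p.is_file()))


def load_or_rebuild(cache_file: Path, version_file: Path, project_dir: Path, build):
    """Return the cached DAG/TaskGroup if the project is unchanged, else rebuild it."""
    current = calculate_current_version(project_dir)
    if cache_file.exists() and version_file.exists() and version_file.read_text() == current:
        return pickle.loads(cache_file.read_bytes())  # cache hit: skip parsing/selecting
    obj = build()  # cache miss: recreate the DAG/TaskGroup
    cache_file.write_bytes(pickle.dumps(obj))
    version_file.write_text(current)
    return obj
```

A user-defined caching function (second acceptance criterion) would simply replace `calculate_current_version` with a callable the user supplies.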
@tatiana tatiana self-assigned this Apr 29, 2024
@tatiana tatiana added area:performance Related to performance, like memory usage, CPU usage, speed, etc parsing:dbt_manifest Issues, questions, or features related to dbt_manifest parsing parsing:custom Related to custom parsing, like custom DAG parsing, custom DBT parsing, etc labels Apr 29, 2024
@dosubot dosubot bot added the area:parsing Related to parsing DAG/DBT improvement, issues, or fixes label Apr 29, 2024
@tatiana tatiana added this to the 1.5.0 milestone Apr 30, 2024
tatiana added a commit that referenced this issue May 1, 2024
…ct (#904)

Improve the performance of running the benchmark DAG with 100 tasks by 34%
and the benchmark DAG with 10 tasks by 22%, by persisting the dbt
partial parse artifact in Airflow nodes. The gain can be even higher
for dbt projects that take longer to parse.

With the introduction of #800, Cosmos supports using dbt partial parsing
files. This feature has led to a substantial performance improvement,
particularly for large dbt projects, both during Airflow DAG parsing
(using LoadMode.DBT_LS) and also Airflow task execution (when using
`ExecutionMode.LOCAL` and `ExecutionMode.VIRTUALENV`).

There were two limitations with the initial support for partial parsing,
which the current PR aims to address:

1. DAGs using Cosmos `ProfileMapping` classes could not leverage this
feature. This is because the partial parsing relies on profile files not
changing, and by default, Cosmos would mock the dbt profile in several
parts of the code. The consequence is that users trying Cosmos 1.4.0a1
will see the following message:
```
13:33:16  Unable to do partial parsing because profile has changed
13:33:16  Unable to do partial parsing because env vars used in profiles.yml have changed
```

2. The user had to explicitly provide a `partial_parse.msgpack` file in
the original project folder for their Airflow deployment, and if, for
any reason, this file became outdated, the user would not benefit from
the partial parsing feature. Since Cosmos runs dbt tasks from within a
temporary directory, the partial parse file could become stale for some
users: it would be updated in the temporary directory, but the next time
the task was run, Cosmos/dbt would not leverage the recently updated
`partial_parse.msgpack` file.
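The fix for (2) amounts to copying the artifact between the cache directory and the temporary project before and after each run. A hedged sketch (function names and the `target/` layout are illustrative; `target/partial_parse.msgpack` is where dbt writes the artifact by default):

```
import shutil
from pathlib import Path


def restore_partial_parse(cache_dir: Path, tmp_project_dir: Path) -> None:
    """Copy a previously cached partial parse file into the temporary dbt project."""
    cached = cache_dir / "partial_parse.msgpack"
    if cached.exists():
        target = tmp_project_dir / "target"
        target.mkdir(parents=True, exist_ok=True)
        shutil.copy2(cached, target / "partial_parse.msgpack")


def store_partial_parse(cache_dir: Path, tmp_project_dir: Path) -> None:
    """After running dbt, persist the freshly updated artifact back to the cache."""
    fresh = tmp_project_dir / "target" / "partial_parse.msgpack"
    if fresh.exists():
        cache_dir.mkdir(parents=True, exist_ok=True)
        shutil.copy2(fresh, cache_dir / "partial_parse.msgpack")
```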

The current PR addresses these two issues respectively by:

1. Allowing users that want to leverage Cosmos `ProfileMapping` and
partial parsing to use `RenderConfig(enable_mock_profile=False)`

2. Introducing a Cosmos cache directory where we are persisting partial
parsing files. This feature is enabled by default, but users can opt out
by setting the Airflow configuration `[cosmos][enable_cache] = False`
(exporting the environment variable `AIRFLOW__COSMOS__ENABLE_CACHE=0`).
Users can also define the temporary directory used to store these files
using the `[cosmos][cache_dir]` Airflow configuration. By default,
Cosmos will create and use a folder `cosmos` inside the system's
temporary directory:
https://docs.python.org/3/library/tempfile.html#tempfile.gettempdir .
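Since Airflow maps `[section] option` configuration to `AIRFLOW__SECTION__OPTION` environment variables, the settings above can be resolved roughly as follows (a sketch, not the actual Cosmos implementation; the env-var names match the PR description, the parsing logic is illustrative):

```
import os
import tempfile
from pathlib import Path


def cosmos_cache_settings():
    """Resolve the cache toggle and cache directory described in the PR."""
    # [cosmos] enable_cache, default on; "0" or "false" opts out
    enabled = os.getenv("AIRFLOW__COSMOS__ENABLE_CACHE", "1").lower() not in ("0", "false")
    # [cosmos] cache_dir, defaulting to a `cosmos` folder under the system tmp dir
    default_dir = Path(tempfile.gettempdir()) / "cosmos"
    cache_dir = Path(os.getenv("AIRFLOW__COSMOS__CACHE_DIR", str(default_dir)))
    return enabled, cache_dir
```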

This PR affects both DAG parsing and task execution. Although it does
not introduce an optimisation per se, it makes the partial parse feature
implemented in #800 available to more users.

Closes: #722

I updated the documentation in the PR: #898

Some future optimisation steps related to caching, to be addressed in
separate PRs:
i. Change how we create mocked profiles: create the file itself in the
same way, referencing an environment variable with the same name, and
only change the value of the environment variable (#924)
ii. Extend caching to the `profiles.yml` created by Cosmos in the newly
introduced `tmp/cosmos` without the need to recreate it every time
(#925).
iii. Extend caching to the Airflow DAG/Task group as a pickle file -
this approach is more generic and would work for every type of DAG
parsing and executor. (#926)
iv. Support persisting/fetching the cache from remote storage so we
don't have to replicate it for every Airflow scheduler and worker node.
(#927)
v. Cache the dbt deps lock file / avoid re-installing dbt packages every
time. We can leverage `package-lock.yml`, introduced in dbt 1.7
(https://docs.getdbt.com/reference/commands/deps#predictable-package-installs),
but ideally, we'd have a strategy to support older versions of dbt as
well. (#930)
vi. Support caching `partial_parse.msgpack` even when vars change:
https://medium.com/@sebastian.daum89/how-to-speed-up-single-dbt-invocations-when-using-changing-dbt-variables-b9d91ce3fb0d
vii. Support partial parsing in Docker and Kubernetes Cosmos executors
(#929)
viii. Centralise all the Airflow-based config into Cosmos settings.py &
create a dedicated docs page containing information about these (#928)
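Future step (v) above could, for example, compare a fingerprint of `packages.yml` against a stamp written after the last successful `dbt deps` run. A hedged sketch under those assumptions (helper names and stamp-file layout are hypothetical):

```
import hashlib
from pathlib import Path
from typing import Optional


def packages_fingerprint(project_dir: Path) -> Optional[str]:
    """MD5 of packages.yml, or None if the project declares no packages."""
    pkg = project_dir / "packages.yml"
    if not pkg.exists():
        return None
    return hashlib.md5(pkg.read_bytes()).hexdigest()


def needs_dbt_deps(project_dir: Path, stamp_file: Path) -> bool:
    """True only when packages.yml exists and changed since the last install."""
    current = packages_fingerprint(project_dir)
    if current is None:
        return False
    if stamp_file.exists() and stamp_file.read_text() == current:
        return False
    return True
```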

**How to validate this change**

Run the performance benchmark against this and the `main` branch,
checking the value of `/tmp/performance_results.txt`.

Example of commands run locally:

```
# Setup
AIRFLOW_HOME=`pwd` AIRFLOW_CONN_AIRFLOW_DB="postgres://postgres:postgres@0.0.0.0:5432/postgres" PYTHONPATH=`pwd` AIRFLOW_HOME=`pwd` AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=20000 AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=20000 hatch run tests.py3.11-2.7:test-performance-setup

# Run test for 100 dbt models per DAG:
MODEL_COUNT=100 AIRFLOW_HOME=`pwd` AIRFLOW_CONN_AIRFLOW_DB="postgres://postgres:postgres@0.0.0.0:5432/postgres" PYTHONPATH=`pwd` AIRFLOW_HOME=`pwd` AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=20000 AIRFLOW__CORE__DAG_FILE_PROCESSOR_TIMEOUT=20000 hatch run tests.py3.11-2.7:test-performance
```

An example of output when running 100 with the main branch:
```
NUM_MODELS=100
TIME=114.18614888191223
MODELS_PER_SECOND=0.8757629623135543
DBT_VERSION=1.7.13
```

And with the current PR:
```
NUM_MODELS=100
TIME=75.17766404151917
MODELS_PER_SECOND=1.33018232576064
DBT_VERSION=1.7.13
```
@tatiana tatiana added triage-needed Items need to be reviewed / assigned to milestone and removed triage-needed Items need to be reviewed / assigned to milestone labels May 17, 2024
@tatiana tatiana changed the title Cache Cosmos DAG/TaskGroup regarless of the load_method of choice Improve performance regardless of the load_method of choice May 17, 2024
@tatiana tatiana changed the title Improve performance regardless of the load_method of choice Improve performance regardless of the load_method May 17, 2024
@tatiana tatiana changed the title Improve performance regardless of the load_method Cache TaskGroup/DAG regardless of the load_method May 17, 2024
@tatiana tatiana removed the epic label May 17, 2024

tatiana commented May 21, 2024

Progress can be seen in draft PR: #992


tatiana commented May 23, 2024

I explored and validated a few possible ways of quickly generating the "version" of the cache. Some are safer than others; the idea was to get a sense of the cost/benefit:

"""
How to run script:

pip install checksumdir  # https://pypi.org/project/checksumdir/
python hash_comparison.py --path your/dbt/project

python cosmos/hash_comparison.py --path  dags/dbt/perf
Comparing hash methods pm a directory with 5009 files
- create_hash_with_checksumdir_md5        : 0.2171382334
- create_hash_with_checksumdir_sha1       : 0.1730406
- create_hash_with_checksumdir_sha256     : 0.17227659160000003
- create_hash_with_dir_size               : 0.05859905000000003
- create_hash_with_file_changes           : 0.03936836659999994
- create_hash_with_most_recently_changed  : 0.10243044999999995
- create_hash_with_sql_changes            : 0.03878682500000004
- create_hash_with_subprocess_du          : 0.014803133400000057
- create_hash_with_walk_files_md5         : 0.14606525
- create_hash_with_walk_files_size        : 0.024191108199999967
"""
import argparse
import hashlib
import inspect
import os
import subprocess
import sys
import timeit
from pathlib import Path

import checksumdir


ROOT_DIR = Path(".")
ROOT_DIR = Path("./dags/dbt/jaffle_shop")
REPETITIONS = 5


def create_hash_with_dir_size(dir_path: Path) -> int:
    return sum(f.stat().st_size for f in dir_path.glob('**/*') if f.is_file())


def create_hash_with_most_recently_changed(dir_path: Path) -> str:
    value = subprocess.run(
        f'find {dir_path} -type f -print0 | xargs -0 stat -f "%m %N" | sort -rn | head -1',
        text=True,
        stdout=subprocess.PIPE,
        check=True,
        shell=True
    )
    return value

def create_hash_with_subprocess_du(dir_path: Path) -> str:
    value = subprocess.run(
        f"du -s {dir_path}",
        text=True,
        stdout=subprocess.PIPE,
        check=True,
        shell=True
    )
    return value.stdout.strip()


def create_hash_with_walk_files_size(dir_path: Path):
    return sum(
        sum(
            os.path.getsize(os.path.join(walk_result[0], element))
            for element in walk_result[2]
        )
        for walk_result in os.walk(dir_path)
   )


def create_hash_with_walk_files_md5(dir_path: Path) -> str:
    hasher = hashlib.md5()
    for walk_result in os.walk(dir_path):
        for element in walk_result[2]:
            filepath = os.path.join(walk_result[0], element)
            with open(str(filepath), 'rb') as fp:
                buf = fp.read()
                hasher.update(buf)
    return hasher.hexdigest()


def create_hash_with_file_changes(dir_path: Path) -> float:
    return sum([path.stat().st_mtime for path in dir_path.glob('**/*')])


def create_hash_with_sql_changes(dir_path: Path) -> float:
    return sum([path.stat().st_mtime for path in dir_path.glob('**/*.sql')])


create_hash_with_checksumdir_md5 = lambda dir_path: checksumdir.dirhash(str(dir_path), 'md5')
create_hash_with_checksumdir_sha1 = lambda dir_path: checksumdir.dirhash(str(dir_path), 'sha1')
create_hash_with_checksumdir_sha256 = lambda dir_path: checksumdir.dirhash(str(dir_path), 'sha256')


if __name__ == "__main__":
    parser = argparse.ArgumentParser("hash_comparison")
    parser.add_argument("--path", help=".", type=Path, default=ROOT_DIR)
    args = parser.parse_args()

    total_files = len([path for path in args.path.glob('**/*') if path.is_file()])
    print(f"Comparing hash methods pm a directory with {total_files} files")


    current_module = sys.modules[__name__]
    functions_to_test = inspect.getmembers(current_module, inspect.isfunction)
    for function_name, function in functions_to_test:
        average_time = timeit.timeit(
            lambda: function(args.path),
            number=REPETITIONS
        ) / REPETITIONS
        print(f"- {function_name: <40}: {average_time}")

So far, it seems create_hash_with_file_changes is one of the most accurate and lightweight approaches. I strongly believe we should allow users to customise the method used to calculate the version, but this may be a good default for Cosmos.
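Letting users customise the version calculation could be as simple as accepting a callable that defaults to the mtime-based approach. A hypothetical sketch (the `version_fn` parameter name is illustrative, not an agreed API):

```
from pathlib import Path
from typing import Callable


def default_version(project_dir: Path) -> str:
    # Same idea as create_hash_with_file_changes from the benchmark above
    return str(sum(p.stat().st_mtime for p in project_dir.glob("**/*")))


def current_cache_version(
    project_dir: Path,
    version_fn: Callable[[Path], str] = default_version,
) -> str:
    """Compute the cache version, using a user-supplied function if given."""
    return version_fn(project_dir)
```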


tatiana commented May 23, 2024

I extended the initial implementation to:

  • Version the DbtDag cache in 0db7121
  • Extend the caching mechanism to DbtTaskGroup in daad0b3

I also updated the PR description with what is missing:

  • Allow users to customise their own calculate_current_version
  • Refactor so there isn't so much code duplication between DbtDag and TaskGroup
  • Allow users to purge the cache (via env var or param passed in the Airflow UI - second option is probably preferable)
  • Unify _create_cache_identifier and create_cache_identifier_v2
  • Write tests


tatiana commented May 29, 2024

When using the current approach in a distributed environment, there are two challenges:

  1. Concurrent tasks (in the same node) try to create the cache at the same time
  2. Tasks running in different nodes (that didn't have the cache) have to generate it

We'll look into improving this.
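One mitigation for challenge (1) is to write the pickle atomically, so concurrent tasks on the same node never read a half-written cache file; the last writer simply wins. A sketch (assuming a POSIX filesystem, where `os.replace` within the same directory is atomic; the function name is illustrative):

```
import os
import pickle
import tempfile
from pathlib import Path


def atomic_cache_write(cache_file: Path, obj) -> None:
    """Serialise obj to cache_file without ever exposing a partial file."""
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temp file in the same directory, then rename into place.
    fd, tmp_name = tempfile.mkstemp(dir=cache_file.parent)
    try:
        with os.fdopen(fd, "wb") as fp:
            pickle.dump(obj, fp)
        os.replace(tmp_name, cache_file)  # atomic within the same filesystem
    except BaseException:
        os.unlink(tmp_name)
        raise
```

Challenge (2) would still need remote storage for the cache (as discussed in #927), since a rename cannot help across nodes.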

Examples of the behaviour in a distributed Airflow environment:

(screenshots attached to the original issue)
