From 8bdb8d797beaa1f44e0fd6c93864474cd535ab36 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Wed, 2 Jun 2021 01:52:51 -0400 Subject: [PATCH] feat: Support deploying a single pipeline in a dataset (#46) --- README.md | 3 +++ scripts/deploy_dag.py | 9 ++++++- tests/scripts/test_deploy_dag.py | 45 ++++++++++++++++++++++++++++++++ 3 files changed, 56 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 1a9d4f4db..466b0bb35 100644 --- a/README.md +++ b/README.md @@ -183,12 +183,15 @@ Deploy the DAG and the variables to your own Cloud Composer environment using on ``` $ python scripts/deploy_dag.py \ --dataset DATASET \ + [--pipeline PIPELINE] \ --composer-env CLOUD_COMPOSER_ENVIRONMENT_NAME \ --composer-bucket CLOUD_COMPOSER_BUCKET \ --composer-region CLOUD_COMPOSER_REGION \ --env ENV ``` +Specifying an argument to `--pipeline` is optional. By default, the script deploys all pipelines under the given `--dataset` argument. + # Testing Run the unit tests from the project root as follows: diff --git a/scripts/deploy_dag.py b/scripts/deploy_dag.py index e641f8bd6..a8bef11e3 100644 --- a/scripts/deploy_dag.py +++ b/scripts/deploy_dag.py @@ -27,6 +27,7 @@ def main( local: bool, env_path: pathlib.Path, dataset_id: str, + pipeline: str = None, airflow_home: pathlib.Path = None, composer_env: str = None, composer_bucket: str = None, @@ -41,7 +42,12 @@ def main( ) print("========== AIRFLOW DAGS ==========") - for pipeline_path in list_subdirs(env_path / "datasets" / dataset_id): + if pipeline: + pipelines = [env_path / "datasets" / pipeline] + else: + pipelines = list_subdirs(env_path / "datasets" / dataset_id) + + for pipeline_path in pipelines: copy_custom_callables_to_airflow_dags_folder( local, env_path, @@ -326,6 +332,7 @@ def list_subdirs(path: pathlib.Path) -> typing.List[pathlib.Path]: local=args.local, env_path=PROJECT_ROOT / f".{args.env}", dataset_id=args.dataset, + pipeline=args.pipeline, airflow_home=airflow_path, 
composer_env=args.composer_env, composer_bucket=args.composer_bucket, diff --git a/tests/scripts/test_deploy_dag.py b/tests/scripts/test_deploy_dag.py index 15b1633bd..bd702b101 100644 --- a/tests/scripts/test_deploy_dag.py +++ b/tests/scripts/test_deploy_dag.py @@ -172,6 +172,51 @@ def test_script_always_requires_dataset_arg( ) +pipeline_path_2 = pipeline_path + + +def test_script_with_pipeline_arg_deploys_only_that_pipeline( + dataset_path: pathlib.Path, + pipeline_path: pathlib.Path, + pipeline_path_2: pathlib.Path, + airflow_home: pathlib.Path, + env: str, +): + setup_dag_and_variables( + dataset_path, + pipeline_path, + airflow_home, + env, + f"{dataset_path.name}_variables.json", + ) + + setup_dag_and_variables( + dataset_path, + pipeline_path_2, + airflow_home, + env, + f"{dataset_path.name}_variables.json", + ) + + deploy_dag.main( + local=True, + env_path=ENV_PATH, + dataset_id=dataset_path.name, + pipeline=pipeline_path_2.name, + airflow_home=airflow_home, + composer_env=None, + composer_bucket=None, + composer_region=None, + ) + + assert not ( + airflow_home / "dags" / f"{dataset_path.name}__{pipeline_path.name}_dag.py" + ).exists() + assert ( + airflow_home / "dags" / f"{dataset_path.name}__{pipeline_path_2.name}_dag.py" + ).exists() + + def test_script_without_local_flag_requires_cloud_composer_args(env: str): with pytest.raises(subprocess.CalledProcessError): # No --composer-env parameter