feat: Support deploying a single pipeline in a dataset (#46)
adlersantos committed Jun 2, 2021
1 parent 9e01936 commit 8bdb8d7
Showing 3 changed files with 56 additions and 1 deletion.
3 changes: 3 additions & 0 deletions README.md
@@ -183,12 +183,15 @@ Deploy the DAG and the variables to your own Cloud Composer environment using on
```
$ python scripts/deploy_dag.py \
--dataset DATASET \
[--pipeline PIPELINE] \
--composer-env CLOUD_COMPOSER_ENVIRONMENT_NAME \
--composer-bucket CLOUD_COMPOSER_BUCKET \
--composer-region CLOUD_COMPOSER_REGION \
--env ENV
```

Specifying an argument to `--pipeline` is optional. By default, the script deploys all pipelines under the given `--dataset` argument.
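
For example, a single pipeline in a dataset can be deployed on its own as shown below; the dataset, pipeline, and Cloud Composer values here are placeholders:

```
$ python scripts/deploy_dag.py \
  --dataset covid19_tracking \
  --pipeline city_level_cases \
  --composer-env my-composer-env \
  --composer-bucket my-composer-bucket \
  --composer-region us-central1 \
  --env dev
```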

# Testing

Run the unit tests from the project root as follows:
9 changes: 8 additions & 1 deletion scripts/deploy_dag.py
@@ -27,6 +27,7 @@ def main(
local: bool,
env_path: pathlib.Path,
dataset_id: str,
pipeline: str = None,
airflow_home: pathlib.Path = None,
composer_env: str = None,
composer_bucket: str = None,
@@ -41,7 +41,12 @@
)

print("========== AIRFLOW DAGS ==========")
for pipeline_path in list_subdirs(env_path / "datasets" / dataset_id):
if pipeline:
pipelines = [env_path / "datasets" / pipeline]
else:
pipelines = list_subdirs(env_path / "datasets" / dataset_id)

for pipeline_path in pipelines:
copy_custom_callables_to_airflow_dags_folder(
local,
env_path,
@@ -326,6 +332,7 @@ def list_subdirs(path: pathlib.Path) -> typing.List[pathlib.Path]:
local=args.local,
env_path=PROJECT_ROOT / f".{args.env}",
dataset_id=args.dataset,
pipeline=args.pipeline,
airflow_home=airflow_path,
composer_env=args.composer_env,
composer_bucket=args.composer_bucket,
45 changes: 45 additions & 0 deletions tests/scripts/test_deploy_dag.py
@@ -172,6 +172,51 @@ def test_script_always_requires_dataset_arg(
)


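# Aliasing the pipeline_path fixture under a second name makes pytest supply a separate pipeline directory for this test.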
pipeline_path_2 = pipeline_path


def test_script_with_pipeline_arg_deploys_only_that_pipeline(
dataset_path: pathlib.Path,
pipeline_path: pathlib.Path,
pipeline_path_2: pathlib.Path,
airflow_home: pathlib.Path,
env: str,
):
setup_dag_and_variables(
dataset_path,
pipeline_path,
airflow_home,
env,
f"{dataset_path.name}_variables.json",
)

setup_dag_and_variables(
dataset_path,
pipeline_path_2,
airflow_home,
env,
f"{dataset_path.name}_variables.json",
)

deploy_dag.main(
local=True,
env_path=ENV_PATH,
dataset_id=dataset_path.name,
pipeline=pipeline_path_2.name,
airflow_home=airflow_home,
composer_env=None,
composer_bucket=None,
composer_region=None,
)

assert not (
airflow_home / "dags" / f"{dataset_path.name}__{pipeline_path.name}_dag.py"
).exists()
assert (
airflow_home / "dags" / f"{dataset_path.name}__{pipeline_path_2.name}_dag.py"
).exists()


def test_script_without_local_flag_requires_cloud_composer_args(env: str):
with pytest.raises(subprocess.CalledProcessError):
# No --composer-env parameter