
fix: Allow DAG deploys without variables.json (#91)
* fix: factor out gsutil cmd to a function

* fix: factor out gcloud composer vars import to a function

* skip importing Airflow vars when variables file not found

* tests to ensure deployment works without vars file

* modified README on when to skip variable setup
adlersantos committed Jun 17, 2021
1 parent b22a78a commit 8eaaae9
Showing 3 changed files with 75 additions and 22 deletions.
README.md (6 changes: 3 additions & 3 deletions)
@@ -147,15 +147,15 @@ Docker images will be built and pushed to GCR by default whenever the command ab

## 5. Declare and set your pipeline variables

-Running the command in the previous step will parse your pipeline config and inform you about the templated variables that need to be set for your pipeline to run.
+Running the command in the previous step will parse your pipeline config and inform you about the templated variables that need to be set for your pipeline to run, if any.

-All variables used by a dataset must have their values set in
+If your pipeline doesn't use any Airflow variables, you can skip this step. Otherwise, create the following file

```
[.dev|.test]/datasets/{DATASET}/{DATASET}_variables.json
```

-Airflow variables use JSON dot notation to access the variable's value. For example, if you're using the following variables in your pipeline config:
+Pipelines use the JSON dot notation to access Airflow variables. Make sure to nest your variables in the JSON file under some namespace, typically your dataset's name. Airflow variables are globally accessed by any pipeline, which means nesting your variables helps avoid collisions. For example, if you're using the following variables in your pipeline config:

- `{{ var.json.shared.composer_bucket }}`
- `{{ var.json.parent.nested }}`
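To make the variable nesting concrete, here is a minimal sketch (an editorial illustration, not part of this commit) of what a `{DATASET}_variables.json` file matching the two template references above could look like, written out with the standard library. The dataset name, bucket name, and nested value are placeholders.

```
import json
import pathlib

# Hypothetical contents of .test/datasets/parent/parent_variables.json,
# matching {{ var.json.shared.composer_bucket }} and {{ var.json.parent.nested }}.
# All values are placeholders, not real resources.
variables = {
    "shared": {"composer_bucket": "my-composer-bucket"},
    "parent": {"nested": "some-value"},
}

vars_file = pathlib.Path(".test/datasets/parent/parent_variables.json")
vars_file.parent.mkdir(parents=True, exist_ok=True)
vars_file.write_text(json.dumps(variables, indent=2) + "\n")

# Airflow resolves `var.json.parent.nested` by loading the top-level "parent"
# variable and then reading the "nested" key inside its JSON value, so keeping
# everything under the dataset's namespace avoids clashes with other pipelines.
```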
scripts/deploy_dag.py (56 changes: 37 additions & 19 deletions)
@@ -17,6 +17,7 @@
import pathlib
import subprocess
import typing
+import warnings

CURRENT_PATH = pathlib.Path(__file__).resolve().parent
PROJECT_ROOT = CURRENT_PATH.parent
@@ -67,6 +68,10 @@ def main(
)


+def run_gsutil_cmd(args: typing.List[str], cwd: pathlib.Path):
+    subprocess.check_call(["gsutil"] + args, cwd=cwd)


def copy_variables_to_airflow_data_folder(
    local: bool,
    env_path: pathlib.Path,
@@ -80,7 +85,9 @@ def copy_variables_to_airflow_data_folder(
    cwd = env_path / "datasets" / dataset_id
    filename = f"{dataset_id}_variables.json"

-    check_existence_of_variables_file(cwd / filename)
+    if not (cwd / filename).exists():
+        warnings.warn(f"Airflow variables file {filename} does not exist.")
+        return

    if local:
        """
@@ -106,7 +113,31 @@ def copy_variables_to_airflow_data_folder(
f" Source:\n {cwd / filename}\n\n"
f" Destination:\n {gcs_uri}\n"
)
subprocess.check_call(["gsutil", "cp", filename, gcs_uri], cwd=cwd)
run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd)


+def run_cloud_composer_vars_import(
+    composer_env: str,
+    composer_region: str,
+    airflow_path: pathlib.Path,
+    cwd: pathlib.Path,
+):
+    subprocess.check_call(
+        [
+            "gcloud",
+            "composer",
+            "environments",
+            "run",
+            str(composer_env),
+            "--location",
+            str(composer_region),
+            "variables",
+            "--",
+            "--import",
+            str(airflow_path),
+        ],
+        cwd=cwd,
+    )


def import_variables_to_airflow_env(
@@ -136,21 +167,8 @@ def import_variables_to_airflow_env(
gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
airflow_path = f"/home/airflow/gcs/data/variables/{filename}"
print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n")
-        subprocess.check_call(
-            [
-                "gcloud",
-                "composer",
-                "environments",
-                "run",
-                str(composer_env),
-                "--location",
-                str(composer_region),
-                "variables",
-                "--",
-                "--import",
-                str(airflow_path),
-            ],
-            cwd=cwd,
+        run_cloud_composer_vars_import(
+            composer_env, composer_region, airflow_path, cwd=cwd
        )


@@ -189,7 +207,7 @@ def copy_generated_dag_to_airflow_dags_folder(
f" Source:\n {cwd / filename}\n\n"
f" Destination:\n {target}\n"
)
subprocess.check_call(["gsutil", "cp", filename, target], cwd=cwd)
run_gsutil_cmd(["cp", filename, target], cwd=cwd)


def copy_custom_callables_to_airflow_dags_folder(
@@ -231,7 +249,7 @@ def copy_custom_callables_to_airflow_dags_folder(
f" Source:\n {cwd / 'custom'}\n\n"
f" Destination:\n {target}\n"
)
subprocess.check_call(["gsutil", "cp", "-r", "custom", target], cwd=cwd)
run_gsutil_cmd(["cp", "-r", "custom", target], cwd=cwd)


def check_existence_of_variables_file(file_path: pathlib.Path):
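The value of factoring the `gsutil` and `gcloud composer` calls into `run_gsutil_cmd` and `run_cloud_composer_vars_import` is that they form a single seam tests can stub, which is exactly what the new test below does with pytest-mock. As a rough sketch of the same seam using only the standard library (assuming `scripts.deploy_dag` is importable from the repository root):

```
from unittest import mock

from scripts import deploy_dag

# Stub the two subprocess-backed helpers so no real gsutil/gcloud processes
# are spawned while the surrounding deploy logic still runs.
with mock.patch.object(deploy_dag, "run_gsutil_cmd") as gsutil_stub, mock.patch.object(
    deploy_dag, "run_cloud_composer_vars_import"
) as composer_stub:
    # Any copy or import step executed here hits the stubs instead of shelling
    # out; the recorded calls can then be inspected or asserted on.
    print(gsutil_stub.call_args_list, composer_stub.call_args_list)
```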
tests/scripts/test_deploy_dag.py (35 changes: 35 additions & 0 deletions)
@@ -175,6 +175,41 @@ def test_script_always_requires_dataset_arg(
    pipeline_path_2 = pipeline_path


+def test_script_can_deploy_without_variables_file(
+    dataset_path: pathlib.Path,
+    pipeline_path: pathlib.Path,
+    airflow_home: pathlib.Path,
+    env: str,
+    mocker,
+):
+    setup_dag_and_variables(
+        dataset_path,
+        pipeline_path,
+        airflow_home,
+        env,
+        f"{dataset_path.name}_variables.json",
+    )

+    # Delete the variables file
+    vars_file = f"{dataset_path.name}_variables.json"
+    (ENV_DATASETS_PATH / dataset_path.name / vars_file).unlink()
+    assert not (ENV_DATASETS_PATH / dataset_path.name / vars_file).exists()

+    mocker.patch("scripts.deploy_dag.run_gsutil_cmd")
+    mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import")

+    deploy_dag.main(
+        local=False,
+        env_path=ENV_PATH,
+        dataset_id=dataset_path.name,
+        pipeline=pipeline_path.name,
+        airflow_home=airflow_home,
+        composer_env="test-env",
+        composer_bucket="test-bucket",
+        composer_region="test-region",
+    )


def test_script_with_pipeline_arg_deploys_only_that_pipeline(
    dataset_path: pathlib.Path,
    pipeline_path: pathlib.Path,
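The new test only verifies that `main()` completes when the variables file is missing. A natural companion, sketched here rather than taken from the commit, would assert that the skip path actually emits the warning added in `copy_variables_to_airflow_data_folder`; it assumes the same fixtures and module-level helpers (`setup_dag_and_variables`, `ENV_DATASETS_PATH`, `ENV_PATH`, `deploy_dag`) used by the tests above.

```
import pathlib

import pytest


def test_missing_variables_file_emits_warning(
    dataset_path: pathlib.Path,
    pipeline_path: pathlib.Path,
    airflow_home: pathlib.Path,
    env: str,
    mocker,
):
    # Hypothetical companion test, not part of this commit.
    setup_dag_and_variables(
        dataset_path,
        pipeline_path,
        airflow_home,
        env,
        f"{dataset_path.name}_variables.json",
    )

    # Remove the variables file so the deploy hits the new skip path.
    vars_file = f"{dataset_path.name}_variables.json"
    (ENV_DATASETS_PATH / dataset_path.name / vars_file).unlink()

    # Keep gsutil/gcloud out of the picture, as in the test above.
    mocker.patch("scripts.deploy_dag.run_gsutil_cmd")
    mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import")

    # warnings.warn() with a plain message defaults to UserWarning.
    with pytest.warns(UserWarning, match="does not exist"):
        deploy_dag.main(
            local=False,
            env_path=ENV_PATH,
            dataset_id=dataset_path.name,
            pipeline=pipeline_path.name,
            airflow_home=airflow_home,
            composer_env="test-env",
            composer_bucket="test-bucket",
            composer_region="test-region",
        )
```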
