feat: Use a single file for shared Airflow variables (#122)
* generate shared_variables file

* test shared_variables file exists

* modified deploy_dag to also import shared vars

* modified README for shared vars usage

* fixed Python linter errors
adlersantos committed Jul 22, 2021
1 parent d587689 commit f5d227d
Showing 5 changed files with 108 additions and 59 deletions.
31 changes: 24 additions & 7 deletions README.md
@@ -145,17 +145,37 @@ datasets

Docker images will be built and pushed to GCR by default whenever the command above is run. To skip building and pushing images, use the optional `--skip-builds` flag.

## 5. Declare and set your pipeline variables
## 5. Declare and set your Airflow variables

Running the command in the previous step will parse your pipeline config and inform you about the templated variables that need to be set for your pipeline to run, if any.
Running the command in the previous step will parse your pipeline config and inform you about the Airflow variables that your pipeline expects to use and must be defined.

If your pipeline doesn't use any Airflow variables, you can skip this step. Otherwise, create the following file
If your pipeline doesn't use any Airflow variables, you can skip this step.

There are two types of variables that pipelines can use: **shared** and **dataset-specific**. Shared variables are those that can be reused by other pipelines in the same Airflow or Cloud Composer environment. These are variables that stay constant from pipeline to pipeline. Examples of shared variables include your Cloud Composer environment name and bucket, your GCP project ID, and paths to the Airflow DAG and data folders. To prevent duplication, specify your shared variables in one place:

```
[.dev|.test]/datasets/shared_variables.json
```

and inside the file, nest the variables under a common parent key. For example:

```
{
"shared": {
"composer_name": "test-pipelines-abcde1234",
"composer_bucket": "us-east4-test-pipelines-abcde1234-bucket",
"airflow_data_folder": "/home/airflow/gcs/data"
}
}
```

For dataset-specific variables, create the following file

```
[.dev|.test]/datasets/{DATASET}/{DATASET}_variables.json
```

Pipelines use the JSON dot notation to access Airflow variables. Make sure to nest your variables in the JSON file under some namespace, typically your dataset's name. Airflow variables are globally accessed by any pipeline, which means nesting your variables helps avoid collisions. For example, if you're using the following variables in your pipeline config:
In general, pipelines use the JSON dot notation to access Airflow variables. Make sure to define and nest your variables under some parent key when writing to the JSON files above. We recommend using your dataset's name as the parent key, to mimic the same structure as the folder hierarchy. Airflow variables are globally accessed by any pipeline, which means nesting your variables helps avoid collisions. For example, if you're using the following variables in your pipeline config:

- `{{ var.json.shared.composer_bucket }}`
- `{{ var.json.parent.nested }}`
@@ -165,9 +185,6 @@ then your variables JSON file should look like this

```json
{
"shared": {
"composer_bucket": "us-east4-test-pipelines-abcde1234-bucket"
},
"parent": {
"nested": "some value",
"another_nested": "another value"
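A note on usage (not something this commit adds): once these JSON files are imported, the nested values can be read either with the `var.json` template syntax shown above or programmatically through Airflow's `Variable` API. A minimal sketch, assuming an Airflow installation where the example variables above have already been imported:

```python
# Illustrative sketch only; assumes the "shared" and "parent" variables
# from the README examples above have already been imported into Airflow.
from airflow.models import Variable

# Each top-level key in the JSON file becomes one Airflow variable,
# so deserialize it to reach the nested values.
shared = Variable.get("shared", deserialize_json=True)
composer_bucket = shared["composer_bucket"]  # e.g. "us-east4-test-pipelines-abcde1234-bucket"

parent = Variable.get("parent", deserialize_json=True)
nested_value = parent["nested"]
```

Both access patterns resolve the same nested keys, which is why namespacing everything under a parent key matters.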
97 changes: 50 additions & 47 deletions scripts/deploy_dag.py
@@ -80,40 +80,42 @@ def copy_variables_to_airflow_data_folder(
composer_bucket: str = None,
):
"""
cd .{ENV}/{DATASET_ID}
cd .{ENV}/datasets or .{ENV}/datasets/{dataset_id}
"""
cwd = env_path / "datasets" / dataset_id
filename = f"{dataset_id}_variables.json"

if not (cwd / filename).exists():
warnings.warn(f"Airflow variables file {filename} does not exist.")
return

if local:
"""
cp {DATASET_ID}_variables.json {AIRFLOW_HOME}/data/variables/{DATASET_ID}_variables.json
"""
target_path = airflow_home / "data" / "variables" / filename
target_path.mkdir(parents=True, exist_ok=True)
print(
"\nCopying variables JSON file into Airflow data folder\n\n"
f" Source:\n {cwd / filename}\n\n"
f" Destination:\n {target_path}\n"
)
for cwd, filename in (
(env_path / "datasets", "shared_variables.json"),
(env_path / "datasets" / dataset_id, f"{dataset_id}_variables.json"),
):

if not (cwd / filename).exists():
warnings.warn(f"Airflow variables file {filename} does not exist.")
continue

if local:
"""
cp {DATASET_ID}_variables.json {AIRFLOW_HOME}/data/variables/{filename}
"""
target_path = airflow_home / "data" / "variables" / filename
target_path.mkdir(parents=True, exist_ok=True)
print(
"\nCopying variables JSON file into Airflow data folder\n\n"
f" Source:\n {cwd / filename}\n\n"
f" Destination:\n {target_path}\n"
)

subprocess.check_call(["cp", "-rf", filename, str(target_path)], cwd=cwd)
else:
"""
[remote]
gsutil cp {DATASET_ID}_variables.json gs://{COMPOSER_BUCKET}/data/variables/{DATASET_ID}_variables.json...
"""
gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
print(
"\nCopying variables JSON file into Cloud Composer data folder\n\n"
f" Source:\n {cwd / filename}\n\n"
f" Destination:\n {gcs_uri}\n"
)
run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd)
subprocess.check_call(["cp", "-rf", filename, str(target_path)], cwd=cwd)
else:
"""
[remote]
gsutil cp {DATASET_ID}_variables.json gs://{COMPOSER_BUCKET}/data/variables/{filename}...
"""
gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
print(
"\nCopying variables JSON file into Cloud Composer data folder\n\n"
f" Source:\n {cwd / filename}\n\n"
f" Destination:\n {gcs_uri}\n"
)
run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd)


def run_cloud_composer_vars_import(
@@ -155,21 +157,22 @@ def import_variables_to_airflow_env(
[remote]
gcloud composer environments run COMPOSER_ENV --location COMPOSER_REGION variables -- --import /home/airflow/gcs/data/variables/{DATASET_ID}_variables.json
"""
cwd = env_path / "datasets" / dataset_id
filename = f"{dataset_id}_variables.json"

if local:
print(f"\nImporting Airflow variables from {cwd / filename}...\n")
subprocess.check_call(
["airflow", "variables", "import", str(cwd / filename)], cwd=cwd
)
else:
gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
airflow_path = f"/home/airflow/gcs/data/variables/{filename}"
print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n")
run_cloud_composer_vars_import(
composer_env, composer_region, airflow_path, cwd=cwd
)
for cwd, filename in (
(env_path / "datasets", "shared_variables.json"),
(env_path / "datasets" / dataset_id, f"{dataset_id}_variables.json"),
):
if local:
print(f"\nImporting Airflow variables from {cwd / filename}...\n")
subprocess.check_call(
["airflow", "variables", "import", str(cwd / filename)], cwd=cwd
)
else:
gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
airflow_path = f"/home/airflow/gcs/data/variables/{filename}"
print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n")
run_cloud_composer_vars_import(
composer_env, composer_region, airflow_path, cwd=cwd
)


def copy_generated_dag_to_airflow_dags_folder(
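For orientation, the remote code path above amounts to the gsutil and gcloud commands quoted in the docstrings, now run for the shared file as well as the dataset-specific one. A rough sketch for `shared_variables.json` only — the bucket, environment, region, and working directory below are placeholder assumptions, not values from this commit:

```python
# Rough sketch of the remote path for shared_variables.json; all names are placeholders.
import subprocess

composer_bucket = "us-east4-test-pipelines-abcde1234-bucket"
composer_env = "test-pipelines-abcde1234"
composer_region = "us-east4"
filename = "shared_variables.json"

gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
airflow_path = f"/home/airflow/gcs/data/variables/{filename}"

# 1) Copy the file into the Composer bucket's data/variables folder.
subprocess.check_call(["gsutil", "cp", filename, gcs_uri], cwd=".dev/datasets")

# 2) Import it as Airflow variables inside the Composer environment.
subprocess.check_call([
    "gcloud", "composer", "environments", "run", composer_env,
    "--location", composer_region,
    "variables", "--", "--import", airflow_path,
])
```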
8 changes: 8 additions & 0 deletions scripts/generate_dag.py
@@ -70,6 +70,8 @@ def main(
else:
generate_pipeline_dag(dataset_id, pipeline_id, env)

generate_shared_variables_file(env)


def generate_pipeline_dag(dataset_id: str, pipeline_id: str, env: str):
pipeline_dir = DATASETS_PATH / dataset_id / pipeline_id
@@ -144,6 +146,12 @@ def generate_task_contents(task: dict) -> str:
)


def generate_shared_variables_file(env: str) -> None:
pathlib.Path(
PROJECT_ROOT / f".{env}" / "datasets" / "shared_variables.json"
).touch()


def dag_init(config: dict) -> dict:
return config["dag"].get("initialize") or config["dag"].get("init")

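Note that `generate_shared_variables_file` only `touch()`es an empty file; the shared values themselves still have to be filled in by hand, as the README changes above describe. A hedged sketch of that manual step, reusing the README's example values (paths and values are placeholders):

```python
# Illustrative only: generate_shared_variables_file() just creates an empty
# file; populating it is a manual step. Values below are placeholders.
import json
import pathlib

shared_vars_path = pathlib.Path(".dev/datasets/shared_variables.json")
shared_vars_path.parent.mkdir(parents=True, exist_ok=True)
shared_vars_path.touch()  # equivalent to what the new function does for env="dev"

shared_vars_path.write_text(
    json.dumps(
        {
            "shared": {
                "composer_name": "test-pipelines-abcde1234",
                "composer_bucket": "us-east4-test-pipelines-abcde1234-bucket",
                "airflow_data_folder": "/home/airflow/gcs/data",
            }
        },
        indent=4,
    )
    + "\n"
)
```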
18 changes: 13 additions & 5 deletions tests/scripts/test_deploy_dag.py
@@ -175,7 +175,7 @@ def test_script_always_requires_dataset_arg(
pipeline_path_2 = pipeline_path


def test_script_can_deploy_without_variables_file(
def test_script_can_deploy_without_variables_files(
dataset_path: pathlib.Path,
pipeline_path: pathlib.Path,
airflow_home: pathlib.Path,
@@ -190,10 +190,17 @@ def test_script_can_deploy_without_variables_file(
f"{dataset_path.name}_variables.json",
)

# Delete the variables file
vars_file = f"{dataset_path.name}_variables.json"
(ENV_DATASETS_PATH / dataset_path.name / vars_file).unlink()
assert not (ENV_DATASETS_PATH / dataset_path.name / vars_file).exists()
# Delete the shared variables file
(ENV_DATASETS_PATH / "shared_variables.json").unlink()
assert not (ENV_DATASETS_PATH / "shared_variables.json").exists()

# Delete the dataset-specific variables file
(
ENV_DATASETS_PATH / dataset_path.name / f"{dataset_path.name}_variables.json"
).unlink()
assert not (
ENV_DATASETS_PATH / dataset_path.name / f"{dataset_path.name}_variables.json"
).exists()

mocker.patch("scripts.deploy_dag.run_gsutil_cmd")
mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import")
@@ -382,6 +389,7 @@ def test_script_with_local_flag_copies_files_to_local_airflow_env(
composer_region=None,
)

assert (airflow_home / "data" / "variables" / "shared_variables.json").exists()
assert (airflow_home / "data" / "variables" / variables_filename).exists()
assert (airflow_home / "dags" / dag_filename).exists()

13 changes: 13 additions & 0 deletions tests/scripts/test_generate_dag.py
@@ -127,6 +127,19 @@ def test_main_copies_custom_dir_if_it_exists(
assert (path_prefix / "custom").is_dir()


def test_main_creates_shared_variables_file(
dataset_path: pathlib.Path, pipeline_path: pathlib.Path, env: str
):
copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)
custom_path = dataset_path / pipeline_path.name / "custom"
custom_path.mkdir(parents=True, exist_ok=True)

generate_dag.main(dataset_path.name, pipeline_path.name, env)

assert (ENV_DATASETS_PATH / "shared_variables.json").exists()
assert not (ENV_DATASETS_PATH / "shared_variables.json").is_dir()


def test_main_only_depends_on_pipeline_yaml(
dataset_path: pathlib.Path, pipeline_path: pathlib.Path, env: str
):
