From f5d227de7d30439e80346612856723292c6f46e7 Mon Sep 17 00:00:00 2001 From: Adler Santos Date: Thu, 22 Jul 2021 15:35:01 -0400 Subject: [PATCH] feat: Use a single file for shared Airflow variables (#122) * generate shared_variables file * test shared_variables file exists * modified deploy_dag to also import shared vars * modified README for shared vars usage * fixed Python linter errors --- README.md | 31 +++++++--- scripts/deploy_dag.py | 97 +++++++++++++++--------------- scripts/generate_dag.py | 8 +++ tests/scripts/test_deploy_dag.py | 18 ++++-- tests/scripts/test_generate_dag.py | 13 ++++ 5 files changed, 108 insertions(+), 59 deletions(-) diff --git a/README.md b/README.md index adc80b669..1ead1b08b 100644 --- a/README.md +++ b/README.md @@ -145,17 +145,37 @@ datasets Docker images will be built and pushed to GCR by default whenever the command above is run. To skip building and pushing images, use the optional `--skip-builds` flag. -## 5. Declare and set your pipeline variables +## 5. Declare and set your Airflow variables -Running the command in the previous step will parse your pipeline config and inform you about the templated variables that need to be set for your pipeline to run, if any. +Running the command in the previous step will parse your pipeline config and inform you about the Airflow variables that your pipeline expects to use and must be defined. -If your pipeline doesn't use any Airflow variables, you can skip this step. Otherwise, create the following file +If your pipeline doesn't use any Airflow variables, you can skip this step. + +There are two types of variables that pipelines can use: **shared** and **dataset-specific**. Shared variables are those that can be reused by other pipelines in the same Airflow or Cloud Composer environment. These are variables that stay constant from pipeline to pipeline. Examples of shared variables include your Cloud Composer environment name and bucket, your GCP project ID, and paths to the Airflow DAG and data folders. To prevent duplication, specify your shared variables in one place: + +``` + [.dev|.test]/datasets/shared_variables.json +``` + +and inside the file, nest the variables under a common parent key. For example: + +``` +{ + "shared": { + "composer_name": "test-pipelines-abcde1234", + "composer_bucket": "us-east4-test-pipelines-abcde1234-bucket", + "airflow_data_folder": "/home/airflow/gcs/data" + } +} +``` + +For dataset-specific variables, create the following file ``` [.dev|.test]/datasets/{DATASET}/{DATASET}_variables.json ``` -Pipelines use the JSON dot notation to access Airflow variables. Make sure to nest your variables in the JSON file under some namespace, typically your dataset's name. Airflow variables are globally accessed by any pipeline, which means nesting your variables helps avoid collisions. For example, if you're using the following variables in your pipeline config: +In general, pipelines use the JSON dot notation to access Airflow variables. Make sure to define and nest your variables under some parent key when writing to the JSON files above. We recommend using your dataset's name as the parent key, to mimic the same structure as the folder hierarchy. Airflow variables are globally accessed by any pipeline, which means nesting your variables helps avoid collisions. 
For example, if you're using the following variables in your pipeline config: - `{{ var.json.shared.composer_bucket }}` - `{{ var.json.parent.nested }}` @@ -165,9 +185,6 @@ then your variables JSON file should look like this ```json { - "shared": { - "composer_bucket": "us-east4-test-pipelines-abcde1234-bucket" - }, "parent": { "nested": "some value", "another_nested": "another value" diff --git a/scripts/deploy_dag.py b/scripts/deploy_dag.py index 86fe47c46..94e5be055 100644 --- a/scripts/deploy_dag.py +++ b/scripts/deploy_dag.py @@ -80,40 +80,42 @@ def copy_variables_to_airflow_data_folder( composer_bucket: str = None, ): """ - cd .{ENV}/{DATASET_ID} + cd .{ENV}/datasets or .{ENV}/datasets/{dataset_id} """ - cwd = env_path / "datasets" / dataset_id - filename = f"{dataset_id}_variables.json" - - if not (cwd / filename).exists(): - warnings.warn(f"Airflow variables file {filename} does not exist.") - return - - if local: - """ - cp {DATASET_ID}_variables.json {AIRFLOW_HOME}/data/variables/{DATASET_ID}_variables.json - """ - target_path = airflow_home / "data" / "variables" / filename - target_path.mkdir(parents=True, exist_ok=True) - print( - "\nCopying variables JSON file into Airflow data folder\n\n" - f" Source:\n {cwd / filename}\n\n" - f" Destination:\n {target_path}\n" - ) + for cwd, filename in ( + (env_path / "datasets", "shared_variables.json"), + (env_path / "datasets" / dataset_id, f"{dataset_id}_variables.json"), + ): + + if not (cwd / filename).exists(): + warnings.warn(f"Airflow variables file {filename} does not exist.") + continue + + if local: + """ + cp {DATASET_ID}_variables.json {AIRFLOW_HOME}/data/variables/{filename} + """ + target_path = airflow_home / "data" / "variables" / filename + target_path.mkdir(parents=True, exist_ok=True) + print( + "\nCopying variables JSON file into Airflow data folder\n\n" + f" Source:\n {cwd / filename}\n\n" + f" Destination:\n {target_path}\n" + ) - subprocess.check_call(["cp", "-rf", filename, str(target_path)], cwd=cwd) - else: - """ - [remote] - gsutil cp {DATASET_ID}_variables.json gs://{COMPOSER_BUCKET}/data/variables/{DATASET_ID}_variables.json... - """ - gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}" - print( - "\nCopying variables JSON file into Cloud Composer data folder\n\n" - f" Source:\n {cwd / filename}\n\n" - f" Destination:\n {gcs_uri}\n" - ) - run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd) + subprocess.check_call(["cp", "-rf", filename, str(target_path)], cwd=cwd) + else: + """ + [remote] + gsutil cp {DATASET_ID}_variables.json gs://{COMPOSER_BUCKET}/data/variables/{filename}... 
+ """ + gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}" + print( + "\nCopying variables JSON file into Cloud Composer data folder\n\n" + f" Source:\n {cwd / filename}\n\n" + f" Destination:\n {gcs_uri}\n" + ) + run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd) def run_cloud_composer_vars_import( @@ -155,21 +157,22 @@ def import_variables_to_airflow_env( [remote] gcloud composer environments run COMPOSER_ENV --location COMPOSER_REGION variables -- --import /home/airflow/gcs/data/variables/{DATASET_ID}_variables.json """ - cwd = env_path / "datasets" / dataset_id - filename = f"{dataset_id}_variables.json" - - if local: - print(f"\nImporting Airflow variables from {cwd / filename}...\n") - subprocess.check_call( - ["airflow", "variables", "import", str(cwd / filename)], cwd=cwd - ) - else: - gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}" - airflow_path = f"/home/airflow/gcs/data/variables/{filename}" - print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n") - run_cloud_composer_vars_import( - composer_env, composer_region, airflow_path, cwd=cwd - ) + for cwd, filename in ( + (env_path / "datasets", "shared_variables.json"), + (env_path / "datasets" / dataset_id, f"{dataset_id}_variables.json"), + ): + if local: + print(f"\nImporting Airflow variables from {cwd / filename}...\n") + subprocess.check_call( + ["airflow", "variables", "import", str(cwd / filename)], cwd=cwd + ) + else: + gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}" + airflow_path = f"/home/airflow/gcs/data/variables/{filename}" + print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n") + run_cloud_composer_vars_import( + composer_env, composer_region, airflow_path, cwd=cwd + ) def copy_generated_dag_to_airflow_dags_folder( diff --git a/scripts/generate_dag.py b/scripts/generate_dag.py index 731a0378a..0aeb40039 100644 --- a/scripts/generate_dag.py +++ b/scripts/generate_dag.py @@ -70,6 +70,8 @@ def main( else: generate_pipeline_dag(dataset_id, pipeline_id, env) + generate_shared_variables_file(env) + def generate_pipeline_dag(dataset_id: str, pipeline_id: str, env: str): pipeline_dir = DATASETS_PATH / dataset_id / pipeline_id @@ -144,6 +146,12 @@ def generate_task_contents(task: dict) -> str: ) +def generate_shared_variables_file(env: str) -> None: + pathlib.Path( + PROJECT_ROOT / f".{env}" / "datasets" / "shared_variables.json" + ).touch() + + def dag_init(config: dict) -> dict: return config["dag"].get("initialize") or config["dag"].get("init") diff --git a/tests/scripts/test_deploy_dag.py b/tests/scripts/test_deploy_dag.py index c5887de58..25d189259 100644 --- a/tests/scripts/test_deploy_dag.py +++ b/tests/scripts/test_deploy_dag.py @@ -175,7 +175,7 @@ def test_script_always_requires_dataset_arg( pipeline_path_2 = pipeline_path -def test_script_can_deploy_without_variables_file( +def test_script_can_deploy_without_variables_files( dataset_path: pathlib.Path, pipeline_path: pathlib.Path, airflow_home: pathlib.Path, @@ -190,10 +190,17 @@ def test_script_can_deploy_without_variables_file( f"{dataset_path.name}_variables.json", ) - # Delete the variables file - vars_file = f"{dataset_path.name}_variables.json" - (ENV_DATASETS_PATH / dataset_path.name / vars_file).unlink() - assert not (ENV_DATASETS_PATH / dataset_path.name / vars_file).exists() + # Delete the shared variables file + (ENV_DATASETS_PATH / "shared_variables.json").unlink() + assert not (ENV_DATASETS_PATH / "shared_variables.json").exists() + + # Delete the dataset-specific 
variables file + ( + ENV_DATASETS_PATH / dataset_path.name / f"{dataset_path.name}_variables.json" + ).unlink() + assert not ( + ENV_DATASETS_PATH / dataset_path.name / f"{dataset_path.name}_variables.json" + ).exists() mocker.patch("scripts.deploy_dag.run_gsutil_cmd") mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import") @@ -382,6 +389,7 @@ def test_script_with_local_flag_copies_files_to_local_airflow_env( composer_region=None, ) + assert (airflow_home / "data" / "variables" / "shared_variables.json").exists() assert (airflow_home / "data" / "variables" / variables_filename).exists() assert (airflow_home / "dags" / dag_filename).exists() diff --git a/tests/scripts/test_generate_dag.py b/tests/scripts/test_generate_dag.py index c1eea7d98..f0f0b3b0f 100644 --- a/tests/scripts/test_generate_dag.py +++ b/tests/scripts/test_generate_dag.py @@ -127,6 +127,19 @@ def test_main_copies_custom_dir_if_it_exists( assert (path_prefix / "custom").is_dir() +def test_main_creates_shared_variables_file( + dataset_path: pathlib.Path, pipeline_path: pathlib.Path, env: str +): + copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path) + custom_path = dataset_path / pipeline_path.name / "custom" + custom_path.mkdir(parents=True, exist_ok=True) + + generate_dag.main(dataset_path.name, pipeline_path.name, env) + + assert (ENV_DATASETS_PATH / "shared_variables.json").exists() + assert not (ENV_DATASETS_PATH / "shared_variables.json").is_dir() + + def test_main_only_depends_on_pipeline_yaml( dataset_path: pathlib.Path, pipeline_path: pathlib.Path, env: str ):
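
To make the collision argument in the README change concrete, here is a minimal sketch, not part of the patch, of what the two variables files look like once both are imported with `airflow variables import`. The values are taken from the README examples above; the merge below only illustrates that nesting shared variables under `shared` and dataset-specific variables under the dataset's name keeps their top-level keys from colliding.

```python
import json

# Contents of [.dev|.test]/datasets/shared_variables.json (from the README example).
shared = {
    "shared": {
        "composer_name": "test-pipelines-abcde1234",
        "composer_bucket": "us-east4-test-pipelines-abcde1234-bucket",
        "airflow_data_folder": "/home/airflow/gcs/data",
    }
}

# Contents of [.dev|.test]/datasets/{DATASET}/{DATASET}_variables.json,
# nested under the dataset's name ("parent" in the README example).
dataset_specific = {
    "parent": {
        "nested": "some value",
        "another_nested": "another value",
    }
}

# `airflow variables import` creates one Airflow variable per top-level key,
# so importing both files in sequence never overwrites the other's entries.
merged = {**shared, **dataset_specific}

# Pipelines then read the values with JSON dot notation, e.g.
# {{ var.json.shared.composer_bucket }} and {{ var.json.parent.nested }}.
assert merged["shared"]["composer_bucket"] == "us-east4-test-pipelines-abcde1234-bucket"
assert merged["parent"]["nested"] == "some value"
print(json.dumps(merged, indent=2))
```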
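
One hedged observation on the `local` branch of `copy_variables_to_airflow_data_folder`: `target_path` already ends in the JSON filename, so `target_path.mkdir(parents=True, exist_ok=True)` creates a directory named after the file (e.g. `.../data/variables/shared_variables.json/`) and the subsequent `cp -rf` copies the file into that directory. This behavior is carried over from the code being replaced rather than introduced here. Assuming the intent is for the file itself to land at `.../data/variables/<filename>`, a sketch of the adjustment would be:

```python
import pathlib
import subprocess


def copy_variables_file(
    cwd: pathlib.Path, filename: str, airflow_home: pathlib.Path
) -> None:
    """Copy a single variables JSON file into the local Airflow data folder."""
    target_path = airflow_home / "data" / "variables" / filename
    # Create the variables/ directory itself, not a directory named after the file.
    target_path.parent.mkdir(parents=True, exist_ok=True)
    subprocess.check_call(["cp", "-rf", filename, str(target_path)], cwd=cwd)
```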