diff --git a/README.md b/README.md
index 9d685fd8f..adc80b669 100644
--- a/README.md
+++ b/README.md
@@ -147,15 +147,15 @@ Docker images will be built and pushed to GCR by default whenever the command ab

 ## 5. Declare and set your pipeline variables

-Running the command in the previous step will parse your pipeline config and inform you about the templated variables that need to be set for your pipeline to run.
+Running the command in the previous step will parse your pipeline config and inform you about the templated variables that need to be set for your pipeline to run, if any.

-All variables used by a dataset must have their values set in
+If your pipeline doesn't use any Airflow variables, you can skip this step. Otherwise, create the following file

 ```
 [.dev|.test]/datasets/{DATASET}/{DATASET}_variables.json
 ```

-Airflow variables use JSON dot notation to access the variable's value. For example, if you're using the following variables in your pipeline config:
+Pipelines use the JSON dot notation to access Airflow variables. Make sure to nest your variables in the JSON file under some namespace, typically your dataset's name. Airflow variables are globally accessed by any pipeline, which means nesting your variables helps avoid collisions. For example, if you're using the following variables in your pipeline config:

 - `{{ var.json.shared.composer_bucket }}`
 - `{{ var.json.parent.nested }}`
diff --git a/scripts/deploy_dag.py b/scripts/deploy_dag.py
index a8bef11e3..86fe47c46 100644
--- a/scripts/deploy_dag.py
+++ b/scripts/deploy_dag.py
@@ -17,6 +17,7 @@
 import pathlib
 import subprocess
 import typing
+import warnings

 CURRENT_PATH = pathlib.Path(__file__).resolve().parent
 PROJECT_ROOT = CURRENT_PATH.parent
@@ -67,6 +68,10 @@ def main(
     )


+def run_gsutil_cmd(args: typing.List[str], cwd: pathlib.Path):
+    subprocess.check_call(["gsutil"] + args, cwd=cwd)
+
+
 def copy_variables_to_airflow_data_folder(
     local: bool,
     env_path: pathlib.Path,
@@ -80,7 +85,9 @@ def copy_variables_to_airflow_data_folder(
     cwd = env_path / "datasets" / dataset_id
     filename = f"{dataset_id}_variables.json"

-    check_existence_of_variables_file(cwd / filename)
+    if not (cwd / filename).exists():
+        warnings.warn(f"Airflow variables file {filename} does not exist.")
+        return

     if local:
         """
@@ -106,7 +113,31 @@ def copy_variables_to_airflow_data_folder(
             f" Source:\n {cwd / filename}\n\n"
             f" Destination:\n {gcs_uri}\n"
         )
-        subprocess.check_call(["gsutil", "cp", filename, gcs_uri], cwd=cwd)
+        run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd)
+
+
+def run_cloud_composer_vars_import(
+    composer_env: str,
+    composer_region: str,
+    airflow_path: pathlib.Path,
+    cwd: pathlib.Path,
+):
+    subprocess.check_call(
+        [
+            "gcloud",
+            "composer",
+            "environments",
+            "run",
+            str(composer_env),
+            "--location",
+            str(composer_region),
+            "variables",
+            "--",
+            "--import",
+            str(airflow_path),
+        ],
+        cwd=cwd,
+    )


 def import_variables_to_airflow_env(
@@ -136,21 +167,8 @@ def import_variables_to_airflow_env(
         gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
         airflow_path = f"/home/airflow/gcs/data/variables/{filename}"
         print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n")
-        subprocess.check_call(
-            [
-                "gcloud",
-                "composer",
-                "environments",
-                "run",
-                str(composer_env),
-                "--location",
-                str(composer_region),
-                "variables",
-                "--",
-                "--import",
-                str(airflow_path),
-            ],
-            cwd=cwd,
+        run_cloud_composer_vars_import(
+            composer_env, composer_region, airflow_path, cwd=cwd
         )


@@ -189,7 +207,7 @@ def copy_generated_dag_to_airflow_dags_folder(
             f" Source:\n {cwd / filename}\n\n"
             f" Destination:\n {target}\n"
         )
-        subprocess.check_call(["gsutil", "cp", filename, target], cwd=cwd)
+        run_gsutil_cmd(["cp", filename, target], cwd=cwd)


 def copy_custom_callables_to_airflow_dags_folder(
@@ -231,7 +249,7 @@ def copy_custom_callables_to_airflow_dags_folder(
             f" Source:\n {cwd / 'custom'}\n\n"
             f" Destination:\n {target}\n"
         )
-        subprocess.check_call(["gsutil", "cp", "-r", "custom", target], cwd=cwd)
+        run_gsutil_cmd(["cp", "-r", "custom", target], cwd=cwd)


 def check_existence_of_variables_file(file_path: pathlib.Path):
diff --git a/tests/scripts/test_deploy_dag.py b/tests/scripts/test_deploy_dag.py
index bd702b101..c5887de58 100644
--- a/tests/scripts/test_deploy_dag.py
+++ b/tests/scripts/test_deploy_dag.py
@@ -175,6 +175,41 @@ def test_script_always_requires_dataset_arg(
     pipeline_path_2 = pipeline_path


+def test_script_can_deploy_without_variables_file(
+    dataset_path: pathlib.Path,
+    pipeline_path: pathlib.Path,
+    airflow_home: pathlib.Path,
+    env: str,
+    mocker,
+):
+    setup_dag_and_variables(
+        dataset_path,
+        pipeline_path,
+        airflow_home,
+        env,
+        f"{dataset_path.name}_variables.json",
+    )
+
+    # Delete the variables file
+    vars_file = f"{dataset_path.name}_variables.json"
+    (ENV_DATASETS_PATH / dataset_path.name / vars_file).unlink()
+    assert not (ENV_DATASETS_PATH / dataset_path.name / vars_file).exists()
+
+    mocker.patch("scripts.deploy_dag.run_gsutil_cmd")
+    mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import")
+
+    deploy_dag.main(
+        local=False,
+        env_path=ENV_PATH,
+        dataset_id=dataset_path.name,
+        pipeline=pipeline_path.name,
+        airflow_home=airflow_home,
+        composer_env="test-env",
+        composer_bucket="test-bucket",
+        composer_region="test-region",
+    )
+
+
 def test_script_with_pipeline_arg_deploys_only_that_pipeline(
     dataset_path: pathlib.Path,
     pipeline_path: pathlib.Path,
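For readers following the README hunk above, here is a minimal sketch of the nesting convention it describes, built from the two example variables in the hunk (`{{ var.json.shared.composer_bucket }}` and `{{ var.json.parent.nested }}`). The dataset name, path, and values below are placeholders for illustration only, not part of this change:

```
import json
import pathlib

# Hypothetical contents of [.dev|.test]/datasets/{DATASET}/{DATASET}_variables.json.
# Each top-level key acts as a namespace: `{{ var.json.shared.composer_bucket }}`
# resolves to variables["shared"]["composer_bucket"] once imported into Airflow.
variables = {
    "shared": {
        "composer_bucket": "example-composer-bucket",  # placeholder value
    },
    "parent": {
        "nested": "example value",  # placeholder value
    },
}

# Illustrative path only; the real path depends on your environment and dataset name.
out_path = pathlib.Path(".test/datasets/example_dataset/example_dataset_variables.json")
out_path.parent.mkdir(parents=True, exist_ok=True)
out_path.write_text(json.dumps(variables, indent=2))
```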
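A note on the warn-and-skip behavior introduced in `copy_variables_to_airflow_data_folder`: because the missing-file case now calls `warnings.warn` and returns instead of raising, a deploy proceeds when a dataset has no variables file. The standalone sketch below (not repo code; `deploy_step` is a made-up stand-in) shows how Python's warning filters let a caller keep the default soft behavior or promote the warning to a hard failure:

```
import warnings

def deploy_step(vars_file_exists: bool) -> None:
    # Stand-in for the warn-and-return pattern added in this diff.
    if not vars_file_exists:
        warnings.warn("Airflow variables file does not exist.")
        return
    print("copying variables file...")

# Default filters: the UserWarning is reported and the deploy continues.
deploy_step(vars_file_exists=False)

# Promoting warnings to errors makes the same condition abort the deploy.
with warnings.catch_warnings():
    warnings.simplefilter("error")
    try:
        deploy_step(vars_file_exists=False)
    except UserWarning as exc:
        print(f"deploy aborted: {exc}")
```

In a pytest suite, the same condition could also be asserted with `pytest.warns(UserWarning)`; the test added in this diff only verifies that deployment completes with the gsutil and Cloud Composer helpers mocked out.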