feat: Use a single file for shared Airflow variables #122

Merged · 5 commits · Jul 22, 2021

31 changes: 24 additions & 7 deletions README.md
@@ -145,17 +145,37 @@ datasets

Docker images will be built and pushed to GCR by default whenever the command above is run. To skip building and pushing images, use the optional `--skip-builds` flag.

## 5. Declare and set your pipeline variables
## 5. Declare and set your Airflow variables

Running the command in the previous step will parse your pipeline config and inform you about the templated variables that need to be set for your pipeline to run, if any.
Running the command in the previous step will parse your pipeline config and inform you about the Airflow variables that your pipeline expects to use and that must be defined.

If your pipeline doesn't use any Airflow variables, you can skip this step. Otherwise, create the following file
If your pipeline doesn't use any Airflow variables, you can skip this step.

Pipelines can use two types of variables: **shared** and **dataset-specific**. Shared variables stay constant from pipeline to pipeline and can be reused by any pipeline in the same Airflow or Cloud Composer environment; examples include your Cloud Composer environment name and bucket, your GCP project ID, and the paths to the Airflow DAG and data folders. To prevent duplication, specify your shared variables in a single file:

```
[.dev|.test]/datasets/shared_variables.json
```

and inside the file, nest the variables under a common parent key. For example:

```
{
  "shared": {
    "composer_name": "test-pipelines-abcde1234",
    "composer_bucket": "us-east4-test-pipelines-abcde1234-bucket",
    "airflow_data_folder": "/home/airflow/gcs/data"
  }
}
```
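
Once the deploy script imports this file into your Airflow environment, pipelines can read these values with the `var.json.shared.*` dot notation (shown further below) or through Airflow's `Variable` API. The snippet below is only an illustrative sketch, assuming Airflow 2.x; the DAG id and task names are hypothetical and not part of this repository:

```python
# Illustrative only (not part of this repository): a hand-written Airflow 2.x DAG
# showing two ways to read the shared variables defined above.
from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="shared_vars_example",  # hypothetical DAG id
    start_date=datetime(2021, 7, 1),
    schedule_interval=None,
) as dag:
    # Jinja dot notation: "shared" is the Airflow variable name, "composer_bucket"
    # is the nested key. The template is rendered at task runtime.
    print_bucket = BashOperator(
        task_id="print_composer_bucket",
        bash_command="echo {{ var.json.shared.composer_bucket }}",
    )

    # Programmatic access: fetch and deserialize the whole "shared" object.
    # Note this runs at DAG-parse time, so the variable must already be imported.
    shared = Variable.get("shared", deserialize_json=True)
    airflow_data_folder = shared["airflow_data_folder"]
```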

For dataset-specific variables, create the following file

```
[.dev|.test]/datasets/{DATASET}/{DATASET}_variables.json
```

Pipelines use the JSON dot notation to access Airflow variables. Make sure to nest your variables in the JSON file under some namespace, typically your dataset's name. Airflow variables are globally accessed by any pipeline, which means nesting your variables helps avoid collisions. For example, if you're using the following variables in your pipeline config:
In general, pipelines use JSON dot notation to access Airflow variables. Make sure to define and nest your variables under a parent key when writing to the JSON files above; we recommend using your dataset's name as the parent key, mirroring the folder hierarchy. Airflow variables are global and can be accessed by any pipeline, so nesting your variables helps avoid name collisions. For example, if you're using the following variables in your pipeline config:

- `{{ var.json.shared.composer_bucket }}`
- `{{ var.json.parent.nested }}`
@@ -165,9 +165,6 @@ then your variables JSON file should look like this

```json
{
  "shared": {
    "composer_bucket": "us-east4-test-pipelines-abcde1234-bucket"
  },
  "parent": {
    "nested": "some value",
    "another_nested": "another value"
97 changes: 50 additions & 47 deletions scripts/deploy_dag.py
@@ -80,40 +80,42 @@ def copy_variables_to_airflow_data_folder(
    composer_bucket: str = None,
):
    """
    cd .{ENV}/{DATASET_ID}
    cd .{ENV}/datasets or .{ENV}/datasets/{dataset_id}
    """
    cwd = env_path / "datasets" / dataset_id
    filename = f"{dataset_id}_variables.json"

    if not (cwd / filename).exists():
        warnings.warn(f"Airflow variables file {filename} does not exist.")
        return

    if local:
        """
        cp {DATASET_ID}_variables.json {AIRFLOW_HOME}/data/variables/{DATASET_ID}_variables.json
        """
        target_path = airflow_home / "data" / "variables" / filename
        target_path.mkdir(parents=True, exist_ok=True)
        print(
            "\nCopying variables JSON file into Airflow data folder\n\n"
            f" Source:\n {cwd / filename}\n\n"
            f" Destination:\n {target_path}\n"
        )
    for cwd, filename in (
        (env_path / "datasets", "shared_variables.json"),
        (env_path / "datasets" / dataset_id, f"{dataset_id}_variables.json"),
    ):

        if not (cwd / filename).exists():
            warnings.warn(f"Airflow variables file {filename} does not exist.")
            continue

        if local:
            """
            cp {DATASET_ID}_variables.json {AIRFLOW_HOME}/data/variables/{filename}
            """
            target_path = airflow_home / "data" / "variables" / filename
            target_path.mkdir(parents=True, exist_ok=True)
            print(
                "\nCopying variables JSON file into Airflow data folder\n\n"
                f" Source:\n {cwd / filename}\n\n"
                f" Destination:\n {target_path}\n"
            )

        subprocess.check_call(["cp", "-rf", filename, str(target_path)], cwd=cwd)
    else:
        """
        [remote]
        gsutil cp {DATASET_ID}_variables.json gs://{COMPOSER_BUCKET}/data/variables/{DATASET_ID}_variables.json...
        """
        gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
        print(
            "\nCopying variables JSON file into Cloud Composer data folder\n\n"
            f" Source:\n {cwd / filename}\n\n"
            f" Destination:\n {gcs_uri}\n"
        )
        run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd)
            subprocess.check_call(["cp", "-rf", filename, str(target_path)], cwd=cwd)
        else:
            """
            [remote]
            gsutil cp {DATASET_ID}_variables.json gs://{COMPOSER_BUCKET}/data/variables/{filename}...
            """
            gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
            print(
                "\nCopying variables JSON file into Cloud Composer data folder\n\n"
                f" Source:\n {cwd / filename}\n\n"
                f" Destination:\n {gcs_uri}\n"
            )
            run_gsutil_cmd(["cp", filename, gcs_uri], cwd=cwd)


def run_cloud_composer_vars_import(
@@ -155,21 +157,22 @@ def import_variables_to_airflow_env(
    [remote]
    gcloud composer environments run COMPOSER_ENV --location COMPOSER_REGION variables -- --import /home/airflow/gcs/data/variables/{DATASET_ID}_variables.json
    """
    cwd = env_path / "datasets" / dataset_id
    filename = f"{dataset_id}_variables.json"

    if local:
        print(f"\nImporting Airflow variables from {cwd / filename}...\n")
        subprocess.check_call(
            ["airflow", "variables", "import", str(cwd / filename)], cwd=cwd
        )
    else:
        gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
        airflow_path = f"/home/airflow/gcs/data/variables/{filename}"
        print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n")
        run_cloud_composer_vars_import(
            composer_env, composer_region, airflow_path, cwd=cwd
        )
    for cwd, filename in (
        (env_path / "datasets", "shared_variables.json"),
        (env_path / "datasets" / dataset_id, f"{dataset_id}_variables.json"),
    ):
        if local:
            print(f"\nImporting Airflow variables from {cwd / filename}...\n")
            subprocess.check_call(
                ["airflow", "variables", "import", str(cwd / filename)], cwd=cwd
            )
        else:
            gcs_uri = f"gs://{composer_bucket}/data/variables/{filename}"
            airflow_path = f"/home/airflow/gcs/data/variables/{filename}"
            print(f"\nImporting Airflow variables from {gcs_uri} ({airflow_path})...\n")
            run_cloud_composer_vars_import(
                composer_env, composer_region, airflow_path, cwd=cwd
            )


def copy_generated_dag_to_airflow_dags_folder(
8 changes: 8 additions & 0 deletions scripts/generate_dag.py
@@ -70,6 +70,8 @@ def main(
    else:
        generate_pipeline_dag(dataset_id, pipeline_id, env)

    generate_shared_variables_file(env)


def generate_pipeline_dag(dataset_id: str, pipeline_id: str, env: str):
    pipeline_dir = DATASETS_PATH / dataset_id / pipeline_id
@@ -144,6 +146,12 @@ def generate_task_contents(task: dict) -> str:
    )


def generate_shared_variables_file(env: str) -> None:
    pathlib.Path(
        PROJECT_ROOT / f".{env}" / "datasets" / "shared_variables.json"
    ).touch()


def dag_init(config: dict) -> dict:
    return config["dag"].get("initialize") or config["dag"].get("init")

18 changes: 13 additions & 5 deletions tests/scripts/test_deploy_dag.py
@@ -175,7 +175,7 @@ def test_script_always_requires_dataset_arg(
pipeline_path_2 = pipeline_path


def test_script_can_deploy_without_variables_file(
def test_script_can_deploy_without_variables_files(
    dataset_path: pathlib.Path,
    pipeline_path: pathlib.Path,
    airflow_home: pathlib.Path,
@@ -190,10 +190,17 @@ def test_script_can_deploy_without_variables_file(
        f"{dataset_path.name}_variables.json",
    )

    # Delete the variables file
    vars_file = f"{dataset_path.name}_variables.json"
    (ENV_DATASETS_PATH / dataset_path.name / vars_file).unlink()
    assert not (ENV_DATASETS_PATH / dataset_path.name / vars_file).exists()
    # Delete the shared variables file
    (ENV_DATASETS_PATH / "shared_variables.json").unlink()
    assert not (ENV_DATASETS_PATH / "shared_variables.json").exists()

    # Delete the dataset-specific variables file
    (
        ENV_DATASETS_PATH / dataset_path.name / f"{dataset_path.name}_variables.json"
    ).unlink()
    assert not (
        ENV_DATASETS_PATH / dataset_path.name / f"{dataset_path.name}_variables.json"
    ).exists()

    mocker.patch("scripts.deploy_dag.run_gsutil_cmd")
    mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import")
@@ -382,6 +389,7 @@ def test_script_with_local_flag_copies_files_to_local_airflow_env(
        composer_region=None,
    )

    assert (airflow_home / "data" / "variables" / "shared_variables.json").exists()
    assert (airflow_home / "data" / "variables" / variables_filename).exists()
    assert (airflow_home / "dags" / dag_filename).exists()

13 changes: 13 additions & 0 deletions tests/scripts/test_generate_dag.py
@@ -127,6 +127,19 @@ def test_main_copies_custom_dir_if_it_exists(
    assert (path_prefix / "custom").is_dir()


def test_main_creates_shared_variables_file(
    dataset_path: pathlib.Path, pipeline_path: pathlib.Path, env: str
):
    copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path)
    custom_path = dataset_path / pipeline_path.name / "custom"
    custom_path.mkdir(parents=True, exist_ok=True)

    generate_dag.main(dataset_path.name, pipeline_path.name, env)

    assert (ENV_DATASETS_PATH / "shared_variables.json").exists()
    assert not (ENV_DATASETS_PATH / "shared_variables.json").is_dir()


def test_main_only_depends_on_pipeline_yaml(
    dataset_path: pathlib.Path, pipeline_path: pathlib.Path, env: str
):