diff --git a/.github/workflows/unit-tests-airflow1.yaml b/.github/workflows/unit-tests-airflow1.yaml new file mode 100644 index 000000000..8db3f5220 --- /dev/null +++ b/.github/workflows/unit-tests-airflow1.yaml @@ -0,0 +1,41 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name: Run unit tests for Airflow 1.10 operators +on: [pull_request] +jobs: + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.8] + steps: + - uses: actions/checkout@v2 + - uses: hashicorp/setup-terraform@v1 + with: + terraform_version: 0.15.1 + - name: Setup Python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install pipenv + run: pip install pipenv + - name: Install dependencies + run: pipenv install --ignore-pipfile --dev + - name: Initialize Airflow + run: pipenv run airflow db init + - name: Setup Airflow 1.10 pipeline YAML config + run: cp samples/pipeline.airflow1.yaml samples/pipeline.yaml + - name: Run tests + run: pipenv run python -m pytest -v diff --git a/.github/workflows/unit-tests.yaml b/.github/workflows/unit-tests.yaml index 9e5c2492d..40fd58f4a 100644 --- a/.github/workflows/unit-tests.yaml +++ b/.github/workflows/unit-tests.yaml @@ -1,3 +1,17 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + name: Run unit tests on: [pull_request] jobs: diff --git a/README.md b/README.md index 88272b336..73fa3d74d 100644 --- a/README.md +++ b/README.md @@ -58,7 +58,9 @@ Use only underscores and alpha-numeric characters for the names. If you created a new dataset directory above, you need to create a `datasets/DATASET/dataset.yaml` config file. See this [section](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/README.md#yaml-config-reference) for the `dataset.yaml` reference. -Create a `datasets/DATASET/PIPELINE/pipeline.yaml` config file for your pipeline. See [here](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/pipeline.yaml) for the `pipeline.yaml` reference. +Create a `datasets/DATASET/PIPELINE/pipeline.yaml` config file for your pipeline. See [here](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/) for the `pipeline.yaml` references. + +For a YAML config template using Airflow 1.10 operators, see [`samples/pipeline.airflow1.yaml`](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/pipeline.airflow1.yaml). 
If you'd like to get started faster, you can inspect config files that already exist in the repository and infer the patterns from there: @@ -219,10 +221,14 @@ $ pipenv run python -m pytest -v # YAML Config Reference -Every dataset and pipeline folder must contain a `dataset.yaml` and a `pipeline.yaml` configuration file, respectively: +Every dataset and pipeline folder must contain a `dataset.yaml` and a `pipeline.yaml` configuration file, respectively. + +The `samples` folder contains references for the YAML config files, complete with descriptions of the config blocks, Airflow operators, and their parameters. When creating a new dataset or pipeline, you can copy them to your specific dataset/pipeline paths to be used as templates. -- For dataset configuration syntax, see [`samples/dataset.yaml`](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/dataset.yaml) as a reference. -- For pipeline configuration syntax, see [`samples/pipeline.yaml`](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/pipeline.yaml) as a reference. +- For dataset configuration syntax, see the [`samples/dataset.yaml`](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/dataset.yaml) reference. +- For pipeline configuration syntax: + - For the default Airflow 2 operators, see the [`samples/pipeline.yaml`](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/pipeline.yaml) reference. + - If you'd like to use Airflow 1.10 operators, see the [`samples/pipeline.airflow1.yaml`](https://github.com/GoogleCloudPlatform/public-datasets-pipelines/blob/main/samples/pipeline.airflow1.yaml) reference. # Best Practices diff --git a/samples/pipeline.airflow1.yaml b/samples/pipeline.airflow1.yaml new file mode 100644 index 000000000..30261d145 --- /dev/null +++ b/samples/pipeline.airflow1.yaml @@ -0,0 +1,337 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# ===== NOTE ===== +# This YAML config template is used to write DAGs that use Airflow 1.10 operators. +# You can keep using this template when deploying to an environment that uses Airflow 2; +# the DAGs will keep working due to backport compatibility. +# +# For tracking progress on the YAML config templates that use Airflow 2 operators, see +# https://github.com/GoogleCloudPlatform/public-datasets-pipelines/issues/137. + +--- +resources: + # A list of GCP resources that are unique and specific to your pipeline. + # + # The currently supported resources are shown below. Use only the resources + # needed by your pipeline, and delete the rest of the examples. + # + # We will keep adding to the list below to support more Google Cloud resources + # over time. If a resource you need isn't supported, please file an issue on + # the repository. + + - type: bigquery_table + # A Google BigQuery table to store your data. Requires a `bigquery_dataset` + # to be specified in the config (i.e.
`dataset.yaml`) for the dataset that + # this pipeline belongs in. + # + # Required Properties: + # table_id + table_id: PIPELINE_FOLDER_NAME + + # Optional Properties: + # Description of the table + description: "This is a table description." + + # Time-based partitioning configuration. There is no need for this property + # if you have a relatively small dataset to host on a BigQuery table. + time_partitioning: + + # The supported types are DAY, HOUR, MONTH, and YEAR, which will generate one partition per day, hour, month, and year, respectively. + type: "DAY" + + # If set to true, queries over this table require a partition filter that can be used for partition elimination to be specified. + require_partition_filter: false + + # Specifies column names to use for data clustering. Up to four top-level columns are allowed, and should be specified in descending priority order. + clustering: + - "column_1" + - "column_2" + - "column_3" + + # The table cannot be deleted without first disabling this property. + # Unless this field is set to false in Terraform state, a `terraform destroy` + # or `terraform apply` that would delete the table will fail. + deletion_protection: true + +dag: + # [Required] Specify the Airflow version of the operators used by the DAG. + airflow_version: 1 + + # The DAG acronym stands for directed acyclic graph. This block represents + # your data pipeline along with every property and configuration it needs to + # onboard your data. + initialize: + dag_id: PIPELINE_FOLDER_NAME + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@once" + catchup: False + default_view: graph + + tasks: + # This is where you specify the tasks (a.k.a. processes) that your data + # pipeline will run to onboard the data. + # + # As the examples below will show, every task must be represented by an + # Airflow operator. The supported operators are listed in + # + # scripts/dag_imports.json + # + # If an operator you need isn't supported, please file an issue on the + # repository. + # + # Use the YAML list syntax in this block to specify every task for your + # pipeline. + + - operator: "BashOperator" + # Initializes an Airflow BashOperator for the DAG. This operator can be + # used to + # - Download from HTTP sources + # - Run custom Python scripts + # - Run processes using specific packages that support CLI commands + + # Task description + description: "Run a custom Python script" + + args: + # Arguments supported by this operator: + # https://airflow.apache.org/docs/apache-airflow/1.10.15/howto/operator/bash.html + + task_id: "sample_bash_task" + bash_command: | + mkdir -p $airflow_home/data/$dataset/$pipeline/run_date={{ ds }} + CUSTOM_ENV_VAR=$custom_env_var python $airflow_home/dags/$dataset/$pipeline/custom/some_script.py + env: + airflow_home: "{{ var.json.shared.airflow_home }}" + dataset: DATASET_FOLDER_NAME + pipeline: PIPELINE_FOLDER_NAME + custom_env_var: "some value that your custom script needs" + + - operator: "GoogleCloudStorageToBigQueryOperator" + # Initializes GCS to BQ task for the DAG. This operator is used to load a + # CSV file from GCS into a BigQuery table.
+ + # Task description + description: "Task to load CSV data to a BigQuery table" + + args: + # Arguments supported by this operator: + # http://airflow.apache.org/docs/apache-airflow/1.10.15/howto/operator/gcp/gcs.html#googlecloudstoragetobigqueryoperator + + task_id: "sample_gcs_to_bq_task" + + # The GCS bucket where the CSV file is located in. + bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/DATASET_FOLDER_NAME/PIPELINE_FOLDER_NAME/run_date={{ ds }}/data.csv"] + source_format: "CSV" + destination_project_dataset_table: "DATASET_FOLDER_NAME.PIPELINE_FOLDER_NAME" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + schema_fields: + - name: "name" + type: "STRING" + mode: "REQUIRED" + - name: "string_col" + type: "STRING" + mode: "NULLABLE" + - name: "date" + type: "DATE" + mode: "REQUIRED" + - name: "num_col" + type: "INTEGER" + mode: "NULLABLE" + + - operator: "GoogleCloudStorageToGoogleCloudStorageOperator" + # Initializes a GCS-to-GCS task for the DAG. This operator is used to copy + # GCS objects from one location to another. + + # Task description + description: "Task to run a GoogleCloudStorageToGoogleCloudStorageOperator" + + args: + # Arguments supported by this operator: + # https://airflow.apache.org/docs/apache-airflow/1.10.15/_api/airflow/contrib/operators/gcs_to_gcs/index.html#airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator + + task_id: "sample_gcs_to_gcs_task" + + # The GCS bucket to copy the object/s from + source_bucket: "{{ var.json.shared.composer_bucket }}" + + # Use a trailing "/*" if you want to copy all objects under that path. + source_object: "data/DATASET_FOLDER_NAME/PIPELINE_FOLDER_NAME/run_date={{ ds }}/*" + + # The GCS bucket to copy the object/s to + destination_bucket: "{{ var.json.DATASET_FOLDER_NAME.destination_bucket }}" + + # The GCS prefix to copy the object/s to + destination_object: "datasets/DATASET_FOLDER_NAME/PIPELINE_FOLDER_NAME/run_date={{ ds }}/" + + # Use this argument if you don't want to keep the source object/s. + move_object: True + + - operator: "BigQueryOperator" + # Initializes a BigQuery operator that executes SQL queries in a specific + # BigQuery table. + + # Task description + description: "Task to run a BigQueryOperator" + + args: + # Arguments supported by this operator: + # https://airflow.apache.org/docs/apache-airflow/1.10.15/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryOperator + + task_id: "sample_bq_sql_task" + + # The SQL query to execute, along with query parameters. For more info, + # see https://cloud.google.com/bigquery/docs/parameterized-queries. 
+ sql: "SELECT * FROM DATASET_FOLDER_NAME.PIPELINE_FOLDER_NAME LIMIT @max_rows" + query_params: + - name: "max_rows" + parameterType: + type: "INTEGER" + parameterValue: + value: 100 + + # The BigQuery destination table + destination_dataset_table: "destination_dataset.destination_table" + + # How to write to the destination: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + - operator: "GoogleCloudStorageDeleteOperator" + # Initializes a GCS operator that deletes all specified objects + + # Task description + description: "Task to run a GoogleCloudStorageDeleteOperator" + + args: + # Arguments supported by this operator: + # https://airflow.apache.org/docs/apache-airflow/1.10.15/_api/airflow/contrib/operators/gcs_delete_operator/index.html#airflow.contrib.operators.gcs_delete_operator.GoogleCloudStorageDeleteOperator + task_id: "sample_gcs_delete_task" + + # The GCS bucket where the objects to delete are located. + bucket_name: "sample_bucket" + + # List of objects to delete. These should be the names of objects in the bucket, not including gs://bucket/. + objects: ["path/to/some_object"] + + # Alternatively, you can specify a prefix of objects to delete. + # All objects matching this prefix in the bucket will be deleted. + prefix: "prefix/to/delete" + + - operator: "KubernetesPodOperator" + # Executes a task in a Kubernetes pod that uses the Cloud Composer environment's own CPU and memory resources. + # + # Note: Do NOT use this for very heavy workloads. This can potentially starve resources from Cloud Composer + # and affect data pipeline orchestration overall. Instead, use a different GKE cluster using Kubernetes + # Engine operators: https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/operators/kubernetes_engine.py + + # Task description + description: "Task to run a KubernetesPodOperator" + + args: + # Arguments supported by this operator: + # https://airflow.readthedocs.io/en/1.10.15/_api/airflow/contrib/operators/kubernetes_pod_operator/index.html#airflow.contrib.operators.kubernetes_pod_operator.KubernetesPodOperator + + task_id: "sample_kube_pod_operator" + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "sample-kube-operator" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + # The Google Container Registry image URL. To prepare a Docker image to be used by this operator: + # + # 1. Create an `_images` folder under your dataset folder if it doesn't exist. + # + # 2. Inside the `_images` folder, create another folder and name it after what the image is expected to do, e.g. process_shapefiles, get_cdf_metadata. + # + # 3. In that subfolder, create a Dockerfile (https://docs.docker.com/engine/reference/builder/) and any scripts you need to process the data. Use the `COPY` command (https://docs.docker.com/engine/reference/builder/#copy) in your `Dockerfile` to include your scripts in the image. 
+ # + # The resulting file tree for a dataset that uses two container images may look like + # + # datasets + # └── DATASET + # ├── _images + # │ ├── container_a + # │ │ ├── Dockerfile + # │ │ ├── requirements.txt + # │ │ └── script.py + # │ └── container_b + # │ ├── Dockerfile + # │ ├── requirements.txt + # │ └── script.py + # ├── _terraform/ + # ├── PIPELINE_A + # ├── PIPELINE_B + # ├── ... + # └── dataset.yaml + # + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.DATASET_FOLDER_NAME.container_registry.IMAGE_REPOSITORY }}" + + # Always pull the latest image. We recommend to keep this as "Always". + image_pull_policy: "Always" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + TEST_ENV_VAR: "test-value" + ANOTHER_ENV_VAR: 12345 + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + limit_memory: "250M" + limit_cpu: "1" + + - operator: "BigQueryToBigQueryOperator" + description: "Task to run a BQ to BQ operator" + + args: + task_id: "sample_bq_to_bq_task" + source_project_dataset_tables: ["{{ var.json.DATASET_FOLDER_NAME.PIPELINE_NAME.source_project_dataset_table }}"] + destination_project_dataset_table: "{{ var.json.DATASET_FOLDER_NAME.PIPELINE_NAME.destination_project_dataset_table }}" + impersonation_chain: "{{ var.json.DATASET_FOLDER_NAME.service_account }}" + write_disposition: "WRITE_TRUNCATE" + + graph_paths: + # This is where you specify the relationships (i.e. directed paths/edges) + # among the tasks specified above. Use the bitshift operator to define the + # relationships and the `task_id` value above to represent tasks. + # + # For more info, see + # https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#setting-up-dependencies + - "sample_bash_task >> [sample_gcs_to_bq_task, sample_gcs_to_gcs_task]" + - "sample_gcs_to_bq_task >> [sample_bq_sql_task, sample_bq_to_bq_task]" + - "sample_bq_sql_task >> sample_gcs_delete_task" diff --git a/samples/pipeline.yaml b/samples/pipeline.yaml index 37b943d9a..cac42c1b6 100644 --- a/samples/pipeline.yaml +++ b/samples/pipeline.yaml @@ -13,14 +13,6 @@ # limitations under the License. -# ===== NOTE ===== -# This YAML config template is used to write DAGs that use Airflow 1.10 operators. -# You can keep using this template when deploying to an environment that uses Airflow 2, -# they will keep working due to backport compatibility. -# -# For tracking progress on the YAML config templates that use Airflow 2 operators, see -# https://github.com/GoogleCloudPlatform/public-datasets-pipelines/issues/137. - --- resources: # A list of GCP resources that are unique and specific to your pipeline. @@ -67,8 +59,8 @@ resources: deletion_protection: true dag: - # Specify the Airflow version of the operators used by the DAG. Defaults to Airflow 2 when unspecified. - airflow_version: 1 + # [Required] Specify the Airflow version of the operators used by the DAG. + airflow_version: 2 # The DAG acronym stands for directed acyclic graph. 
This block represents # your data pipeline along with every property and configuration it needs to @@ -82,7 +74,7 @@ dag: depends_on_past: False start_date: '2021-03-01' max_active_runs: 1 - schedule_interval: "@once" # run once a week at Sunday 12am + schedule_interval: "@once" catchup: False default_view: graph @@ -113,7 +105,7 @@ dag: args: # Arguments supported by this operator: - # https://airflow.apache.org/docs/apache-airflow/1.10.15/howto/operator/bash.html + # https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/bash.html task_id: "sample_bash_task" bash_command: | @@ -127,15 +119,14 @@ dag: - operator: "GoogleCloudStorageToBigQueryOperator" # Initializes GCS to BQ task for the DAG. This operator is used to load a - # CSV file from GCS into a BigQuery table. + # JSON, CSV, Avro, ORC, or Parquet data from GCS into a BigQuery table. # Task description description: "Task to load CSV data to a BigQuery table" + # Arguments supported by this operator: + # http://airflow.apache.org/docs/apache-airflow/stable/howto/operator/gcp/gcs.html#googlecloudstoragetobigqueryoperator args: - # Arguments supported by this operator: - # http://airflow.apache.org/docs/apache-airflow/1.10.15/howto/operator/gcp/gcs.html#googlecloudstoragetobigqueryoperator - task_id: "sample_gcs_to_bq_task" # The GCS bucket where the CSV file is located in. @@ -171,16 +162,16 @@ dag: type: "INTEGER" mode: "NULLABLE" + # Initializes a GCS-to-GCS task for the DAG. This operator is used to copy or move + # GCS objects from one location to another. - operator: "GoogleCloudStorageToGoogleCloudStorageOperator" - # Initializes a GCS-to-GCS task for the DAG. This operator is used to copy - # GCS objects from one location to another. # Task description description: "Task to run a GoogleCloudStorageToGoogleCloudStorageOperator" args: # Arguments supported by this operator: - # https://airflow.apache.org/docs/apache-airflow/1.10.15/_api/airflow/contrib/operators/gcs_to_gcs/index.html#airflow.contrib.operators.gcs_to_gcs.GoogleCloudStorageToGoogleCloudStorageOperator + # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/transfers/gcs_to_gcs/index.html task_id: "sample_gcs_to_gcs_task" @@ -190,6 +181,11 @@ dag: # Use a trailing "/*" if you want to copy all objects under that path. source_object: "data/DATASET_FOLDER_NAME/PIPELINE_FOLDER_NAME/run_date={{ ds }}/*" + # Optionally, you can supply a list of objects to specifically copy + source_objects: + - "path/to/object" + - "path/to/another/object" + # The GCS bucket to copy the object/s to destination_bucket: "{{ var.json.DATASET_FOLDER_NAME.destination_bucket }}" @@ -199,16 +195,28 @@ dag: # Use this argument if you don't want to keep the source object/s. move_object: True - - operator: "BigQueryOperator" - # Initializes a BigQuery operator that executes SQL queries in a specific - # BigQuery table. + # Whether you want to replace existing destination files or not. + replace: False + + # [Optional] This is used to restrict the result to only the matching names in a given folder + # If source_objects = ['foo/bah/'] and delimiter = '.avro', then only the .avro + # files will be copied to the destination folder. + delimiter: ".csv" + + # [Optional] When specified, the objects will be copied or moved, only if they were modified + # after last_modified_time. If tzinfo has not been set, UTC will be assumed. 
+ last_modified_time: "2021-08-10 11:58:27" + + - operator: "BigQueryExecuteQueryOperator" + # Initializes a BigQuery operator that executes SQL queries in a specific BigQuery table, + # and can optionally store the results of a query in another table # Task description - description: "Task to run a BigQueryOperator" + description: "Task to execute a BigQuery SQL query and store the results in a destination table" args: # Arguments supported by this operator: - # https://airflow.apache.org/docs/apache-airflow/1.10.15/_api/airflow/contrib/operators/bigquery_operator/index.html#airflow.contrib.operators.bigquery_operator.BigQueryOperator + # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/bigquery/index.html#airflow.providers.google.cloud.operators.bigquery.BigQueryExecuteQueryOperator task_id: "sample_bq_sql_task" @@ -222,23 +230,44 @@ dag: parameterValue: value: 100 - # The BigQuery destination table + # If specified, will store the results of the query in a BigQuery destination table destination_dataset_table: "destination_dataset.destination_table" + # Specifies whether the job is allowed to create new tables. + create_disposition: "CREATE_IF_NEEDED" + # How to write to the destination: overwrite, append, or write if empty # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition write_disposition: "WRITE_TRUNCATE" + # [Optional] Configure time partitioning fields, i.e. partition by field, type, and expiration, + # as per API specifications. + time_partitioning: + # The supported types are DAY, HOUR, MONTH, and YEAR, which will generate one partition per day, hour, + # month, and year, respectively. + type: "DAY" + + # [Optional] BigQuery supports clustering for both partitioned and non-partitioned tables. The order + # of columns given determines the sort order. + cluster_fields: + - "column_1" + - "column_2" + - "column_3" + + # [Optional for US and EU] The geographic location of the job + location: "asia-northeast1" + + # Deletes objects from a Google Cloud Storage bucket, either from an explicit list of object names + # or all objects matching a prefix. - operator: "GoogleCloudStorageDeleteOperator" - # Initializes a GCS operator that deletes all specified objects # Task description - description: "Task to run a GoogleCloudStorageDeleteOperator" + description: "Task to delete objects from a GCS bucket" + # Arguments supported by this operator: + # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/gcs/index.html#airflow.providers.google.cloud.operators.gcs.GCSDeleteObjectsOperator args: - # Arguments supported by this operator: - # https://airflow.apache.org/docs/apache-airflow/1.10.15/_api/airflow/contrib/operators/gcs_delete_operator/index.html#airflow.contrib.operators.gcs_delete_operator.GoogleCloudStorageDeleteOperator - task_id: "sample_gcs_delete_task" + task_id: "gcs_delete_task" # The GCS bucket where the objects to delete are located. bucket_name: "sample_bucket" @@ -250,12 +279,11 @@ dag: # All objects matching this prefix in the bucket will be deleted. prefix: "prefix/to/delete" + # Executes a task in a Kubernetes pod that uses the Cloud Composer environment's own CPU and memory resources. + # Note: Do NOT use this for very heavy workloads. This can potentially starve resources from Cloud Composer + # and affect data pipeline orchestration overall.
Instead, use a different GKE cluster using Kubernetes + # Engine operators: https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/operators/kubernetes_engine.py - operator: "KubernetesPodOperator" - # Executes a task in a Kubernetes pod that uses the Cloud Composer environment's own CPU and memory resources. - # - # Note: Do NOT use this for very heavy workloads. This can potentially starve resources from Cloud Composer - # and affect data pipeline orchestration overall. Instead, use a different GKE cluster using Kubernetes - # Engine operators: https://github.com/apache/airflow/blob/master/airflow/providers/google/cloud/operators/kubernetes_engine.py # Task description description: "Task to run a KubernetesPodOperator" @@ -315,15 +343,67 @@ dag: limit_memory: "250M" limit_cpu: "1" - - operator: "BigQueryToBigQueryOperator" - description: "Task to run a BQ to BQ operator" + # Documentation: + # http://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/operators/kubernetes_engine/index.html#airflow.providers.google.cloud.operators.kubernetes_engine.GKEStartPodOperator + - operator: "GKEStartPodOperator" args: - task_id: "sample_bq_to_bq_task" - source_project_dataset_tables: ["{{ var.json.DATASET_FOLDER_NAME.PIPELINE_NAME.source_project_dataset_table }}"] - destination_project_dataset_table: "{{ var.json.DATASET_FOLDER_NAME.PIPELINE_NAME.destination_project_dataset_table }}" - impersonation_chain: "{{ var.json.DATASET_FOLDER_NAME.service_account }}" - write_disposition: "WRITE_TRUNCATE" + task_id: "gke_start_pod_task" + project_id: "{{ var.json.shared.gcp_project_id }}" + location: "{{ var.json.shared.gcp_location }}" + + # The name of the Google Kubernetes Engine cluster the pod should be spawned in + cluster_name: "GKE_CLUSTER_NAME" + + # The namespace to run within Kubernetes + namespace: "default" + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "sample-gke-pod" + + # The GCR image URL. It's recommended to redact its value and set it using Airflow variables + image: "{{ var.json.DATASET_FOLDER_NAME.container_registry.IMAGE_REPOSITORY }}" + + # [Optional] Entrypoint command for the container + cmds: + - "mkdir -p /airflow/xcom/ &&" + - "echo '[1,2,3,4]' > /airflow/xcom/return.json" + + # [Optional] Enable the usage of XCom on the operator. XCom allows you to store values used downstream + # in your DAG. To do this, create a `return.json` file under /airflow/xcom in your pod script. To use + # XCom variables in other operators, use the macro {{ task_instance.xcom_pull('POD_TASK_ID')[0] }} + do_xcom_push: True + + # Deletes a GKE cluster, including the Kubernetes endpoint and all worker nodes. + - operator: "GKEDeleteClusterOperator" + + args: + task_id: "gke_delete_cluster_task" + project_id: "{{ var.json.shared.gcp_project_id }}" + location: "{{ var.json.shared.gcp_location }}" + + # The GKE cluster name + name: "sample-gke-cluster" + + # Optional service account to impersonate using short-term credentials + impersonation_chain: "{{ var.json.DATASET_FOLDER_NAME.PIPELINE_FOLDER_NAME.service_account }}" + + # Create a GKE cluster of specified dimensions. The operator will wait until the cluster is created. 
+ - operator: "GKECreateClusterOperator" + + args: + task_id: "gke_create_cluster_task" + project_id: "{{ var.json.shared.gcp_project_id }}" + location: "{{ var.json.shared.gcp_location }}" + + # The cluster definition to create, + # see https://googleapis.dev/python/container/latest/container_v1/types.html#google.cloud.container_v1.types.Cluster + body: + name: "sample-gke-cluster" + initial_node_count: 1 + + # Optional service account to impersonate using short-term credentials + impersonation_chain: "{{ var.json.DATASET_FOLDER_NAME.PIPELINE_FOLDER_NAME.service_account }}" graph_paths: # This is where you specify the relationships (i.e. directed paths/edges) @@ -333,5 +413,5 @@ dag: # For more info, see # https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#setting-up-dependencies - "sample_bash_task >> [sample_gcs_to_bq_task, sample_gcs_to_gcs_task]" - - "sample_gcs_to_bq_task >> [sample_bq_sql_task, sample_bq_to_bq_task]" - - "sample_bq_sql_task >> sample_gcs_delete_task" + - "sample_gcs_to_bq_task >> [sample_bq_sql_task, gcs_delete_task]" + - "gke_create_cluster_task >> gke_start_pod_task >> gke_delete_cluster_task" diff --git a/scripts/dag_imports.json b/scripts/dag_imports.json index 302ced257..c0f88ee0d 100644 --- a/scripts/dag_imports.json +++ b/scripts/dag_imports.json @@ -46,6 +46,10 @@ "import": "from airflow.providers.google.cloud.operators import gcs", "class": "gcs.GCSDeleteObjectsOperator" }, + "BigQueryExecuteQueryOperator": { + "import": "from airflow.providers.google.cloud.operators import bigquery", + "class": "bigquery.BigQueryExecuteQueryOperator" + }, "BigQueryInsertJobOperator": { "import": "from airflow.providers.google.cloud.operators import bigquery", "class": "bigquery.BigQueryInsertJobOperator" @@ -53,6 +57,18 @@ "KubernetesPodOperator": { "import": "from airflow.providers.cncf.kubernetes.operators import kubernetes_pod", "class": "kubernetes_pod.KubernetesPodOperator" + }, + "GKEStartPodOperator": { + "import": "from airflow.providers.google.cloud.operators import kubernetes_engine", + "class": "kubernetes_engine.GKEStartPodOperator" + }, + "GKECreateClusterOperator": { + "import": "from airflow.providers.google.cloud.operators import kubernetes_engine", + "class": "kubernetes_engine.GKECreateClusterOperator" + }, + "GKEDeleteClusterOperator": { + "import": "from airflow.providers.google.cloud.operators import kubernetes_engine", + "class": "kubernetes_engine.GKEDeleteClusterOperator" } - } + } } diff --git a/scripts/deploy_dag.py b/scripts/deploy_dag.py index 01528fb0b..db8a523c8 100644 --- a/scripts/deploy_dag.py +++ b/scripts/deploy_dag.py @@ -27,7 +27,7 @@ CURRENT_PATH = pathlib.Path(__file__).resolve().parent PROJECT_ROOT = CURRENT_PATH.parent DATASETS_PATH = PROJECT_ROOT / "datasets" -DEFAULT_AIRFLOW_VERSION = 1 +DEFAULT_AIRFLOW_VERSION = 2 class IncompatibilityError(Exception): @@ -175,7 +175,7 @@ def import_variables_to_airflow_env( airflow variables import .{ENV}/datasets/{DATASET_ID}/variables.json [remote] - gcloud composer environments run COMPOSER_ENV --location COMPOSER_REGION variables -- --import /home/airflow/gcs/data/variables/{DATASET_ID}_variables.json + gcloud composer environments run COMPOSER_ENV --location COMPOSER_REGION variables -- import /home/airflow/gcs/data/variables/{DATASET_ID}_variables.json """ for cwd, filename in ( (env_path / "datasets", "shared_variables.json"), diff --git a/scripts/generate_dag.py b/scripts/generate_dag.py index 93a61e437..7c313dde8 100644 --- a/scripts/generate_dag.py +++ 
b/scripts/generate_dag.py @@ -27,16 +27,6 @@ yaml = yaml.YAML(typ="safe") -OPERATORS = { - "BashOperator", - "GoogleCloudStorageToBigQueryOperator", - "GoogleCloudStorageToGoogleCloudStorageOperator", - "GoogleCloudStorageDeleteOperator", - "BigQueryOperator", - "BigQueryToBigQueryOperator", - "KubernetesPodOperator", -} - CURRENT_PATH = pathlib.Path(__file__).resolve().parent PROJECT_ROOT = CURRENT_PATH.parent DATASETS_PATH = PROJECT_ROOT / "datasets" @@ -52,6 +42,7 @@ DEFAULT_AIRFLOW_VERSION = 2 AIRFLOW_IMPORTS = json.load(open(CURRENT_PATH / "dag_imports.json")) +AIRFLOW_VERSIONS = list(AIRFLOW_IMPORTS.keys()) def main( @@ -77,6 +68,7 @@ def generate_pipeline_dag(dataset_id: str, pipeline_id: str, env: str): pipeline_dir = DATASETS_PATH / dataset_id / pipeline_id config = yaml.load((pipeline_dir / "pipeline.yaml").read_text()) + validate_airflow_version_existence_and_value(config) validate_dag_id_existence_and_format(config) dag_contents = generate_dag(config, dataset_id) @@ -135,7 +127,7 @@ def generate_dag_context(config: dict, dataset_id: str) -> str: def generate_task_contents(task: dict, airflow_version: str) -> str: - validate_task(task) + validate_task(task, airflow_version) return jinja2.Template(TEMPLATE_PATHS["task"].read_text()).render( **task, namespaced_operator=AIRFLOW_IMPORTS[airflow_version][task["operator"]]["class"], @@ -162,6 +154,14 @@ def namespaced_dag_id(dag_id: str, dataset_id: str) -> str: return f"{dataset_id}.{dag_id}" +def validate_airflow_version_existence_and_value(config: dict): + if "airflow_version" not in config["dag"]: + raise KeyError("Missing required parameter:`dag.airflow_version`") + + if str(config["dag"]["airflow_version"]) not in AIRFLOW_VERSIONS: + raise ValueError("`dag.airflow_version` must be a valid Airflow major version") + + def validate_dag_id_existence_and_format(config: dict): init = dag_init(config) if not init.get("dag_id"): @@ -174,12 +174,14 @@ def validate_dag_id_existence_and_format(config: dict): ) -def validate_task(task: dict): +def validate_task(task: dict, airflow_version: str): if not task.get("operator"): raise KeyError(f"`operator` key must exist in {task}") - if not task["operator"] in OPERATORS: - raise ValueError(f"`task.operator` must be one of {list(OPERATORS)}") + if not task["operator"] in AIRFLOW_IMPORTS[airflow_version]: + raise ValueError( + f"`task.operator` must be one of {list(AIRFLOW_IMPORTS[airflow_version].keys())}" + ) if not task["args"].get("task_id"): raise KeyError(f"`args.task_id` key must exist in {task}") diff --git a/tests/scripts/test_deploy_dag.py b/tests/scripts/test_deploy_dag.py index 3a855541e..c33341b21 100644 --- a/tests/scripts/test_deploy_dag.py +++ b/tests/scripts/test_deploy_dag.py @@ -211,7 +211,7 @@ def test_script_can_deploy_without_variables_files( mocker.patch("scripts.deploy_dag.run_gsutil_cmd") mocker.patch("scripts.deploy_dag.run_cloud_composer_vars_import") - mocker.patch("scripts.deploy_dag.composer_airflow_version", return_value=1) + mocker.patch("scripts.deploy_dag.composer_airflow_version", return_value=2) deploy_dag.main( local=False, diff --git a/tests/scripts/test_generate_dag.py b/tests/scripts/test_generate_dag.py index 842aa69d5..9612f68bf 100644 --- a/tests/scripts/test_generate_dag.py +++ b/tests/scripts/test_generate_dag.py @@ -13,6 +13,7 @@ # limitations under the License. 
+import json import pathlib import random import shutil @@ -36,6 +37,9 @@ ENV_PATH = generate_dag.PROJECT_ROOT / ".test" ENV_DATASETS_PATH = ENV_PATH / "datasets" +AIRFLOW_IMPORTS = json.load(open(PROJECT_ROOT / "scripts" / "dag_imports.json")) +AIRFLOW_VERSIONS = list(AIRFLOW_IMPORTS.keys()) + @pytest.fixture def dataset_path() -> typing.Iterator[pathlib.Path]: @@ -154,47 +158,44 @@ def test_main_creates_shared_variables_file( assert not (ENV_DATASETS_PATH / "shared_variables.json").is_dir() -def test_main_uses_airflow1_operators_when_airflow_version_is_set_to_1( +def test_main_raises_an_error_when_airflow_version_is_not_specified( dataset_path: pathlib.Path, pipeline_path: pathlib.Path, env: str ): copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path) - - # Verify that the template uses Airflow 1 config = yaml.load(open(pipeline_path / "pipeline.yaml")) - assert config["dag"]["airflow_version"] == 1 - generate_dag.main(dataset_path.name, pipeline_path.name, env) + # Don't specify the `airflow_version` + del config["dag"]["airflow_version"] + with open(pipeline_path / "pipeline.yaml", "w") as file: + yaml.dump(config, file) - for path_prefix in ( - pipeline_path, - ENV_DATASETS_PATH / dataset_path.name / pipeline_path.name, - ): - assert (path_prefix / f"{pipeline_path.name}_dag.py").exists() - assert ( - "airflow.contrib.operators" - in (path_prefix / f"{pipeline_path.name}_dag.py").read_text() - ) - assert ( - "airflow.providers.google" - not in (path_prefix / f"{pipeline_path.name}_dag.py").read_text() - ) + with pytest.raises(KeyError): + generate_dag.main(dataset_path.name, pipeline_path.name, env) -# Requires a pipeline.yaml config that uses Airflow2 operators. Soon to follow. -# See https://github.com/GoogleCloudPlatform/public-datasets-pipelines/issues/137 -@pytest.mark.skip() -def test_main_defaults_to_using_airflow2_when_airflow_version_is_unspecified( +def test_main_raises_an_error_when_airflow_version_is_incorrect( dataset_path: pathlib.Path, pipeline_path: pathlib.Path, env: str ): copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path) - config = yaml.load(open(pipeline_path / "pipeline.yaml")) - # Delete the `airflow_version` field in the pipeline YAML config - del config["dag"]["airflow_version"] + # Set an incorrect `airflow_version` + config["dag"]["airflow_version"] = "789" with open(pipeline_path / "pipeline.yaml", "w") as file: yaml.dump(config, file) + with pytest.raises(ValueError): + generate_dag.main(dataset_path.name, pipeline_path.name, env) + + +def test_main_uses_airflow_operators_based_on_airflow_version_specified_in_the_config( + dataset_path: pathlib.Path, pipeline_path: pathlib.Path, env: str +): + copy_config_files_and_set_tmp_folder_names_as_ids(dataset_path, pipeline_path) + + config = yaml.load(open(pipeline_path / "pipeline.yaml")) + airflow_version = config["dag"]["airflow_version"] + generate_dag.main(dataset_path.name, pipeline_path.name, env) for path_prefix in ( @@ -202,14 +203,25 @@ def test_main_defaults_to_using_airflow2_when_airflow_version_is_unspecified( ENV_DATASETS_PATH / dataset_path.name / pipeline_path.name, ): assert (path_prefix / f"{pipeline_path.name}_dag.py").exists() - assert ( - "providers.google" - in (path_prefix / f"{pipeline_path.name}_dag.py").read_text() - ) - assert ( - "airflow.contrib.operators" - not in (path_prefix / f"{pipeline_path.name}_dag.py").read_text() - ) + + if airflow_version == "1": + assert ( + "airflow.contrib.operators" + in (path_prefix / 
f"{pipeline_path.name}_dag.py").read_text() + ) + assert ( + "airflow.providers.google" + not in (path_prefix / f"{pipeline_path.name}_dag.py").read_text() + ) + elif airflow_version == "2": + assert ( + "airflow.contrib.operators" + not in (path_prefix / f"{pipeline_path.name}_dag.py").read_text() + ) + assert ( + "airflow.providers.google" + in (path_prefix / f"{pipeline_path.name}_dag.py").read_text() + ) def test_main_only_depends_on_pipeline_yaml( @@ -243,25 +255,26 @@ def test_main_errors_out_on_nonexisting_pipeline_yaml( def test_checks_for_task_operator_and_id(): - valid_task = { - "operator": "GoogleCloudStorageToBigQueryOperator", - "args": {"task_id": "load_gcs_to_bq"}, - } - generate_dag.validate_task(valid_task) - - non_existing_operator = { - "operator": "NonExisting", - "args": {"task_id": "load_gcs_to_bq"}, - } - with pytest.raises(ValueError): - generate_dag.validate_task(non_existing_operator) - - non_existing_task_id = { - "operator": "GoogleCloudStorageToBigQueryOperator", - "args": {"some_arg": "some_val"}, - } - with pytest.raises(KeyError): - generate_dag.validate_task(non_existing_task_id) + for airflow_version in AIRFLOW_VERSIONS: + valid_task = { + "operator": "GoogleCloudStorageToBigQueryOperator", + "args": {"task_id": "load_gcs_to_bq"}, + } + generate_dag.validate_task(valid_task, airflow_version) + + non_existing_operator = { + "operator": "NonExisting", + "args": {"task_id": "load_gcs_to_bq"}, + } + with pytest.raises(ValueError): + generate_dag.validate_task(non_existing_operator, airflow_version) + + non_existing_task_id = { + "operator": "GoogleCloudStorageToBigQueryOperator", + "args": {"some_arg": "some_val"}, + } + with pytest.raises(KeyError): + generate_dag.validate_task(non_existing_task_id, airflow_version) def test_generated_dag_file_loads_properly_in_python(