feat: Support BigQueryToBigQueryOperator #86

Merged: 2 commits merged on Jun 14, 2021
12 changes: 11 additions & 1 deletion samples/pipeline.yaml
@@ -278,6 +278,16 @@ dag:
limit_memory: "250M"
limit_cpu: "1"

- operator: "BigQueryToBigQueryOperator"
description: "Task to run a BQ to BQ operator"

args:
task_id: "sample_bq_to_bq_task"
source_project_dataset_tables: ["{{ var.json.DATASET_FOLDER_NAME.PIPELINE_NAME.source_project_dataset_table }}"]
destination_project_dataset_table: "{{ var.json.DATASET_FOLDER_NAME.PIPELINE_NAME.destination_project_dataset_table }}"
impersonation_chain: "{{ var.json.DATASET_FOLDER_NAME.service_account }}"
write_disposition: "WRITE_TRUNCATE"

graph_paths:
# This is where you specify the relationships (i.e. directed paths/edges)
# among the tasks specified above. Use the bitshift operator to define the
@@ -286,5 +296,5 @@ dag:
# For more info, see
# https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html#setting-up-dependencies
- "sample_bash_task >> [sample_gcs_to_bq_task, sample_gcs_to_gcs_task]"
- "sample_gcs_to_bq_task >> sample_bq_sql_task"
- "sample_gcs_to_bq_task >> [sample_bq_sql_task, sample_bq_to_bq_task]"
- "sample_bq_sql_task >> sample_gcs_delete_task"
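For context, each YAML operator entry above is rendered by the repo's generator into an operator instantiation in the output DAG file. The following is a minimal, hypothetical sketch of that rendering step; the `render_operator` helper and the sample arg values are illustrative, not the actual code or output of `generate_dag.py`:

```python
# Hypothetical sketch: turning a YAML operator entry's args into Python
# source for a generated DAG. Arg values are placeholders, not the real
# Airflow variables referenced in samples/pipeline.yaml.
yaml_args = {
    "task_id": "sample_bq_to_bq_task",
    "source_project_dataset_tables": ["my-project.my_dataset.src_table"],
    "destination_project_dataset_table": "my-project.my_dataset.dst_table",
    "write_disposition": "WRITE_TRUNCATE",
}

def render_operator(operator: str, args: dict) -> str:
    """Render one operator instantiation as Python source text."""
    lines = [f"{args['task_id']} = {operator}("]
    for key, value in args.items():
        lines.append(f"    {key}={value!r},")
    lines.append(")")
    return "\n".join(lines)

code = render_operator(
    "bigquery_to_bigquery.BigQueryToBigQueryOperator", yaml_args
)
print(code)
```

The rendered string would then be written into the generated DAG module alongside the import line supplied by `dag_imports.json`.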
4 changes: 4 additions & 0 deletions scripts/dag_imports.json
@@ -16,6 +16,10 @@
"import": "from airflow.contrib.operators import bigquery_operator",
"class": "bigquery_operator.BigQueryOperator"
},
"BigQueryToBigQueryOperator": {
"import": "from airflow.contrib.operators import bigquery_to_bigquery",
Contributor comment:
Note to our future selves: migrating imports to airflow.providers is planned on the roadmap, which is why I'm okay with this PR importing from contrib for now.

"class": "bigquery_to_bigquery.BigQueryToBigQueryOperator"
},
"KubernetesPodOperator": {
"import": "from airflow.contrib.operators import kubernetes_pod_operator",
"class": "kubernetes_pod_operator.KubernetesPodOperator"
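The `dag_imports.json` mapping above pairs each supported operator name with its import statement and fully qualified class name. A small sketch of how a generator script might consume it; the mapping literal is copied from the diff, but the `imports_for` helper is hypothetical:

```python
# Subset of the dag_imports.json mapping shown in the diff above.
DAG_IMPORTS = {
    "BigQueryOperator": {
        "import": "from airflow.contrib.operators import bigquery_operator",
        "class": "bigquery_operator.BigQueryOperator",
    },
    "BigQueryToBigQueryOperator": {
        "import": "from airflow.contrib.operators import bigquery_to_bigquery",
        "class": "bigquery_to_bigquery.BigQueryToBigQueryOperator",
    },
}

def imports_for(operators):
    """Return deduplicated import lines for the given operator names."""
    seen = []
    for name in operators:
        entry = DAG_IMPORTS[name]  # KeyError means an unsupported operator
        if entry["import"] not in seen:
            seen.append(entry["import"])
    return seen

print(imports_for(["BigQueryToBigQueryOperator"]))
```

Keeping the import paths in data rather than code is what makes the planned contrib-to-providers migration (see the contributor comment above) a one-file change.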
1 change: 1 addition & 0 deletions scripts/generate_dag.py
@@ -33,6 +33,7 @@
"GoogleCloudStorageToGoogleCloudStorageOperator",
"GoogleCloudStorageDeleteOperator",
"BigQueryOperator",
"BigQueryToBigQueryOperator",
"KubernetesPodOperator",
}

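The `generate_dag.py` diff adds `BigQueryToBigQueryOperator` to the script's allowlist of operator names. A hedged sketch of the kind of validation such an allowlist enables; the `validate_operator` function and its error message are illustrative, not the script's actual code, and the set below is only the subset visible in the diff:

```python
# Allowlist subset copied from the diff in scripts/generate_dag.py.
AIRFLOW_OPERATORS = {
    "GoogleCloudStorageToGoogleCloudStorageOperator",
    "GoogleCloudStorageDeleteOperator",
    "BigQueryOperator",
    "BigQueryToBigQueryOperator",
    "KubernetesPodOperator",
}

def validate_operator(name: str) -> str:
    """Reject YAML operator names the generator cannot import."""
    if name not in AIRFLOW_OPERATORS:
        raise ValueError(f"Unsupported operator: {name}")
    return name

print(validate_operator("BigQueryToBigQueryOperator"))
```

Failing fast on unknown operator names keeps a typo in `pipeline.yaml` from producing a DAG file that only breaks at Airflow import time.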