From c9b8b7ae30fcdacd8c62bf7ea8e85b4ea9c1a41d Mon Sep 17 00:00:00 2001
From: Adler Santos
Date: Thu, 14 Oct 2021 14:31:58 -0400
Subject: [PATCH] chore: Regenerate DAG files for `new_york` dataset (#210)

---
 .../311_service_requests/311_service_requests_dag.py       | 7 ++++---
 .../new_york/citibike_stations/citibike_stations_dag.py    | 7 ++++---
 datasets/new_york/tree_census_1995/tree_census_1995_dag.py | 7 ++++---
 3 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/datasets/new_york/311_service_requests/311_service_requests_dag.py b/datasets/new_york/311_service_requests/311_service_requests_dag.py
index b48ed6328..0e58e9915 100644
--- a/datasets/new_york/311_service_requests/311_service_requests_dag.py
+++ b/datasets/new_york/311_service_requests/311_service_requests_dag.py
@@ -14,7 +14,8 @@
 
 
 from airflow import DAG
-from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery
 
 default_args = {
     "owner": "Google",
@@ -33,7 +34,7 @@
 ) as dag:
 
     # Run CSV transform within kubernetes pod
-    transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+    transform_csv = kubernetes_pod.KubernetesPodOperator(
         task_id="transform_csv",
         name="311_service_requests",
         namespace="default",
@@ -51,7 +52,7 @@
     )
 
     # Task to load CSV data to a BigQuery table
-    load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+    load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
         task_id="load_to_bq",
         bucket="{{ var.value.composer_bucket }}",
         source_objects=["data/new_york/311_service_requests/data_output.csv"],
diff --git a/datasets/new_york/citibike_stations/citibike_stations_dag.py b/datasets/new_york/citibike_stations/citibike_stations_dag.py
index 51e8502bf..93899ec27 100644
--- a/datasets/new_york/citibike_stations/citibike_stations_dag.py
+++ b/datasets/new_york/citibike_stations/citibike_stations_dag.py
@@ -14,7 +14,8 @@
 
 
 from airflow import DAG
-from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery
 
 default_args = {
     "owner": "Google",
@@ -33,7 +34,7 @@
 ) as dag:
 
     # Run CSV transform within kubernetes pod
-    transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+    transform_csv = kubernetes_pod.KubernetesPodOperator(
         task_id="transform_csv",
         name="citibike_stations",
         namespace="default",
@@ -52,7 +53,7 @@
     )
 
     # Task to load CSV data to a BigQuery table
-    load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+    load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
         task_id="load_to_bq",
         bucket="{{ var.value.composer_bucket }}",
         source_objects=["data/new_york/citibike_stations/data_output.csv"],
diff --git a/datasets/new_york/tree_census_1995/tree_census_1995_dag.py b/datasets/new_york/tree_census_1995/tree_census_1995_dag.py
index 7183ead75..6a4a4d08e 100644
--- a/datasets/new_york/tree_census_1995/tree_census_1995_dag.py
+++ b/datasets/new_york/tree_census_1995/tree_census_1995_dag.py
@@ -14,7 +14,8 @@
 
 
 from airflow import DAG
-from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery
 
 default_args = {
     "owner": "Google",
@@ -33,7 +34,7 @@
 ) as dag:
 
     # Run CSV transform within kubernetes pod
-    transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+    transform_csv = kubernetes_pod.KubernetesPodOperator(
         task_id="transform_csv",
         name="tree_census_1995",
         namespace="default",
@@ -51,7 +52,7 @@
     )
 
     # Task to load CSV data to a BigQuery table
-    load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+    load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
        task_id="load_to_bq",
        bucket="{{ var.value.composer_bucket }}",
        source_objects=["data/new_york/tree_census_1995/data_output.csv"],
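For reference, below is a minimal, self-contained sketch of the DAG shape these regenerated files follow after the import migration to Airflow 2 provider packages (airflow.providers.cncf.kubernetes and airflow.providers.google). The dag_id, pod name, container image, start date, GCS path, and BigQuery table in this sketch are illustrative placeholders, not values taken from the patch above.

import datetime

from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": datetime.datetime(2021, 3, 1),  # placeholder start date
}

with DAG(
    dag_id="example_dataset.example_pipeline",  # placeholder DAG ID
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run the CSV transform inside a Kubernetes pod
    transform_csv = kubernetes_pod.KubernetesPodOperator(
        task_id="transform_csv",
        name="example_transform",  # placeholder pod name
        namespace="default",
        image="gcr.io/example-project/example-transform:latest",  # placeholder image
    )

    # Load the transformed CSV from GCS into a BigQuery table
    load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
        task_id="load_to_bq",
        bucket="{{ var.value.composer_bucket }}",
        source_objects=["data/example_dataset/data_output.csv"],  # placeholder path
        destination_project_dataset_table="example_dataset.example_table",  # placeholder table
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
    )

    transform_csv >> load_to_bq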