diff --git a/datasets/noaa/_terraform/gsod_stations_pipeline.tf b/datasets/noaa/_terraform/gsod_stations_pipeline.tf index 2b44b6b4b..64ac502e9 100644 --- a/datasets/noaa/_terraform/gsod_stations_pipeline.tf +++ b/datasets/noaa/_terraform/gsod_stations_pipeline.tf @@ -15,7 +15,7 @@ */ -resource "google_bigquery_table" "gsod_stations" { +resource "google_bigquery_table" "noaa_gsod_stations" { project = var.project_id dataset_id = "noaa" table_id = "gsod_stations" @@ -30,10 +30,10 @@ resource "google_bigquery_table" "gsod_stations" { ] } -output "bigquery_table-gsod_stations-table_id" { - value = google_bigquery_table.gsod_stations.table_id +output "bigquery_table-noaa_gsod_stations-table_id" { + value = google_bigquery_table.noaa_gsod_stations.table_id } -output "bigquery_table-gsod_stations-id" { - value = google_bigquery_table.gsod_stations.id +output "bigquery_table-noaa_gsod_stations-id" { + value = google_bigquery_table.noaa_gsod_stations.id } diff --git a/datasets/noaa/_terraform/lightning_strikes_by_year_pipeline.tf b/datasets/noaa/_terraform/lightning_strikes_by_year_pipeline.tf deleted file mode 100644 index e24620ca8..000000000 --- a/datasets/noaa/_terraform/lightning_strikes_by_year_pipeline.tf +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Copyright 2021 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -resource "google_bigquery_table" "lightning_strikes_by_year" { - project = var.project_id - dataset_id = "noaa" - table_id = "lightning_strikes_by_year" - - description = "noaaspc" - - - - - depends_on = [ - google_bigquery_dataset.noaa - ] -} - -output "bigquery_table-lightning_strikes_by_year-table_id" { - value = google_bigquery_table.lightning_strikes_by_year.table_id -} - -output "bigquery_table-lightning_strikes_by_year-id" { - value = google_bigquery_table.lightning_strikes_by_year.id -} diff --git a/datasets/noaa/gsod_stations/gsod_stations_dag.py b/datasets/noaa/gsod_stations/gsod_stations_dag.py index cdf7777d8..4af9a0dd7 100644 --- a/datasets/noaa/gsod_stations/gsod_stations_dag.py +++ b/datasets/noaa/gsod_stations/gsod_stations_dag.py @@ -14,12 +14,13 @@ from airflow import DAG -from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.transfers import gcs_to_bigquery default_args = { "owner": "Google", "depends_on_past": False, - "start_date": "2021-03-01", + "start_date": "2021-12-30", } @@ -27,13 +28,13 @@ dag_id="noaa.gsod_stations", default_args=default_args, max_active_runs=1, - schedule_interval="@daily", + schedule_interval="@yearly", catchup=False, default_view="graph", ) as dag: # Run CSV transform within kubernetes pod - transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + transform_csv = kubernetes_pod.KubernetesPodOperator( task_id="transform_csv", name="gsod_stations", namespace="default", @@ -70,7 +71,7 @@ ) # Task to load CSV data to a BigQuery table - load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( task_id="load_to_bq", bucket="{{ var.value.composer_bucket }}", source_objects=["data/noaa/gsod_stations/data_output.csv"], diff --git a/datasets/noaa/lightning_strikes_by_year/lightning_strikes_by_year_dag.py b/datasets/noaa/lightning_strikes_by_year/lightning_strikes_by_year_dag.py deleted file mode 100644 index b113ed4c5..000000000 --- a/datasets/noaa/lightning_strikes_by_year/lightning_strikes_by_year_dag.py +++ /dev/null @@ -1,90 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from airflow import DAG -from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator - -default_args = { - "owner": "Google", - "depends_on_past": False, - "start_date": "2021-03-01", -} - - -with DAG( - dag_id="noaa.lightning_strikes_by_year", - default_args=default_args, - max_active_runs=1, - schedule_interval="@daily", - catchup=False, - default_view="graph", -) as dag: - - # Run CSV transform within kubernetes pod - transform_csv = kubernetes_pod_operator.KubernetesPodOperator( - task_id="transform_csv", - name="lightning_strikes_by_year", - namespace="default", - affinity={ - "nodeAffinity": { - "requiredDuringSchedulingIgnoredDuringExecution": { - "nodeSelectorTerms": [ - { - "matchExpressions": [ - { - "key": "cloud.google.com/gke-nodepool", - "operator": "In", - "values": ["pool-e2-standard-4"], - } - ] - } - ] - } - } - }, - image_pull_policy="Always", - image="{{ var.json.noaa_lightning_strikes_by_year.container_registry.run_csv_transform_kub_lightning_strikes_by_year }}", - env_vars={ - "SOURCE_URL": "https://www1.ncdc.noaa.gov/pub/data/swdi/database-csv/v2/nldn-tiles-{{ macros.ds_format(macros.ds_add(ds, -365), '%Y-%m-%d', '%Y') }}.csv.gz", - "SOURCE_FILE": "files/data.csv", - "TARGET_FILE": "files/data_output.csv", - "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", - "TARGET_GCS_PATH": "data/noaa/lightning_strikes_by_year/data_output.csv", - }, - resources={"limit_memory": "2G", "limit_cpu": "1"}, - ) - - # Task to load CSV data to a BigQuery table - load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( - task_id="load_to_bq", - bucket="{{ var.value.composer_bucket }}", - source_objects=["data/noaa/lightning_strikes_by_year/data_output.csv"], - source_format="CSV", - destination_project_dataset_table="noaa.lightning_strikes_{{ macros.ds_format(macros.ds_add(ds, -365), \u0027%Y-%m-%d\u0027, \u0027%Y\u0027) }}", - skip_leading_rows=1, - write_disposition="WRITE_TRUNCATE", - schema_fields=[ - {"name": "date", "type": "TIMESTAMP", "mode": "NULLABLE"}, - {"name": "number_of_strikes", "type": "INTEGER", "mode": "NULLABLE"}, - { - "name": "center_point_geom", - "type": "GEOGRAPHY", - "description": "Center point of 0.10-degree tiles (roughly 1.1km) that aggregate strikes within the given tile.", - "mode": "NULLABLE", - }, - ], - ) - - transform_csv >> load_to_bq diff --git a/datasets/noaa/lightning_strikes_by_year/pipeline.yaml b/datasets/noaa/lightning_strikes_by_year/pipeline.yaml deleted file mode 100644 index 13b657583..000000000 --- a/datasets/noaa/lightning_strikes_by_year/pipeline.yaml +++ /dev/null @@ -1,125 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - ---- -resources: - - - type: bigquery_table - # Required Properties: - table_id: lightning_strikes_by_year - - # Description of the table - description: "noaaspc" - -dag: - airflow_version: 2 - initialize: - dag_id: lightning_strikes_by_year - default_args: - owner: "Google" - - # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded - depends_on_past: False - start_date: '2021-12-30' - max_active_runs: 1 - schedule_interval: "@yearly" # run once a week at Sunday 12am - catchup: False - default_view: graph - - tasks: - - - operator: "KubernetesPodOperator" - - # Task description - description: "Run CSV transform within kubernetes pod" - - args: - - task_id: "transform_csv" - - # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id - name: "lightning_strikes_by_year" - - # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. - namespace: "default" - - affinity: - nodeAffinity: - requiredDuringSchedulingIgnoredDuringExecution: - nodeSelectorTerms: - - matchExpressions: - - key: cloud.google.com/gke-nodepool - operator: In - values: - - "pool-e2-standard-4" - - image_pull_policy: "Always" - - # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. - image: "{{ var.json.noaa_lightning_strikes_by_year.container_registry.run_csv_transform_kub_lightning_strikes_by_year }}" - - # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. - env_vars: - SOURCE_URL: "https://www1.ncdc.noaa.gov/pub/data/swdi/database-csv/v2/nldn-tiles-{{ macros.ds_format(macros.ds_add(ds, -365), '%Y-%m-%d', '%Y') }}.csv.gz" - SOURCE_FILE: "files/data.csv" - TARGET_FILE: "files/data_output.csv" - TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" - TARGET_GCS_PATH: "data/noaa/lightning_strikes_by_year/data_output.csv" - - - # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes - resources: - limit_memory: "2G" - limit_cpu: "1" - - - operator: "GoogleCloudStorageToBigQueryOperator" - description: "Task to load CSV data to a BigQuery table" - - args: - task_id: "load_to_bq" - - # The GCS bucket where the CSV file is located in. - bucket: "{{ var.value.composer_bucket }}" - - # The GCS object path for the CSV file - source_objects: ["data/noaa/lightning_strikes_by_year/data_output.csv"] - source_format: "CSV" - destination_project_dataset_table: "noaa.lightning_strikes_{{ macros.ds_format(macros.ds_add(ds, -365), '%Y-%m-%d', '%Y') }}" - - # Use this if your CSV file contains a header row - skip_leading_rows: 1 - - # How to write data to the table: overwrite, append, or write if empty - # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition - write_disposition: "WRITE_TRUNCATE" - - # The BigQuery table schema based on the CSV file. For more info, see - # https://cloud.google.com/bigquery/docs/schemas. - # Always use snake_case and lowercase for column names, and be explicit, - # i.e. specify modes for all columns. - # types: "INTEGER", "TIMESTAMP", "STRING" - schema_fields: - - name: "date" - type: "TIMESTAMP" - mode: "NULLABLE" - - name: "number_of_strikes" - type: "INTEGER" - mode: "NULLABLE" - - name: "center_point_geom" - type: "GEOGRAPHY" - description: "Center point of 0.10-degree tiles (roughly 1.1km) that aggregate strikes within the given tile." - mode: "NULLABLE" - - graph_paths: - - transform_csv >> load_to_bq