diff --git a/datasets/world_bank_health_population/_images/run_csv_transform_kub/Dockerfile b/datasets/world_bank_health_population/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..45603b321 --- /dev/null +++ b/datasets/world_bank_health_population/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,42 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +RUN apt-get -y update && apt-get install -y apt-transport-https ca-certificates gnupg &&\ + echo "deb https://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list &&\ + curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - &&\ + apt-get -y update && apt-get install -y google-cloud-sdk + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. 
+# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . + +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/world_bank_health_population/_images/run_csv_transform_kub/csv_transform.py b/datasets/world_bank_health_population/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..4ebcea00c --- /dev/null +++ b/datasets/world_bank_health_population/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,153 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import datetime
import json
import logging
import math
import os
import pathlib
import subprocess
import typing

import pandas as pd


def main(
    source_url: str,
    source_file: pathlib.Path,
    column_name: str,
    target_file: pathlib.Path,
    target_gcs_bucket: str,
    target_gcs_path: str,
    headers: typing.List[str],
    rename_mappings: dict,
    pipeline_name: str,
) -> None:
    """Download a source CSV, transform it, and upload the result to GCS.

    Args:
        source_url: Location handed to ``gsutil cp`` as the copy source.
        source_file: Local path the source CSV is copied to and read from.
        column_name: Name of the column to drop from the loaded data.
        target_file: Local path the transformed CSV is written to.
        target_gcs_bucket: Name of the destination GCS bucket.
        target_gcs_path: Object path inside ``target_gcs_bucket``.
        headers: Output columns, in the order they are written.
        rename_mappings: Mapping of source column names to output names.
        pipeline_name: Pipeline identifier; ``"series_times"`` and
            ``"country_summary"`` enable extra pipeline-specific steps.

    Raises:
        Exception: Any error raised while writing ``target_file`` is logged
            and re-raised so that a missing or stale file is never uploaded.
    """
    logging.info(
        f"World Bank Health Population {pipeline_name} process started at "
        + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )

    logging.info("Creating 'files' folder")
    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)

    logging.info(f"Downloading file {source_url}")
    download_file(source_url, source_file)

    logging.info(f"Opening file {source_file}")
    df = pd.read_csv(source_file, skip_blank_lines=True)

    logging.info(f"Transforming {source_file} ... ")
    df = _transform_dataframe(
        df, column_name, rename_mappings, headers, pipeline_name
    )

    logging.info(f"Saving to output file.. {target_file}")
    try:
        save_to_new_file(df, file_path=str(target_file))
    except Exception as e:
        logging.error(f"Error saving output file: {e}.")
        # Do not fall through to the upload step: there is nothing valid to
        # upload if the output file could not be written.
        raise

    logging.info(
        f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
    )
    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

    logging.info(
        f"World Bank Health Population {pipeline_name} process completed at "
        + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    )


def _transform_dataframe(
    df: pd.DataFrame,
    column_name: str,
    rename_mappings: dict,
    headers: typing.List[str],
    pipeline_name: str,
) -> pd.DataFrame:
    """Apply the drop/rename/pipeline-specific/reorder steps used by ``main``.

    ``df`` is modified in place by the drop and rename steps; the returned
    frame is the selection of ``headers`` columns in that order.
    """
    logging.info(f"Transform: Dropping column {column_name} ...")
    delete_column(df, column_name)

    logging.info(f"Transform: Renaming columns for {pipeline_name} ...")
    rename_headers(df, rename_mappings)

    if pipeline_name == "series_times":
        logging.info(f"Transform: Extracting year for {pipeline_name} ...")
        df["year"] = df["year"].apply(extract_year)

    if pipeline_name == "country_summary":
        logging.info("Transform: Creating a new column ...")
        df["latest_water_withdrawal_data"] = ""

        logging.info("Transform: Converting to integer ... ")
        df["latest_industrial_data"] = df["latest_industrial_data"].apply(
            convert_to_integer_string
        )
        df["latest_trade_data"] = df["latest_trade_data"].apply(
            convert_to_integer_string
        )

    logging.info(f"Transform: Reordering headers for {pipeline_name} ...")
    return df[headers]


def download_file(source_url: str, source_file: pathlib.Path) -> None:
    """Copy ``source_url`` to ``source_file`` by running ``gsutil cp``.

    Raises:
        subprocess.CalledProcessError: If ``gsutil`` exits with a non-zero
            status.
    """
    subprocess.check_call(["gsutil", "cp", f"{source_url}", f"{source_file}"])


def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None:
    """Rename the columns of ``df`` in place according to ``rename_mappings``."""
    df.rename(columns=rename_mappings, inplace=True)


def delete_column(df: pd.DataFrame, column_name: str) -> None:
    """Drop ``column_name`` from ``df`` in place.

    Raises:
        KeyError: If ``df`` has no column named ``column_name``.
    """
    # An in-place drop returns None, so there is nothing to assign.
    df.drop(column_name, axis=1, inplace=True)


def extract_year(string_val: str) -> str:
    """Return ``string_val`` without its first two characters.

    Example: ``"YR2021"`` becomes ``"2021"``.
    """
    return string_val[2:]


def save_to_new_file(df: pd.DataFrame, file_path: str) -> None:
    """Write ``df`` to ``file_path`` as CSV without the index column."""
    df.to_csv(file_path, index=False)


def convert_to_integer_string(input: typing.Union[str, float]) -> str:
    """Render a numeric value as an integer string, or ``""`` when missing.

    ``None``, NaN and empty/blank strings map to ``""``. Numbers and numeric
    strings are rounded with the built-in ``round`` and rendered without a
    decimal part. A real ``0`` yields ``"0"`` rather than an empty value.

    Raises:
        ValueError: If ``input`` is a non-empty string that is not numeric.
    """
    if input is None:
        return ""

    if isinstance(input, str):
        text = input.strip()
        if not text:
            return ""
        number = float(text)
    else:
        number = float(input)

    if math.isnan(number):
        return ""
    return str(int(round(number, 0)))


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
    """Upload the local file ``file_path`` to ``gs://gcs_bucket/gcs_path``."""
    # Imported here so the pure transform helpers above stay importable in
    # environments where the GCS client library is not installed.
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(str(file_path))


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    main(
        source_url=os.environ["SOURCE_URL"],
        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
        column_name=os.environ["COLUMN_TO_REMOVE"],
        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
        target_gcs_path=os.environ["TARGET_GCS_PATH"],
        headers=json.loads(os.environ["CSV_HEADERS"]),
        rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]),
        pipeline_name=os.environ["PIPELINE_NAME"],
    )

# ---------------------------------------------------------------------------
# NOTE(review): this module was extracted from a multi-file diff. The span it
# replaces also carried the diff text below, which belongs to other files. It
# is kept verbatim (commented out) so that no content is lost.
#
# diff --git a/datasets/world_bank_health_population/_images/run_csv_transform_kub/requirements.txt b/datasets/world_bank_health_population/_images/run_csv_transform_kub/requirements.txt
# new file mode 100644
# index 000000000..e2fabcc34
# --- /dev/null
# +++ b/datasets/world_bank_health_population/_images/run_csv_transform_kub/requirements.txt
# @@ -0,0 +1,2 @@
# +google-cloud-storage
# +pandas
# diff --git a/datasets/world_bank_health_population/_terraform/country_series_definitions_pipeline.tf b/datasets/world_bank_health_population/_terraform/country_series_definitions_pipeline.tf
# new file mode 100644
# index 000000000..73b061087
# --- /dev/null
# +++ b/datasets/world_bank_health_population/_terraform/country_series_definitions_pipeline.tf
# @@ -0,0 +1,39 @@
# +/**
# + * Copyright 2021 Google LLC
# + *
# + * Licensed under the Apache License, Version 2.0 (the "License");
# + * you may not use this file except in compliance with the License.
# + * You may obtain a copy of the License at
# + *
# + * http://www.apache.org/licenses/LICENSE-2.0
# + *
# + * Unless required by applicable law or agreed to in writing, software
# + * distributed under the License is distributed on an "AS IS" BASIS,
# + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# + * See the License for the specific language governing permissions and
# + * limitations under the License.
+ */ + + +resource "google_bigquery_table" "country_series_definitions" { + project = var.project_id + dataset_id = "world_bank_health_population" + table_id = "country_series_definitions" + + description = "Country Series Definition table" + + + + + depends_on = [ + google_bigquery_dataset.world_bank_health_population + ] +} + +output "bigquery_table-country_series_definitions-table_id" { + value = google_bigquery_table.country_series_definitions.table_id +} + +output "bigquery_table-country_series_definitions-id" { + value = google_bigquery_table.country_series_definitions.id +} diff --git a/datasets/world_bank_health_population/_terraform/country_summary_pipeline.tf b/datasets/world_bank_health_population/_terraform/country_summary_pipeline.tf new file mode 100644 index 000000000..9edd1cfad --- /dev/null +++ b/datasets/world_bank_health_population/_terraform/country_summary_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "country_summary" { + project = var.project_id + dataset_id = "world_bank_health_population" + table_id = "country_summary" + + description = "Country Summary table" + + + + + depends_on = [ + google_bigquery_dataset.world_bank_health_population + ] +} + +output "bigquery_table-country_summary-table_id" { + value = google_bigquery_table.country_summary.table_id +} + +output "bigquery_table-country_summary-id" { + value = google_bigquery_table.country_summary.id +} diff --git a/datasets/world_bank_health_population/_terraform/provider.tf b/datasets/world_bank_health_population/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/world_bank_health_population/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/world_bank_health_population/_terraform/series_summary_pipeline.tf b/datasets/world_bank_health_population/_terraform/series_summary_pipeline.tf new file mode 100644 index 000000000..98f24abe9 --- /dev/null +++ b/datasets/world_bank_health_population/_terraform/series_summary_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "series_summary" { + project = var.project_id + dataset_id = "world_bank_health_population" + table_id = "series_summary" + + description = "Series Summary table" + + + + + depends_on = [ + google_bigquery_dataset.world_bank_health_population + ] +} + +output "bigquery_table-series_summary-table_id" { + value = google_bigquery_table.series_summary.table_id +} + +output "bigquery_table-series_summary-id" { + value = google_bigquery_table.series_summary.id +} diff --git a/datasets/world_bank_health_population/_terraform/series_times_pipeline.tf b/datasets/world_bank_health_population/_terraform/series_times_pipeline.tf new file mode 100644 index 000000000..0aafd26ad --- /dev/null +++ b/datasets/world_bank_health_population/_terraform/series_times_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "series_times" { + project = var.project_id + dataset_id = "world_bank_health_population" + table_id = "series_times" + + description = "Series Times table" + + + + + depends_on = [ + google_bigquery_dataset.world_bank_health_population + ] +} + +output "bigquery_table-series_times-table_id" { + value = google_bigquery_table.series_times.table_id +} + +output "bigquery_table-series_times-id" { + value = google_bigquery_table.series_times.id +} diff --git a/datasets/world_bank_health_population/_terraform/variables.tf b/datasets/world_bank_health_population/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/world_bank_health_population/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/world_bank_health_population/_terraform/world_bank_health_population_dataset.tf b/datasets/world_bank_health_population/_terraform/world_bank_health_population_dataset.tf new file mode 100644 index 000000000..f3063a423 --- /dev/null +++ b/datasets/world_bank_health_population/_terraform/world_bank_health_population_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_dataset" "world_bank_health_population" { + dataset_id = "world_bank_health_population" + project = var.project_id + description = "World Bank Health Population" +} + +output "bigquery_dataset-world_bank_health_population-dataset_id" { + value = google_bigquery_dataset.world_bank_health_population.dataset_id +} diff --git a/datasets/world_bank_health_population/country_series_definitions/country_series_definitions_dag.py b/datasets/world_bank_health_population/country_series_definitions/country_series_definitions_dag.py new file mode 100644 index 000000000..12cdd6ea2 --- /dev/null +++ b/datasets/world_bank_health_population/country_series_definitions/country_series_definitions_dag.py @@ -0,0 +1,92 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="world_bank_health_population.country_series_definitions", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + country_series_definitions_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="country_series_definitions_transform_csv", + startup_timeout_seconds=600, + name="country_series_definitions", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.world_bank_health_population.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "gs://pdp-feeds-staging/RelayWorldBank/hnp_stats_csv/HNP_StatsCountry-Series.csv", + "SOURCE_FILE": "files/data.csv", + "COLUMN_TO_REMOVE": "Unnamed: 3", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/world_bank_health_population/country_series_definitions/data_output.csv", + "PIPELINE_NAME": "country_series_definitions", + "CSV_HEADERS": '["country_code" ,"series_code" ,"description"]', + "RENAME_MAPPINGS": '{"CountryCode":"country_code","SeriesCode":"series_code","DESCRIPTION":"description"}', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_country_series_definitions_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_country_series_definitions_to_bq", + bucket="{{ 
var.json.shared.composer_bucket }}", + source_objects=[ + "data/world_bank_health_population/country_series_definitions/data_output.csv" + ], + source_format="CSV", + destination_project_dataset_table="world_bank_health_population.country_series_definitions", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "country_code", "type": "string", "mode": "nullable"}, + {"name": "series_code", "type": "string", "mode": "nullable"}, + {"name": "description", "type": "string", "mode": "nullable"}, + ], + ) + + country_series_definitions_transform_csv >> load_country_series_definitions_to_bq diff --git a/datasets/world_bank_health_population/country_series_definitions/pipeline.yaml b/datasets/world_bank_health_population/country_series_definitions/pipeline.yaml new file mode 100644 index 000000000..ab67b0c73 --- /dev/null +++ b/datasets/world_bank_health_population/country_series_definitions/pipeline.yaml @@ -0,0 +1,94 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--- +resources: + + - type: bigquery_table + table_id: country_series_definitions + description: "Country Series Definition table" + +dag: + airflow_version: 1 + initialize: + dag_id: country_series_definitions + default_args: + owner: "Google" + depends_on_past: False + start_date: "2021-03-01" + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "country_series_definitions_transform_csv" + startup_timeout_seconds: 600 + name: "country_series_definitions" + namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.world_bank_health_population.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URL: "gs://pdp-feeds-staging/RelayWorldBank/hnp_stats_csv/HNP_StatsCountry-Series.csv" + SOURCE_FILE: "files/data.csv" + COLUMN_TO_REMOVE: "Unnamed: 3" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/world_bank_health_population/country_series_definitions/data_output.csv" + PIPELINE_NAME: "country_series_definitions" + CSV_HEADERS: >- + ["country_code" ,"series_code" ,"description"] + RENAME_MAPPINGS: >- + {"CountryCode":"country_code","SeriesCode":"series_code","DESCRIPTION":"description"} + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_country_series_definitions_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/world_bank_health_population/country_series_definitions/data_output.csv"] + source_format: "CSV" + 
destination_project_dataset_table: "world_bank_health_population.country_series_definitions" + skip_leading_rows: 1 + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "country_code" + type: "string" + mode: "nullable" + - name: "series_code" + type: "string" + mode: "nullable" + - name: "description" + type: "string" + mode: "nullable" + + graph_paths: + - "country_series_definitions_transform_csv >> load_country_series_definitions_to_bq" diff --git a/datasets/world_bank_health_population/country_summary/country_summary_dag.py b/datasets/world_bank_health_population/country_summary/country_summary_dag.py new file mode 100644 index 000000000..0eea30043 --- /dev/null +++ b/datasets/world_bank_health_population/country_summary/country_summary_dag.py @@ -0,0 +1,169 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="world_bank_health_population.country_summary", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + country_summary_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="country_summary_transform_csv", + startup_timeout_seconds=600, + name="country_summary", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.world_bank_health_population.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "gs://pdp-feeds-staging/RelayWorldBank/hnp_stats_csv/HNP_StatsCountry.csv", + "SOURCE_FILE": "files/data.csv", + "COLUMN_TO_REMOVE": "Unnamed: 30", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/world_bank_health_population/country_summary/data_output.csv", + "PIPELINE_NAME": "country_summary", + "CSV_HEADERS": 
'["country_code","short_name","table_name","long_name","two_alpha_code","currency_unit","special_notes","region","income_group","wb_2_code","national_accounts_base_year","national_accounts_reference_year","sna_price_valuation","lending_category","other_groups","system_of_national_accounts","alternative_conversion_factor","ppp_survey_year","balance_of_payments_manual_in_use","external_debt_reporting_status","system_of_trade","government_accounting_concept","imf_data_dissemination_standard","latest_population_census","latest_household_survey","source_of_most_recent_income_and_expenditure_data","vital_registration_complete","latest_agricultural_census","latest_industrial_data","latest_trade_data","latest_water_withdrawal_data"]', + "RENAME_MAPPINGS": '{"Country Code":"country_code","Short Name":"short_name","Table Name":"table_name","Long Name":"long_name","2-alpha code":"two_alpha_code","Currency Unit":"currency_unit","Special Notes":"special_notes","Region":"region","Income Group":"income_group","WB-2 code":"wb_2_code","National accounts base year":"national_accounts_base_year","National accounts reference year":"national_accounts_reference_year","SNA price valuation":"sna_price_valuation","Lending category":"lending_category","Other groups":"other_groups","System of National Accounts":"system_of_national_accounts","Alternative conversion factor":"alternative_conversion_factor","PPP survey year":"ppp_survey_year","Balance of Payments Manual in use":"balance_of_payments_manual_in_use","External debt Reporting status":"external_debt_reporting_status","System of trade":"system_of_trade","Government Accounting concept":"government_accounting_concept","IMF data dissemination standard":"imf_data_dissemination_standard","Latest population census":"latest_population_census","Latest household survey":"latest_household_survey","Source of most recent Income and expenditure data":"source_of_most_recent_income_and_expenditure_data","Vital registration 
complete":"vital_registration_complete","Latest agricultural census":"latest_agricultural_census","Latest industrial data":"latest_industrial_data","Latest trade data":"latest_trade_data"}', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_country_summary_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_country_summary_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=[ + "data/world_bank_health_population/country_summary/data_output.csv" + ], + source_format="CSV", + destination_project_dataset_table="world_bank_health_population.country_summary", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "country_code", "type": "string", "mode": "nullable"}, + {"name": "short_name", "type": "string", "mode": "nullable"}, + {"name": "table_name", "type": "string", "mode": "nullable"}, + {"name": "long_name", "type": "string", "mode": "nullable"}, + {"name": "two_alpha_code", "type": "string", "mode": "nullable"}, + {"name": "currency_unit", "type": "string", "mode": "nullable"}, + {"name": "special_notes", "type": "string", "mode": "nullable"}, + {"name": "region", "type": "string", "mode": "nullable"}, + {"name": "income_group", "type": "string", "mode": "nullable"}, + {"name": "wb_2_code", "type": "string", "mode": "nullable"}, + { + "name": "national_accounts_base_year", + "type": "string", + "mode": "nullable", + }, + { + "name": "national_accounts_reference_year", + "type": "string", + "mode": "nullable", + }, + {"name": "sna_price_valuation", "type": "string", "mode": "nullable"}, + {"name": "lending_category", "type": "string", "mode": "nullable"}, + {"name": "other_groups", "type": "string", "mode": "nullable"}, + { + "name": "system_of_national_accounts", + "type": "string", + "mode": "nullable", + }, + { + "name": "alternative_conversion_factor", + "type": "string", + "mode": 
"nullable", + }, + {"name": "ppp_survey_year", "type": "string", "mode": "nullable"}, + { + "name": "balance_of_payments_manual_in_use", + "type": "string", + "mode": "nullable", + }, + { + "name": "external_debt_reporting_status", + "type": "string", + "mode": "nullable", + }, + {"name": "system_of_trade", "type": "string", "mode": "nullable"}, + { + "name": "government_accounting_concept", + "type": "string", + "mode": "nullable", + }, + { + "name": "imf_data_dissemination_standard", + "type": "string", + "mode": "nullable", + }, + {"name": "latest_population_census", "type": "string", "mode": "nullable"}, + {"name": "latest_household_survey", "type": "string", "mode": "nullable"}, + { + "name": "source_of_most_recent_income_and_expenditure_data", + "type": "string", + "mode": "nullable", + }, + { + "name": "vital_registration_complete", + "type": "string", + "mode": "nullable", + }, + { + "name": "latest_agricultural_census", + "type": "string", + "mode": "nullable", + }, + {"name": "latest_industrial_data", "type": "integer", "mode": "nullable"}, + {"name": "latest_trade_data", "type": "integer", "mode": "nullable"}, + { + "name": "latest_water_withdrawal_data", + "type": "integer", + "mode": "nullable", + }, + ], + ) + + country_summary_transform_csv >> load_country_summary_to_bq diff --git a/datasets/world_bank_health_population/country_summary/pipeline.yaml b/datasets/world_bank_health_population/country_summary/pipeline.yaml new file mode 100644 index 000000000..b277c35cd --- /dev/null +++ b/datasets/world_bank_health_population/country_summary/pipeline.yaml @@ -0,0 +1,178 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
# Pipeline definition for the "country_summary" table: a Kubernetes pod task
# runs the CSV transform image, then a second task loads the CSV written to
# the Composer bucket into BigQuery.
resources:

  - type: bigquery_table
    table_id: country_summary
    description: "Country Summary table"

dag:
  airflow_version: 1
  initialize:
    dag_id: country_summary
    default_args:
      owner: "Google"
      depends_on_past: False
      start_date: "2021-03-01"
    max_active_runs: 1
    schedule_interval: "@daily"
    catchup: False
    default_view: graph

  tasks:
    - operator: "KubernetesPodOperator"
      description: "Run CSV transform within kubernetes pod"
      args:
        task_id: "country_summary_transform_csv"
        startup_timeout_seconds: 600
        name: "country_summary"
        namespace: "default"
        # Schedule the pod only on nodes of the "pool-e2-standard-4" pool.
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: cloud.google.com/gke-nodepool
                      operator: In
                      values: ["pool-e2-standard-4"]
        image_pull_policy: "Always"
        image: "{{ var.json.world_bank_health_population.container_registry.run_csv_transform_kub }}"
        # Environment handed to the container. The variables are interpreted
        # by the transform script inside the image, which is not part of this
        # file.
        env_vars:
          SOURCE_URL: "gs://pdp-feeds-staging/RelayWorldBank/hnp_stats_csv/HNP_StatsCountry.csv"
          SOURCE_FILE: "files/data.csv"
          COLUMN_TO_REMOVE: "Unnamed: 30"
          TARGET_FILE: "files/data_output.csv"
          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
          TARGET_GCS_PATH: "data/world_bank_health_population/country_summary/data_output.csv"
          PIPELINE_NAME: "country_summary"
          # NOTE(review): CSV_HEADERS ends with latest_water_withdrawal_data,
          # which has no entry in RENAME_MAPPINGS below - presumably the
          # transform script creates that column itself; confirm.
          # Both JSON values must stay on a single line: ">-" folds line
          # breaks into spaces, which would change the string.
          CSV_HEADERS: >-
            ["country_code","short_name","table_name","long_name","two_alpha_code","currency_unit","special_notes","region","income_group","wb_2_code","national_accounts_base_year","national_accounts_reference_year","sna_price_valuation","lending_category","other_groups","system_of_national_accounts","alternative_conversion_factor","ppp_survey_year","balance_of_payments_manual_in_use","external_debt_reporting_status","system_of_trade","government_accounting_concept","imf_data_dissemination_standard","latest_population_census","latest_household_survey","source_of_most_recent_income_and_expenditure_data","vital_registration_complete","latest_agricultural_census","latest_industrial_data","latest_trade_data","latest_water_withdrawal_data"]
          RENAME_MAPPINGS: >-
            {"Country Code":"country_code","Short Name":"short_name","Table Name":"table_name","Long Name":"long_name","2-alpha code":"two_alpha_code","Currency Unit":"currency_unit","Special Notes":"special_notes","Region":"region","Income Group":"income_group","WB-2 code":"wb_2_code","National accounts base year":"national_accounts_base_year","National accounts reference year":"national_accounts_reference_year","SNA price valuation":"sna_price_valuation","Lending category":"lending_category","Other groups":"other_groups","System of National Accounts":"system_of_national_accounts","Alternative conversion factor":"alternative_conversion_factor","PPP survey year":"ppp_survey_year","Balance of Payments Manual in use":"balance_of_payments_manual_in_use","External debt Reporting status":"external_debt_reporting_status","System of trade":"system_of_trade","Government Accounting concept":"government_accounting_concept","IMF data dissemination standard":"imf_data_dissemination_standard","Latest population census":"latest_population_census","Latest household survey":"latest_household_survey","Source of most recent Income and expenditure data":"source_of_most_recent_income_and_expenditure_data","Vital registration complete":"vital_registration_complete","Latest agricultural census":"latest_agricultural_census","Latest industrial data":"latest_industrial_data","Latest trade data":"latest_trade_data"}
        resources:
          request_memory: "2G"
          request_cpu: "1"

    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
      args:
        task_id: "load_country_summary_to_bq"
        bucket: "{{ var.value.composer_bucket }}"
        # Same object path as TARGET_GCS_PATH of the transform task.
        source_objects:
          - "data/world_bank_health_population/country_summary/data_output.csv"
        source_format: "CSV"
        destination_project_dataset_table: "world_bank_health_population.country_summary"
        skip_leading_rows: 1
        allow_quoted_newlines: True
        write_disposition: "WRITE_TRUNCATE"

        # Column order follows CSV_HEADERS of the transform task.
        schema_fields:
          - {name: "country_code", type: "string", mode: "nullable"}
          - {name: "short_name", type: "string", mode: "nullable"}
          - {name: "table_name", type: "string", mode: "nullable"}
          - {name: "long_name", type: "string", mode: "nullable"}
          - {name: "two_alpha_code", type: "string", mode: "nullable"}
          - {name: "currency_unit", type: "string", mode: "nullable"}
          - {name: "special_notes", type: "string", mode: "nullable"}
          - {name: "region", type: "string", mode: "nullable"}
          - {name: "income_group", type: "string", mode: "nullable"}
          - {name: "wb_2_code", type: "string", mode: "nullable"}
          - {name: "national_accounts_base_year", type: "string", mode: "nullable"}
          - {name: "national_accounts_reference_year", type: "string", mode: "nullable"}
          - {name: "sna_price_valuation", type: "string", mode: "nullable"}
          - {name: "lending_category", type: "string", mode: "nullable"}
          - {name: "other_groups", type: "string", mode: "nullable"}
          - {name: "system_of_national_accounts", type: "string", mode: "nullable"}
          - {name: "alternative_conversion_factor", type: "string", mode: "nullable"}
          - {name: "ppp_survey_year", type: "string", mode: "nullable"}
          - {name: "balance_of_payments_manual_in_use", type: "string", mode: "nullable"}
          - {name: "external_debt_reporting_status", type: "string", mode: "nullable"}
          - {name: "system_of_trade", type: "string", mode: "nullable"}
          - {name: "government_accounting_concept", type: "string", mode: "nullable"}
          - {name: "imf_data_dissemination_standard", type: "string", mode: "nullable"}
          - {name: "latest_population_census", type: "string", mode: "nullable"}
          - {name: "latest_household_survey", type: "string", mode: "nullable"}
          - {name: "source_of_most_recent_income_and_expenditure_data", type: "string", mode: "nullable"}
          - {name: "vital_registration_complete", type: "string", mode: "nullable"}
          - {name: "latest_agricultural_census", type: "string", mode: "nullable"}
          - {name: "latest_industrial_data", type: "integer", mode: "nullable"}
          - {name: "latest_trade_data", type: "integer", mode: "nullable"}
          - {name: "latest_water_withdrawal_data", type: "integer", mode: "nullable"}

  graph_paths:
    - "country_summary_transform_csv >> load_country_summary_to_bq"

# diff --git a/datasets/world_bank_health_population/dataset.yaml b/datasets/world_bank_health_population/dataset.yaml
# new file mode 100644
# index 000000000..a7060816d
# --- /dev/null
# +++ b/datasets/world_bank_health_population/dataset.yaml
# @@ -0,0 +1,26 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Dataset-level metadata shared by every pipeline under
# datasets/world_bank_health_population/.
dataset:
  name: world_bank_health_population
  friendly_name: world_bank_health_population
  description: "World Bank Health Population"
  # No sources / terms of use recorded yet (explicit nulls).
  dataset_sources: null
  terms_of_use: null


# BigQuery dataset that holds the tables declared by the individual pipelines.
resources:
  - type: bigquery_dataset
    dataset_id: world_bank_health_population
    description: "World Bank Health Population"

# diff --git a/datasets/world_bank_health_population/series_summary/pipeline.yaml b/datasets/world_bank_health_population/series_summary/pipeline.yaml
# new file mode 100644
# index 000000000..f1f5fd99e
# --- /dev/null
# +++ b/datasets/world_bank_health_population/series_summary/pipeline.yaml
# @@ -0,0 +1,145 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
# Pipeline definition for the "series_summary" table: a Kubernetes pod task
# runs the CSV transform image, then a second task loads the CSV written to
# the Composer bucket into BigQuery.
resources:

  - type: bigquery_table
    table_id: series_summary
    description: "Series Summary table"

dag:
  airflow_version: 1
  initialize:
    dag_id: series_summary
    default_args:
      owner: "Google"
      depends_on_past: False
      start_date: "2021-03-01"
    max_active_runs: 1
    schedule_interval: "@daily"
    catchup: False
    default_view: graph

  tasks:
    - operator: "KubernetesPodOperator"
      description: "Run CSV transform within kubernetes pod"
      args:
        task_id: "series_summary_transform_csv"
        startup_timeout_seconds: 600
        name: "series_summary"
        namespace: "default"
        image_pull_policy: "Always"
        # Schedule the pod only on nodes of the "pool-e2-standard-4" pool.
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: cloud.google.com/gke-nodepool
                      operator: In
                      values: ["pool-e2-standard-4"]
        image: "{{ var.json.world_bank_health_population.container_registry.run_csv_transform_kub }}"
        # Environment handed to the container. The variables are interpreted
        # by the transform script inside the image, which is not part of this
        # file.
        env_vars:
          SOURCE_URL: "gs://pdp-feeds-staging/RelayWorldBank/hnp_stats_csv/HNP_StatsSeries.csv"
          SOURCE_FILE: "files/data.csv"
          COLUMN_TO_REMOVE: "Unnamed: 20"
          TARGET_FILE: "files/data_output.csv"
          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
          TARGET_GCS_PATH: "data/world_bank_health_population/series_summary/data_output.csv"
          PIPELINE_NAME: "series_summary"
          # Both JSON values must stay on a single line: ">-" folds line
          # breaks into spaces, which would change the string.
          CSV_HEADERS: >-
            ["series_code" ,"topic" ,"indicator_name" ,"short_definition" ,"long_definition" ,"unit_of_measure" ,"periodicity" ,"base_period" ,"other_notes" ,"aggregation_method" ,"limitations_and_exceptions" ,"notes_from_original_source" ,"general_comments" ,"source" ,"statistical_concept_and_methodology" ,"development_relevance" ,"related_source_links" ,"other_web_links" ,"related_indicators" ,"license_type"]
          RENAME_MAPPINGS: >-
            {"Series Code":"series_code" ,"Topic":"topic" ,"Indicator Name":"indicator_name" ,"Short definition":"short_definition" ,"Long definition":"long_definition" ,"Unit of measure":"unit_of_measure" ,"Periodicity":"periodicity" ,"Base Period":"base_period" ,"Other notes":"other_notes" ,"Aggregation method":"aggregation_method" ,"Limitations and exceptions":"limitations_and_exceptions" ,"Notes from original source":"notes_from_original_source" ,"General comments":"general_comments" ,"Source":"source" ,"Statistical concept and methodology":"statistical_concept_and_methodology" ,"Development relevance":"development_relevance" ,"Related source links":"related_source_links" ,"Other web links":"other_web_links" ,"Related indicators":"related_indicators" ,"License Type":"license_type"}
        resources:
          request_memory: "2G"
          request_cpu: "1"

    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
      args:
        task_id: "load_series_summary_to_bq"
        bucket: "{{ var.value.composer_bucket }}"
        # Same object path as TARGET_GCS_PATH of the transform task.
        source_objects:
          - "data/world_bank_health_population/series_summary/data_output.csv"
        source_format: "CSV"
        destination_project_dataset_table: "world_bank_health_population.series_summary"
        skip_leading_rows: 1
        allow_quoted_newlines: True
        write_disposition: "WRITE_TRUNCATE"

        # Column order follows CSV_HEADERS of the transform task.
        schema_fields:
          - {name: "series_code", type: "string", mode: "nullable"}
          - {name: "topic", type: "string", mode: "nullable"}
          - {name: "indicator_name", type: "string", mode: "nullable"}
          - {name: "short_definition", type: "string", mode: "nullable"}
          - {name: "long_definition", type: "string", mode: "nullable"}
          - {name: "unit_of_measure", type: "string", mode: "nullable"}
          - {name: "periodicity", type: "string", mode: "nullable"}
          - {name: "base_period", type: "integer", mode: "nullable"}
          - {name: "other_notes", type: "string", mode: "nullable"}
          - {name: "aggregation_method", type: "string", mode: "nullable"}
          - {name: "limitations_and_exceptions", type: "string", mode: "nullable"}
          - {name: "notes_from_original_source", type: "string", mode: "nullable"}
          - {name: "general_comments", type: "string", mode: "nullable"}
          - {name: "source", type: "string", mode: "nullable"}
          - {name: "statistical_concept_and_methodology", type: "string", mode: "nullable"}
          - {name: "development_relevance", type: "string", mode: "nullable"}
          - {name: "related_source_links", type: "string", mode: "nullable"}
          - {name: "other_web_links", type: "string", mode: "nullable"}
          - {name: "related_indicators", type: "string", mode: "nullable"}
          - {name: "license_type", type: "string", mode: "nullable"}

  graph_paths:
    - "series_summary_transform_csv >> load_series_summary_to_bq"

# diff --git a/datasets/world_bank_health_population/series_summary/series_summary_dag.py b/datasets/world_bank_health_population/series_summary/series_summary_dag.py
# new file mode 100644
# index 000000000..e2284a972
# --- /dev/null
# +++ b/datasets/world_bank_health_population/series_summary/series_summary_dag.py
# @@ -0,0 +1,122 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow import DAG
from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator

# Airflow DAG for the "series_summary" table. It must mirror
# series_summary/pipeline.yaml; in particular the Composer bucket is read from
# the top-level Airflow variable `composer_bucket`
# ("{{ var.value.composer_bucket }}"), which is what pipeline.yaml declares
# for both TARGET_GCS_BUCKET and the load task's bucket.

default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": "2021-03-01",
}


with DAG(
    dag_id="world_bank_health_population.series_summary",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run CSV transform within kubernetes pod
    series_summary_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="series_summary_transform_csv",
        startup_timeout_seconds=600,
        name="series_summary",
        namespace="default",
        image_pull_policy="Always",
        # Schedule the pod only on nodes of the "pool-e2-standard-4" pool.
        affinity={
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    "values": ["pool-e2-standard-4"],
                                }
                            ]
                        }
                    ]
                }
            }
        },
        image="{{ var.json.world_bank_health_population.container_registry.run_csv_transform_kub }}",
        # Environment handed to the container; the variables are interpreted
        # by the transform script inside the image (not visible from here).
        env_vars={
            "SOURCE_URL": "gs://pdp-feeds-staging/RelayWorldBank/hnp_stats_csv/HNP_StatsSeries.csv",
            "SOURCE_FILE": "files/data.csv",
            "COLUMN_TO_REMOVE": "Unnamed: 20",
            "TARGET_FILE": "files/data_output.csv",
            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
            "TARGET_GCS_PATH": "data/world_bank_health_population/series_summary/data_output.csv",
            "PIPELINE_NAME": "series_summary",
            "CSV_HEADERS": '["series_code" ,"topic" ,"indicator_name" ,"short_definition" ,"long_definition" ,"unit_of_measure" ,"periodicity" ,"base_period" ,"other_notes" ,"aggregation_method" ,"limitations_and_exceptions" ,"notes_from_original_source" ,"general_comments" ,"source" ,"statistical_concept_and_methodology" ,"development_relevance" ,"related_source_links" ,"other_web_links" ,"related_indicators" ,"license_type"]',
            "RENAME_MAPPINGS": '{"Series Code":"series_code" ,"Topic":"topic" ,"Indicator Name":"indicator_name" ,"Short definition":"short_definition" ,"Long definition":"long_definition" ,"Unit of measure":"unit_of_measure" ,"Periodicity":"periodicity" ,"Base Period":"base_period" ,"Other notes":"other_notes" ,"Aggregation method":"aggregation_method" ,"Limitations and exceptions":"limitations_and_exceptions" ,"Notes from original source":"notes_from_original_source" ,"General comments":"general_comments" ,"Source":"source" ,"Statistical concept and methodology":"statistical_concept_and_methodology" ,"Development relevance":"development_relevance" ,"Related source links":"related_source_links" ,"Other web links":"other_web_links" ,"Related indicators":"related_indicators" ,"License Type":"license_type"}',
        },
        resources={"request_memory": "2G", "request_cpu": "1"},
    )

    # Task to load CSV data to a BigQuery table
    load_series_summary_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id="load_series_summary_to_bq",
        bucket="{{ var.value.composer_bucket }}",
        # Same object path as TARGET_GCS_PATH of the transform task.
        source_objects=[
            "data/world_bank_health_population/series_summary/data_output.csv"
        ],
        source_format="CSV",
        destination_project_dataset_table="world_bank_health_population.series_summary",
        skip_leading_rows=1,
        allow_quoted_newlines=True,
        write_disposition="WRITE_TRUNCATE",
        # Column order follows CSV_HEADERS of the transform task.
        schema_fields=[
            {"name": "series_code", "type": "string", "mode": "nullable"},
            {"name": "topic", "type": "string", "mode": "nullable"},
            {"name": "indicator_name", "type": "string", "mode": "nullable"},
            {"name": "short_definition", "type": "string", "mode": "nullable"},
            {"name": "long_definition", "type": "string", "mode": "nullable"},
            {"name": "unit_of_measure", "type": "string", "mode": "nullable"},
            {"name": "periodicity", "type": "string", "mode": "nullable"},
            {"name": "base_period", "type": "integer", "mode": "nullable"},
            {"name": "other_notes", "type": "string", "mode": "nullable"},
            {"name": "aggregation_method", "type": "string", "mode": "nullable"},
            {
                "name": "limitations_and_exceptions",
                "type": "string",
                "mode": "nullable",
            },
            {
                "name": "notes_from_original_source",
                "type": "string",
                "mode": "nullable",
            },
            {"name": "general_comments", "type": "string", "mode": "nullable"},
            {"name": "source", "type": "string", "mode": "nullable"},
            {
                "name": "statistical_concept_and_methodology",
                "type": "string",
                "mode": "nullable",
            },
            {"name": "development_relevance", "type": "string", "mode": "nullable"},
            {"name": "related_source_links", "type": "string", "mode": "nullable"},
            {"name": "other_web_links", "type": "string", "mode": "nullable"},
            {"name": "related_indicators", "type": "string", "mode": "nullable"},
            {"name": "license_type", "type": "string", "mode": "nullable"},
        ],
    )

    series_summary_transform_csv >> load_series_summary_to_bq

# diff --git a/datasets/world_bank_health_population/series_times/pipeline.yaml b/datasets/world_bank_health_population/series_times/pipeline.yaml
# new file mode 100644
# index 000000000..92eb726c5
# --- /dev/null
# +++ b/datasets/world_bank_health_population/series_times/pipeline.yaml
# @@ -0,0 +1,93 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

---
# Pipeline definition for the "series_times" table: a Kubernetes pod task
# runs the CSV transform image, then a second task loads the CSV written to
# the Composer bucket into BigQuery.
resources:

  - type: bigquery_table
    table_id: series_times
    description: "Series Times table"

dag:
  airflow_version: 1
  initialize:
    dag_id: series_times
    default_args:
      owner: "Google"
      depends_on_past: False
      start_date: "2021-03-01"
    max_active_runs: 1
    schedule_interval: "@daily"
    catchup: False
    default_view: graph

  tasks:
    - operator: "KubernetesPodOperator"
      description: "Run CSV transform within kubernetes pod"
      args:
        task_id: "series_times_transform_csv"
        startup_timeout_seconds: 600
        name: "series_times"
        namespace: "default"
        # Schedule the pod only on nodes of the "pool-e2-standard-4" pool.
        affinity:
          nodeAffinity:
            requiredDuringSchedulingIgnoredDuringExecution:
              nodeSelectorTerms:
                - matchExpressions:
                    - key: cloud.google.com/gke-nodepool
                      operator: In
                      values: ["pool-e2-standard-4"]
        image_pull_policy: "Always"
        image: "{{ var.json.world_bank_health_population.container_registry.run_csv_transform_kub }}"
        # Environment handed to the container. The variables are interpreted
        # by the transform script inside the image, which is not part of this
        # file.
        env_vars:
          SOURCE_URL: "gs://pdp-feeds-staging/RelayWorldBank/hnp_stats_csv/HNP_StatsSeries-Time.csv"
          SOURCE_FILE: "files/data.csv"
          COLUMN_TO_REMOVE: "Unnamed: 3"
          TARGET_FILE: "files/data_output.csv"
          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
          TARGET_GCS_PATH: "data/world_bank_health_population/series_times/data_output.csv"
          PIPELINE_NAME: "series_times"
          # Both JSON values must stay on a single line: ">-" folds line
          # breaks into spaces, which would change the string.
          CSV_HEADERS: >-
            ["series_code","year","description"]
          RENAME_MAPPINGS: >-
            {"SeriesCode" : "series_code" ,"Year" : "year" ,"DESCRIPTION" : "description"}
        resources:
          request_memory: "2G"
          request_cpu: "1"

    - operator: "GoogleCloudStorageToBigQueryOperator"
      description: "Task to load CSV data to a BigQuery table"
      args:
        task_id: "load_series_times_to_bq"
        bucket: "{{ var.value.composer_bucket }}"
        # Same object path as TARGET_GCS_PATH of the transform task.
        source_objects:
          - "data/world_bank_health_population/series_times/data_output.csv"
        source_format: "CSV"
        destination_project_dataset_table: "world_bank_health_population.series_times"
        skip_leading_rows: 1
        write_disposition: "WRITE_TRUNCATE"

        # Column order follows CSV_HEADERS of the transform task.
        schema_fields:
          - {name: "series_code", type: "string", mode: "nullable"}
          - {name: "year", type: "integer", mode: "nullable"}
          - {name: "description", type: "string", mode: "nullable"}

  graph_paths:
    - "series_times_transform_csv >> load_series_times_to_bq"

# diff --git a/datasets/world_bank_health_population/series_times/series_times_dag.py b/datasets/world_bank_health_population/series_times/series_times_dag.py
# new file mode 100644
# index 000000000..b5ead5dc1
# --- /dev/null
# +++ b/datasets/world_bank_health_population/series_times/series_times_dag.py
# @@ -0,0 +1,92 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow import DAG
from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator

# Airflow DAG for the "series_times" table. It must mirror
# series_times/pipeline.yaml; in particular the Composer bucket is read from
# the top-level Airflow variable `composer_bucket`
# ("{{ var.value.composer_bucket }}"), which is what pipeline.yaml declares
# for both TARGET_GCS_BUCKET and the load task's bucket.

default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": "2021-03-01",
}


with DAG(
    dag_id="world_bank_health_population.series_times",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run CSV transform within kubernetes pod
    series_times_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="series_times_transform_csv",
        startup_timeout_seconds=600,
        name="series_times",
        namespace="default",
        # Schedule the pod only on nodes of the "pool-e2-standard-4" pool.
        affinity={
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    "values": ["pool-e2-standard-4"],
                                }
                            ]
                        }
                    ]
                }
            }
        },
        image_pull_policy="Always",
        image="{{ var.json.world_bank_health_population.container_registry.run_csv_transform_kub }}",
        # Environment handed to the container; the variables are interpreted
        # by the transform script inside the image (not visible from here).
        env_vars={
            "SOURCE_URL": "gs://pdp-feeds-staging/RelayWorldBank/hnp_stats_csv/HNP_StatsSeries-Time.csv",
            "SOURCE_FILE": "files/data.csv",
            "COLUMN_TO_REMOVE": "Unnamed: 3",
            "TARGET_FILE": "files/data_output.csv",
            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
            "TARGET_GCS_PATH": "data/world_bank_health_population/series_times/data_output.csv",
            "PIPELINE_NAME": "series_times",
            "CSV_HEADERS": '["series_code","year","description"]',
            "RENAME_MAPPINGS": '{"SeriesCode" : "series_code" ,"Year" : "year" ,"DESCRIPTION" : "description"}',
        },
        resources={"request_memory": "2G", "request_cpu": "1"},
    )

    # Task to load CSV data to a BigQuery table
    load_series_times_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id="load_series_times_to_bq",
        bucket="{{ var.value.composer_bucket }}",
        # Same object path as TARGET_GCS_PATH of the transform task.
        source_objects=[
            "data/world_bank_health_population/series_times/data_output.csv"
        ],
        source_format="CSV",
        destination_project_dataset_table="world_bank_health_population.series_times",
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
        # Column order follows CSV_HEADERS of the transform task.
        schema_fields=[
            {"name": "series_code", "type": "string", "mode": "nullable"},
            {"name": "year", "type": "integer", "mode": "nullable"},
            {"name": "description", "type": "string", "mode": "nullable"},
        ],
    )

    series_times_transform_csv >> load_series_times_to_bq