diff --git a/datasets/bls/_images/run_csv_transform_kub/Dockerfile b/datasets/bls/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..df990c1a5 --- /dev/null +++ b/datasets/bls/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,43 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +RUN apt-get -y update && apt-get install -y apt-transport-https ca-certificates gnupg &&\ + echo "deb https://packages.cloud.google.com/apt cloud-sdk main" | tee -a /etc/apt/sources.list.d/google-cloud-sdk.list &&\ + curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add - &&\ + apt-get -y update && apt-get install -y google-cloud-sdk + + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . + +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/bls/_images/run_csv_transform_kub/csv_transform.py b/datasets/bls/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..57799c08a --- /dev/null +++ b/datasets/bls/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,141 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import json +import logging +import os +import pathlib +import re +import subprocess +import typing + +import pandas as pd +from google.cloud import storage + + +def main( + source_urls: typing.List[str], + source_files: typing.List[pathlib.Path], + target_file: pathlib.Path, + target_gcs_bucket: str, + target_gcs_path: str, + headers: typing.List[str], + pipeline_name: str, + joining_key: str, + columns: typing.List[str], +) -> None: + + logging.info( + f"BLS {pipeline_name} process started at " + + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + + logging.info("Creating 'files' folder") + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + + logging.info("Downloading file...") + download_file(source_urls, source_files) + + logging.info("Reading the file(s)....") + df = read_files(source_files, joining_key) + + logging.info("Transform: Removing whitespace from headers names...") + df.columns = df.columns.str.strip() + + logging.info("Transform: Trim Whitespaces...") + trim_white_spaces(df, columns) + + if pipeline_name == "unemployment_cps": + logging.info("Transform: Replacing values...") + df["value"] = df["value"].apply(reg_exp_tranformation, args=(r"^(\-)$", "")) + + logging.info("Transform: Reordering headers..") + df = df[headers] + + logging.info(f"Saving to output file.. {target_file}") + try: + save_to_new_file(df, file_path=str(target_file)) + except Exception as e: + logging.error(f"Error saving output file: {e}.") + logging.info("..Done!") + + logging.info( + f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}" + ) + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) + + logging.info( + f"BLS {pipeline_name} process completed at " + + str(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")) + ) + + +def save_to_new_file(df: pd.DataFrame, file_path: pathlib.Path) -> None: + df.to_csv(file_path, index=False) + + +def download_file( + source_urls: typing.List[str], source_files: typing.List[pathlib.Path] +) -> None: + for url, file in zip(source_urls, source_files): + logging.info(f"Downloading file from {url} ...") + subprocess.check_call(["gsutil", "cp", f"{url}", f"{file}"]) + + +def read_files(source_files, joining_key): + df = pd.DataFrame() + for source_file in source_files: + if os.path.splitext(source_file)[1] == ".csv": + _df = pd.read_csv(source_file) + else: + _df = pd.read_csv(source_file, sep="\t") + + if df.empty: + df = _df + else: + df = pd.merge(df, _df, how="left", on=joining_key) + return df + + +def trim_white_spaces(df: pd.DataFrame, columns: typing.List[str]) -> None: + for col in columns: + df[col] = df[col].astype(str).str.strip() + + +def reg_exp_tranformation(str_value: str, search_pattern: str, replace_val: str) -> str: + return re.sub(search_pattern, replace_val, str_value) + + +def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: + storage_client = storage.Client() + bucket = storage_client.bucket(gcs_bucket) + blob = bucket.blob(gcs_path) + blob.upload_from_filename(file_path) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_urls=json.loads(os.environ["SOURCE_URLS"]), + source_files=json.loads(os.environ["SOURCE_FILES"]), + target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + headers=json.loads(os.environ["CSV_HEADERS"]), + pipeline_name=os.environ["PIPELINE_NAME"], + joining_key=os.environ["JOINING_KEY"], + columns=json.loads(os.environ["TRIM_SPACE"]), + ) diff --git a/datasets/bls/_images/run_csv_transform_kub/requirements.txt b/datasets/bls/_images/run_csv_transform_kub/requirements.txt new file mode 100644 index 000000000..a13f29317 --- /dev/null +++ b/datasets/bls/_images/run_csv_transform_kub/requirements.txt @@ -0,0 +1,2 @@ +pandas +google-cloud-storage diff --git a/datasets/bls/_terraform/c_cpi_u_pipeline.tf b/datasets/bls/_terraform/c_cpi_u_pipeline.tf new file mode 100644 index 000000000..4a29916ab --- /dev/null +++ b/datasets/bls/_terraform/c_cpi_u_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "c_cpi_u" { + project = var.project_id + dataset_id = "bls" + table_id = "c_cpi_u" + + description = "C_CPI_U Dataset" + + + + + depends_on = [ + google_bigquery_dataset.bls + ] +} + +output "bigquery_table-c_cpi_u-table_id" { + value = google_bigquery_table.c_cpi_u.table_id +} + +output "bigquery_table-c_cpi_u-id" { + value = google_bigquery_table.c_cpi_u.id +} diff --git a/datasets/bls/_terraform/cpi_u_pipeline.tf b/datasets/bls/_terraform/cpi_u_pipeline.tf new file mode 100644 index 000000000..3243e03d2 --- /dev/null +++ b/datasets/bls/_terraform/cpi_u_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "cpi_u" { + project = var.project_id + dataset_id = "bls" + table_id = "cpi_u" + + description = "CPI_U Dataset" + + + + + depends_on = [ + google_bigquery_dataset.bls + ] +} + +output "bigquery_table-cpi_u-table_id" { + value = google_bigquery_table.cpi_u.table_id +} + +output "bigquery_table-cpi_u-id" { + value = google_bigquery_table.cpi_u.id +} diff --git a/datasets/bls/_terraform/employment_hours_earnings_pipeline.tf b/datasets/bls/_terraform/employment_hours_earnings_pipeline.tf new file mode 100644 index 000000000..8dda55cdb --- /dev/null +++ b/datasets/bls/_terraform/employment_hours_earnings_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "employment_hours_earnings" { + project = var.project_id + dataset_id = "bls" + table_id = "employment_hours_earnings" + + description = "Employment_Hours_Earnings Dataset" + + + + + depends_on = [ + google_bigquery_dataset.bls + ] +} + +output "bigquery_table-employment_hours_earnings-table_id" { + value = google_bigquery_table.employment_hours_earnings.table_id +} + +output "bigquery_table-employment_hours_earnings-id" { + value = google_bigquery_table.employment_hours_earnings.id +} diff --git a/datasets/bls/_terraform/employment_hours_earnings_series_pipeline.tf b/datasets/bls/_terraform/employment_hours_earnings_series_pipeline.tf new file mode 100644 index 000000000..0b29a0a66 --- /dev/null +++ b/datasets/bls/_terraform/employment_hours_earnings_series_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "employment_hours_earnings_series" { + project = var.project_id + dataset_id = "bls" + table_id = "employment_hours_earnings_series" + + description = "Employment_Hours_Earnings_Series Dataset" + + + + + depends_on = [ + google_bigquery_dataset.bls + ] +} + +output "bigquery_table-employment_hours_earnings_series-table_id" { + value = google_bigquery_table.employment_hours_earnings_series.table_id +} + +output "bigquery_table-employment_hours_earnings_series-id" { + value = google_bigquery_table.employment_hours_earnings_series.id +} diff --git a/datasets/bls/_terraform/unemployment_cps_pipeline.tf b/datasets/bls/_terraform/unemployment_cps_pipeline.tf new file mode 100644 index 000000000..61c9e19b8 --- /dev/null +++ b/datasets/bls/_terraform/unemployment_cps_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "unemployment_cps" { + project = var.project_id + dataset_id = "bls" + table_id = "unemployment_cps" + + description = "Unemployment_CPS Dataset" + + + + + depends_on = [ + google_bigquery_dataset.bls + ] +} + +output "bigquery_table-unemployment_cps-table_id" { + value = google_bigquery_table.unemployment_cps.table_id +} + +output "bigquery_table-unemployment_cps-id" { + value = google_bigquery_table.unemployment_cps.id +} diff --git a/datasets/bls/_terraform/unemployment_cps_series_pipeline.tf b/datasets/bls/_terraform/unemployment_cps_series_pipeline.tf new file mode 100644 index 000000000..22f8561fc --- /dev/null +++ b/datasets/bls/_terraform/unemployment_cps_series_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "unemployment_cps_series" { + project = var.project_id + dataset_id = "bls" + table_id = "unemployment_cps_series" + + description = "Unemployment_CPS_Series Dataset" + + + + + depends_on = [ + google_bigquery_dataset.bls + ] +} + +output "bigquery_table-unemployment_cps_series-table_id" { + value = google_bigquery_table.unemployment_cps_series.table_id +} + +output "bigquery_table-unemployment_cps_series-id" { + value = google_bigquery_table.unemployment_cps_series.id +} diff --git a/datasets/bls/_terraform/wm_pipeline.tf b/datasets/bls/_terraform/wm_pipeline.tf new file mode 100644 index 000000000..31209ba1f --- /dev/null +++ b/datasets/bls/_terraform/wm_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "wm" { + project = var.project_id + dataset_id = "bls" + table_id = "wm" + + description = "WM Dataset" + + + + + depends_on = [ + google_bigquery_dataset.bls + ] +} + +output "bigquery_table-wm-table_id" { + value = google_bigquery_table.wm.table_id +} + +output "bigquery_table-wm-id" { + value = google_bigquery_table.wm.id +} diff --git a/datasets/bls/_terraform/wm_series_pipeline.tf b/datasets/bls/_terraform/wm_series_pipeline.tf new file mode 100644 index 000000000..81367e1f8 --- /dev/null +++ b/datasets/bls/_terraform/wm_series_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "wm_series" { + project = var.project_id + dataset_id = "bls" + table_id = "wm_series" + + description = "WM_Series Dataset" + + + + + depends_on = [ + google_bigquery_dataset.bls + ] +} + +output "bigquery_table-wm_series-table_id" { + value = google_bigquery_table.wm_series.table_id +} + +output "bigquery_table-wm_series-id" { + value = google_bigquery_table.wm_series.id +} diff --git a/datasets/bls/c_cpi_u/c_cpi_u_dag.py b/datasets/bls/c_cpi_u/c_cpi_u_dag.py new file mode 100644 index 000000000..80e61c54d --- /dev/null +++ b/datasets/bls/c_cpi_u/c_cpi_u_dag.py @@ -0,0 +1,102 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="bls.c_cpi_u", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="transform_csv", + startup_timeout_seconds=600, + name="c_cpi_u", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.bls.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URLS": '["gs://pdp-feeds-staging/Bureau/inflat_listarea_area_join.csv","gs://pdp-feeds-staging/Bureau/cu.item.tsv"]', + "SOURCE_FILES": '["files/data1.csv","files/data2.tsv"]', + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/bls/c_cpi_u/data_output.csv", + "PIPELINE_NAME": "c_cpi_u", + "FILE_PATH": "files/", + "JOINING_KEY": "item_code", + "TRIM_SPACE": '["series_id","value","footnote_codes","item_code"]', + "CSV_HEADERS": '["series_id","year","period","value","footnote_codes","survey_abbreviation","seasonal_code","periodicity_code","area_code","area_name","item_code","item_name","date"]', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/bls/c_cpi_u/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="bls.c_cpi_u", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "series_id", "type": "STRING", "mode": "required"}, + {"name": "year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "period", "type": "STRING", "mode": "NULLABLE"}, + {"name": "value", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "footnote_codes", "type": "STRING", "mode": "NULLABLE"}, + {"name": "survey_abbreviation", "type": "STRING", "mode": "NULLABLE"}, + {"name": "seasonal_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "periodicity_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "area_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "area_name", "type": "STRING", "mode": "NULLABLE"}, + {"name": "item_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "item_name", "type": "STRING", "mode": "NULLABLE"}, + {"name": "date", "type": "DATE", "mode": "NULLABLE"}, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/bls/c_cpi_u/pipeline.yaml b/datasets/bls/c_cpi_u/pipeline.yaml new file mode 100644 index 000000000..205a2bf26 --- /dev/null +++ b/datasets/bls/c_cpi_u/pipeline.yaml @@ -0,0 +1,130 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: c_cpi_u + description: "C_CPI_U Dataset" + +dag: + airflow_version: 1 + initialize: + dag_id: "c_cpi_u" + default_args: + owner: "Google" + + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + + args: + task_id: "transform_csv" + startup_timeout_seconds: 600 + name: "c_cpi_u" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.bls.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URLS: >- + ["gs://pdp-feeds-staging/Bureau/inflat_listarea_area_join.csv","gs://pdp-feeds-staging/Bureau/cu.item.tsv"] + SOURCE_FILES: >- + ["files/data1.csv","files/data2.tsv"] + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/bls/c_cpi_u/data_output.csv" + PIPELINE_NAME: "c_cpi_u" + FILE_PATH: "files/" + JOINING_KEY: "item_code" + TRIM_SPACE: >- + ["series_id","value","footnote_codes","item_code"] + CSV_HEADERS: >- + ["series_id","year","period","value","footnote_codes","survey_abbreviation","seasonal_code","periodicity_code","area_code","area_name","item_code","item_name","date"] + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/bls/c_cpi_u/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "bls.c_cpi_u" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "series_id" + type: "STRING" + mode: "required" + - name: "year" + type: "INTEGER" + mode: "NULLABLE" + - name: "period" + type: "STRING" + mode: "NULLABLE" + - name: "value" + type: "FLOAT" + mode: "NULLABLE" + - name: "footnote_codes" + type: "STRING" + mode: "NULLABLE" + - name: "survey_abbreviation" + type: "STRING" + mode: "NULLABLE" + - name: "seasonal_code" + type: "STRING" + mode: "NULLABLE" + - name: "periodicity_code" + type: "STRING" + mode: "NULLABLE" + - name: "area_code" + type: "STRING" + mode: "NULLABLE" + - name: "area_name" + type: "STRING" + mode: "NULLABLE" + - name: "item_code" + type: "STRING" + mode: "NULLABLE" + - name: "item_name" + type: "STRING" + mode: "NULLABLE" + - name: "date" + type: "DATE" + mode: "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/bls/cpi_u/cpi_u_dag.py b/datasets/bls/cpi_u/cpi_u_dag.py new file mode 100644 index 000000000..a93c3053c --- /dev/null +++ b/datasets/bls/cpi_u/cpi_u_dag.py @@ -0,0 +1,101 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="bls.cpi_u", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="transform_csv", + startup_timeout_seconds=600, + name="cpi_u", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.bls.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URLS": '["gs://pdp-feeds-staging/Bureau/inflat-prices_listarea_area_join.csv","gs://pdp-feeds-staging/Bureau/cu.item.tsv"]', + "SOURCE_FILES": '["files/data1.csv","files/data2.tsv"]', + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/bls/cpi_u/data_output.csv", + "PIPELINE_NAME": "cpi_u", + "JOINING_KEY": "item_code", + "TRIM_SPACE": '["series_id","value","footnote_codes"]', + "CSV_HEADERS": '["series_id","year","period","value","footnote_codes","survey_abbreviation","seasonal_code","periodicity_code","area_code","area_name","item_code","item_name","date"]', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/bls/cpi_u/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="bls.cpi_u", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "series_id", "type": "STRING", "mode": "required"}, + {"name": "year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "period", "type": "STRING", "mode": "NULLABLE"}, + {"name": "value", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "footnote_codes", "type": "STRING", "mode": "NULLABLE"}, + {"name": "survey_abbreviation", "type": "STRING", "mode": "NULLABLE"}, + {"name": "seasonal_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "periodicity_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "area_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "area_name", "type": "STRING", "mode": "NULLABLE"}, + {"name": "item_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "item_name", "type": "STRING", "mode": "NULLABLE"}, + {"name": "date", "type": "DATE", "mode": "NULLABLE"}, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/bls/cpi_u/pipeline.yaml b/datasets/bls/cpi_u/pipeline.yaml new file mode 100644 index 000000000..9de4b6ab8 --- /dev/null +++ b/datasets/bls/cpi_u/pipeline.yaml @@ -0,0 +1,126 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: cpi_u + description: "CPI_U Dataset" + +dag: + airflow_version: 1 + initialize: + dag_id: "cpi_u" + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "transform_csv" + startup_timeout_seconds: 600 + name: "cpi_u" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.bls.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URLS: >- + ["gs://pdp-feeds-staging/Bureau/inflat-prices_listarea_area_join.csv","gs://pdp-feeds-staging/Bureau/cu.item.tsv"] + SOURCE_FILES: >- + ["files/data1.csv","files/data2.tsv"] + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/bls/cpi_u/data_output.csv" + PIPELINE_NAME: "cpi_u" + JOINING_KEY: "item_code" + TRIM_SPACE: >- + ["series_id","value","footnote_codes"] + CSV_HEADERS: >- + ["series_id","year","period","value","footnote_codes","survey_abbreviation","seasonal_code","periodicity_code","area_code","area_name","item_code","item_name","date"] + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/bls/cpi_u/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "bls.cpi_u" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "series_id" + type: "STRING" + mode: "required" + - name: "year" + type: "INTEGER" + mode: "NULLABLE" + - name: "period" + type: "STRING" + mode: "NULLABLE" + - name: "value" + type: "FLOAT" + mode: "NULLABLE" + - name: "footnote_codes" + type: "STRING" + mode: "NULLABLE" + - name: "survey_abbreviation" + type: "STRING" + mode: "NULLABLE" + - name: "seasonal_code" + type: "STRING" + mode: "NULLABLE" + - name: "periodicity_code" + type: "STRING" + mode: "NULLABLE" + - name: "area_code" + type: "STRING" + mode: "NULLABLE" + - name: "area_name" + type: "STRING" + mode: "NULLABLE" + - name: "item_code" + type: "STRING" + mode: "NULLABLE" + - name: "item_name" + type: "STRING" + mode: "NULLABLE" + - name: "date" + type: "DATE" + mode: "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/bls/employment_hours_earnings/employment_hours_earnings_dag.py b/datasets/bls/employment_hours_earnings/employment_hours_earnings_dag.py new file mode 100644 index 000000000..89ee3e0f8 --- /dev/null +++ b/datasets/bls/employment_hours_earnings/employment_hours_earnings_dag.py @@ -0,0 +1,96 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="bls.employment_hours_earnings", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="transform_csv", + startup_timeout_seconds=600, + name="employment_hours_earnings", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.bls.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URLS": '["gs://pdp-feeds-staging/Bureau/employment_list_join.csv","gs://pdp-feeds-staging/Bureau_new/ce.csv"]', + "SOURCE_FILES": '["files/data1.csv","files/data2.csv"]', + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/bls/employment_hours_earnings/data_output.csv", + "PIPELINE_NAME": "employment_hours_earnings", + "JOINING_KEY": "series_id", + "FILE_PATH": "files/", + "TRIM_SPACE": '["series_id","value","footnote_codes","series_title"]', + "CSV_HEADERS": '["series_id","year","period","value","footnote_codes","date","series_title"]', + }, + resources={"request_memory": "4G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/bls/employment_hours_earnings/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="bls.employment_hours_earnings", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "series_id", "type": "STRING", "mode": "required"}, + {"name": "year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "period", "type": "STRING", "mode": "NULLABLE"}, + {"name": "value", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "footnote_codes", "type": "STRING", "mode": "NULLABLE"}, + {"name": "date", "type": "DATE", "mode": "NULLABLE"}, + {"name": "series_title", "type": "STRING", "mode": "NULLABLE"}, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/bls/employment_hours_earnings/pipeline.yaml b/datasets/bls/employment_hours_earnings/pipeline.yaml new file mode 100644 index 000000000..7e210d84a --- /dev/null +++ b/datasets/bls/employment_hours_earnings/pipeline.yaml @@ -0,0 +1,109 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: employment_hours_earnings + description: "Employment_Hours_Earnings Dataset" + +dag: + airflow_version: 1 + initialize: + dag_id: "employment_hours_earnings" + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "transform_csv" + startup_timeout_seconds: 600 + name: "employment_hours_earnings" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.bls.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URLS: >- + ["gs://pdp-feeds-staging/Bureau/employment_list_join.csv","gs://pdp-feeds-staging/Bureau_new/ce.csv"] + SOURCE_FILES: >- + ["files/data1.csv","files/data2.csv"] + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/bls/employment_hours_earnings/data_output.csv" + PIPELINE_NAME: "employment_hours_earnings" + JOINING_KEY: "series_id" + FILE_PATH: "files/" + TRIM_SPACE: >- + ["series_id","value","footnote_codes","series_title"] + CSV_HEADERS: >- + ["series_id","year","period","value","footnote_codes","date","series_title"] + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/bls/employment_hours_earnings/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "bls.employment_hours_earnings" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "series_id" + type: "STRING" + mode: "required" + - name: "year" + type: "INTEGER" + mode: "NULLABLE" + - name: "period" + type: "STRING" + mode: "NULLABLE" + - name: "value" + type: "FLOAT" + mode: "NULLABLE" + - name: "footnote_codes" + type: "STRING" + mode: "NULLABLE" + - name: "date" + type: "DATE" + mode: "NULLABLE" + - name: "series_title" + type: "STRING" + mode: "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/bls/employment_hours_earnings_series/employment_hours_earnings_series_dag.py b/datasets/bls/employment_hours_earnings_series/employment_hours_earnings_series_dag.py new file mode 100644 index 000000000..074e02e15 --- /dev/null +++ b/datasets/bls/employment_hours_earnings_series/employment_hours_earnings_series_dag.py @@ -0,0 +1,99 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="bls.employment_hours_earnings_series", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="transform_csv", + startup_timeout_seconds=600, + name="employment_hours_earnings_series", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.bls.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URLS": '["gs://pdp-feeds-staging/Bureau/ce.series.tsv"]', + "SOURCE_FILES": '["files/data1.tsv"]', + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/bls/employment_hours_earnings_series/data_output.csv", + "PIPELINE_NAME": "employment_hours_earnings_series", + "JOINING_KEY": "", + "TRIM_SPACE": '["series_id","footnote_codes"]', + "CSV_HEADERS": '["series_id","supersector_code","industry_code","data_type_code","seasonal","series_title","footnote_codes","begin_year","begin_period","end_year","end_period"]', + }, + resources={"request_memory": "4G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/bls/employment_hours_earnings_series/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="bls.employment_hours_earnings_series", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "series_id", "type": "STRING", "mode": "required"}, + {"name": "supersector_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "industry_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "data_type_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "seasonal", "type": "STRING", "mode": "NULLABLE"}, + {"name": "series_title", "type": "STRING", "mode": "NULLABLE"}, + {"name": "footnote_codes", "type": "STRING", "mode": "NULLABLE"}, + {"name": "begin_year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "begin_period", "type": "STRING", "mode": "NULLABLE"}, + {"name": "end_year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "end_period", "type": "STRING", "mode": "NULLABLE"}, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/bls/employment_hours_earnings_series/pipeline.yaml b/datasets/bls/employment_hours_earnings_series/pipeline.yaml new file mode 100644 index 000000000..af1e7a180 --- /dev/null +++ b/datasets/bls/employment_hours_earnings_series/pipeline.yaml @@ -0,0 +1,121 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: employment_hours_earnings_series + description: "Employment_Hours_Earnings_Series Dataset" + +dag: + airflow_version: 1 + initialize: + dag_id: "employment_hours_earnings_series" + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "transform_csv" + startup_timeout_seconds: 600 + name: "employment_hours_earnings_series" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.bls.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URLS: >- + ["gs://pdp-feeds-staging/Bureau/ce.series.tsv"] + SOURCE_FILES: >- + ["files/data1.tsv"] + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/bls/employment_hours_earnings_series/data_output.csv" + PIPELINE_NAME: "employment_hours_earnings_series" + JOINING_KEY: "" + TRIM_SPACE: >- + ["series_id","footnote_codes"] + CSV_HEADERS: >- + ["series_id","supersector_code","industry_code","data_type_code","seasonal","series_title","footnote_codes","begin_year","begin_period","end_year","end_period"] + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/bls/employment_hours_earnings_series/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "bls.employment_hours_earnings_series" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "series_id" + type: "STRING" + mode: "required" + - name: "supersector_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "industry_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "data_type_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "seasonal" + type: "STRING" + mode: "NULLABLE" + - name: "series_title" + type: "STRING" + mode: "NULLABLE" + - name: "footnote_codes" + type: "STRING" + mode: "NULLABLE" + - name: "begin_year" + type: "INTEGER" + mode: "NULLABLE" + - name: "begin_period" + type: "STRING" + mode: "NULLABLE" + - name: "end_year" + type: "INTEGER" + mode: "NULLABLE" + - name: "end_period" + type: "STRING" + mode: "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/bls/unemployment_cps/pipeline.yaml b/datasets/bls/unemployment_cps/pipeline.yaml new file mode 100644 index 000000000..e9f81db1a --- /dev/null +++ b/datasets/bls/unemployment_cps/pipeline.yaml @@ -0,0 +1,108 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: unemployment_cps + description: "Unemployment_CPS Dataset" + +dag: + airflow_version: 1 + initialize: + dag_id: "unemployment_cps" + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "transform_csv" + startup_timeout_seconds: 600 + name: "unemployment_cps" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.bls.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URLS: >- + ["gs://pdp-feeds-staging/Bureau/unemployment_list_join.csv","gs://pdp-feeds-staging/Bureau_new/ln.csv"] + SOURCE_FILES: >- + ["files/data1.csv","files/data2.csv"] + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/bls/unemployment_cps/data_output.csv" + PIPELINE_NAME: "unemployment_cps" + JOINING_KEY: "series_id" + TRIM_SPACE: >- + ["series_id","value","footnote_codes","series_title"] + CSV_HEADERS: >- + ["series_id","year","period","value","footnote_codes","date","series_title"] + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/bls/unemployment_cps/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "bls.unemployment_cps" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "series_id" + type: "STRING" + mode: "required" + - name: "year" + type: "INTEGER" + mode: "NULLABLE" + - name: "period" + type: "STRING" + mode: "NULLABLE" + - name: "value" + type: "FLOAT" + mode: "NULLABLE" + - name: "footnote_codes" + type: "STRING" + mode: "NULLABLE" + - name: "date" + type: "DATE" + mode: "NULLABLE" + - name: "series_title" + type: "STRING" + mode: "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/bls/unemployment_cps/unemployment_cps_dag.py b/datasets/bls/unemployment_cps/unemployment_cps_dag.py new file mode 100644 index 000000000..3651311e1 --- /dev/null +++ b/datasets/bls/unemployment_cps/unemployment_cps_dag.py @@ -0,0 +1,95 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="bls.unemployment_cps", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="transform_csv", + startup_timeout_seconds=600, + name="unemployment_cps", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.bls.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URLS": '["gs://pdp-feeds-staging/Bureau/unemployment_list_join.csv","gs://pdp-feeds-staging/Bureau_new/ln.csv"]', + "SOURCE_FILES": '["files/data1.csv","files/data2.csv"]', + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/bls/unemployment_cps/data_output.csv", + "PIPELINE_NAME": "unemployment_cps", + "JOINING_KEY": "series_id", + "TRIM_SPACE": '["series_id","value","footnote_codes","series_title"]', + "CSV_HEADERS": '["series_id","year","period","value","footnote_codes","date","series_title"]', + }, + resources={"request_memory": "4G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/bls/unemployment_cps/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="bls.unemployment_cps", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "series_id", "type": "STRING", "mode": "required"}, + {"name": "year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "period", "type": "STRING", "mode": "NULLABLE"}, + {"name": "value", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "footnote_codes", "type": "STRING", "mode": "NULLABLE"}, + {"name": "date", "type": "DATE", "mode": "NULLABLE"}, + {"name": "series_title", "type": "STRING", "mode": "NULLABLE"}, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/bls/unemployment_cps_series/pipeline.yaml b/datasets/bls/unemployment_cps_series/pipeline.yaml new file mode 100644 index 000000000..c75569bd0 --- /dev/null +++ b/datasets/bls/unemployment_cps_series/pipeline.yaml @@ -0,0 +1,209 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: unemployment_cps_series + description: "Unemployment_CPS_Series Dataset" + +dag: + airflow_version: 1 + initialize: + dag_id: "unemployment_cps_series" + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "transform_csv" + startup_timeout_seconds: 600 + name: "unemployment_cps_series" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.bls.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URLS: >- + ["gs://pdp-feeds-staging/Bureau/ln.series.tsv"] + SOURCE_FILES: >- + ["files/data1.tsv"] + TARGET_FILE: "files/data_output.tsv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/bls/unemployment_cps_series/data_output.csv" + PIPELINE_NAME: "unemployment_cps_series" + JOINING_KEY: "" + TRIM_SPACE: >- + ["series_id","footnote_codes","series_title"] + CSV_HEADERS: >- + ["series_id","lfst_code","periodicity_code","series_title","absn_code","activity_code","ages_code","class_code","duration_code","education_code","entr_code","expr_code","hheader_code","hour_code","indy_code","jdes_code","look_code","mari_code","mjhs_code","occupation_code","orig_code","pcts_code","race_code","rjnw_code","rnlf_code","rwns_code","seek_code","sexs_code","tdat_code","vets_code","wkst_code","born_code","chld_code","disa_code","seasonal","footnote_codes","begin_year","begin_period","end_year","end_period","cert_code"] + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/bls/unemployment_cps_series/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "bls.unemployment_cps_series" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "series_id" + type: "STRING" + mode: "required" + - name: "lfst_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "periodicity_code" + type: "STRING" + mode: "NULLABLE" + - name: "series_title" + type: "STRING" + mode: "NULLABLE" + - name: "absn_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "activity_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "ages_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "class_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "duration_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "education_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "entr_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "expr_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "hheader_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "hour_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "indy_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "jdes_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "look_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "mari_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "mjhs_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "occupation_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "orig_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "pcts_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "race_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "rjnw_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "rnlf_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "rwns_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "seek_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "sexs_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "tdat_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "vets_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "wkst_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "born_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "chld_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "disa_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "seasonal" + type: "STRING" + mode: "NULLABLE" + - name: "footnote_codes" + type: "STRING" + mode: "NULLABLE" + - name: "begin_year" + type: "INTEGER" + mode: "NULLABLE" + - name: "begin_period" + type: "STRING" + mode: "NULLABLE" + - name: "end_year" + type: "INTEGER" + mode: "NULLABLE" + - name: "end_period" + type: "STRING" + mode: "NULLABLE" + - name: "cert_code" + type: "INTEGER" + mode: "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/bls/unemployment_cps_series/unemployment_cps_series_dag.py b/datasets/bls/unemployment_cps_series/unemployment_cps_series_dag.py new file mode 100644 index 000000000..2485062c3 --- /dev/null +++ b/datasets/bls/unemployment_cps_series/unemployment_cps_series_dag.py @@ -0,0 +1,129 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="bls.unemployment_cps_series", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="transform_csv", + startup_timeout_seconds=600, + name="unemployment_cps_series", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.bls.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URLS": '["gs://pdp-feeds-staging/Bureau/ln.series.tsv"]', + "SOURCE_FILES": '["files/data1.tsv"]', + "TARGET_FILE": "files/data_output.tsv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/bls/unemployment_cps_series/data_output.csv", + "PIPELINE_NAME": "unemployment_cps_series", + "JOINING_KEY": "", + "TRIM_SPACE": '["series_id","footnote_codes","series_title"]', + "CSV_HEADERS": '["series_id","lfst_code","periodicity_code","series_title","absn_code","activity_code","ages_code","class_code","duration_code","education_code","entr_code","expr_code","hheader_code","hour_code","indy_code","jdes_code","look_code","mari_code","mjhs_code","occupation_code","orig_code","pcts_code","race_code","rjnw_code","rnlf_code","rwns_code","seek_code","sexs_code","tdat_code","vets_code","wkst_code","born_code","chld_code","disa_code","seasonal","footnote_codes","begin_year","begin_period","end_year","end_period","cert_code"]', + }, + resources={"request_memory": "4G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/bls/unemployment_cps_series/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="bls.unemployment_cps_series", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "series_id", "type": "STRING", "mode": "required"}, + {"name": "lfst_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "periodicity_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "series_title", "type": "STRING", "mode": "NULLABLE"}, + {"name": "absn_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "activity_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "ages_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "class_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "duration_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "education_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "entr_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "expr_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "hheader_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "hour_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "indy_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "jdes_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "look_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "mari_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "mjhs_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "occupation_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "orig_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "pcts_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "race_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "rjnw_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "rnlf_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "rwns_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "seek_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "sexs_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "tdat_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "vets_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "wkst_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "born_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "chld_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "disa_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "seasonal", "type": "STRING", "mode": "NULLABLE"}, + {"name": "footnote_codes", "type": "STRING", "mode": "NULLABLE"}, + {"name": "begin_year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "begin_period", "type": "STRING", "mode": "NULLABLE"}, + {"name": "end_year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "end_period", "type": "STRING", "mode": "NULLABLE"}, + {"name": "cert_code", "type": "INTEGER", "mode": "NULLABLE"}, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/bls/wm/pipeline.yaml b/datasets/bls/wm/pipeline.yaml new file mode 100644 index 000000000..8b143e97e --- /dev/null +++ b/datasets/bls/wm/pipeline.yaml @@ -0,0 +1,108 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: wm + description: "WM Dataset" + +dag: + airflow_version: 1 + initialize: + dag_id: "wm" + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "transform_csv" + startup_timeout_seconds: 600 + name: "wm" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.bls.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URLS: >- + ["gs://pdp-feeds-staging/Bureau/wages_list_join.csv","gs://pdp-feeds-staging/Bureau_new/wm.csv"] + SOURCE_FILES: >- + ["files/data1.csv","files/data2.csv"] + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/bls/wm/data_output.csv" + PIPELINE_NAME: "wm" + JOINING_KEY: "series_id" + TRIM_SPACE: >- + ["series_id","value","footnote_codes","series_title"] + CSV_HEADERS: >- + ["series_id","year","period","value","footnote_codes","date","series_title"] + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/bls/wm/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "bls.wm" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "series_id" + type: "STRING" + mode: "required" + - name: "year" + type: "INTEGER" + mode: "NULLABLE" + - name: "period" + type: "STRING" + mode: "NULLABLE" + - name: "value" + type: "FLOAT" + mode: "NULLABLE" + - name: "footnote_codes" + type: "STRING" + mode: "NULLABLE" + - name: "date" + type: "DATE" + mode: "NULLABLE" + - name: "series_title" + type: "STRING" + mode: "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/bls/wm/wm_dag.py b/datasets/bls/wm/wm_dag.py new file mode 100644 index 000000000..cbf70b632 --- /dev/null +++ b/datasets/bls/wm/wm_dag.py @@ -0,0 +1,95 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="bls.wm", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="transform_csv", + startup_timeout_seconds=600, + name="wm", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.bls.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URLS": '["gs://pdp-feeds-staging/Bureau/wages_list_join.csv","gs://pdp-feeds-staging/Bureau_new/wm.csv"]', + "SOURCE_FILES": '["files/data1.csv","files/data2.csv"]', + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/bls/wm/data_output.csv", + "PIPELINE_NAME": "wm", + "JOINING_KEY": "series_id", + "TRIM_SPACE": '["series_id","value","footnote_codes","series_title"]', + "CSV_HEADERS": '["series_id","year","period","value","footnote_codes","date","series_title"]', + }, + resources={"request_memory": "4G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/bls/wm/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="bls.wm", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "series_id", "type": "STRING", "mode": "required"}, + {"name": "year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "period", "type": "STRING", "mode": "NULLABLE"}, + {"name": "value", "type": "FLOAT", "mode": "NULLABLE"}, + {"name": "footnote_codes", "type": "STRING", "mode": "NULLABLE"}, + {"name": "date", "type": "DATE", "mode": "NULLABLE"}, + {"name": "series_title", "type": "STRING", "mode": "NULLABLE"}, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/bls/wm_series/pipeline.yaml b/datasets/bls/wm_series/pipeline.yaml new file mode 100644 index 000000000..091f9857d --- /dev/null +++ b/datasets/bls/wm_series/pipeline.yaml @@ -0,0 +1,135 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: wm_series + description: "WM_Series Dataset" + +dag: + airflow_version: 1 + initialize: + dag_id: "wm_series" + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + args: + task_id: "transform_csv" + startup_timeout_seconds: 600 + name: "wm_series" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.bls.container_registry.run_csv_transform_kub }}" + env_vars: + SOURCE_URLS: >- + ["gs://pdp-feeds-staging/Bureau/wm.series.tsv"] + SOURCE_FILES: >- + ["files/data1.tsv"] + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/bls/wm_series/data_output.csv" + PIPELINE_NAME: "wm_series" + JOINING_KEY: "" + TRIM_SPACE: >- + ["series_id","footnote_codes","series_title"] + CSV_HEADERS: >- + ["series_id","seasonal","area_code","ownership_code","estimate_code","industry_code","occupation_code","subcell_code","datatype_code","level_code","series_title","footnote_codes","begin_year","begin_period","end_year","end_period"] + resources: + request_memory: "4G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/bls/wm_series/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "bls.wm_series" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + + schema_fields: + - name: "series_id" + type: "STRING" + mode: "required" + - name: "seasonal" + type: "STRING" + mode: "NULLABLE" + - name: "area_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "ownership_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "estimate_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "industry_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "occupation_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "subcell_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "datatype_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "level_code" + type: "INTEGER" + mode: "NULLABLE" + - name: "series_title" + type: "STRING" + mode: "NULLABLE" + - name: "footnote_codes" + type: "STRING" + mode: "NULLABLE" + - name: "begin_year" + type: "INTEGER" + mode: "NULLABLE" + - name: "begin_period" + type: "STRING" + mode: "NULLABLE" + - name: "end_year" + type: "INTEGER" + mode: "NULLABLE" + - name: "end_period" + type: "STRING" + mode: "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" diff --git a/datasets/bls/wm_series/wm_series_dag.py b/datasets/bls/wm_series/wm_series_dag.py new file mode 100644 index 000000000..7b630fac4 --- /dev/null +++ b/datasets/bls/wm_series/wm_series_dag.py @@ -0,0 +1,104 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="bls.wm_series", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="transform_csv", + startup_timeout_seconds=600, + name="wm_series", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.bls.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URLS": '["gs://pdp-feeds-staging/Bureau/wm.series.tsv"]', + "SOURCE_FILES": '["files/data1.tsv"]', + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/bls/wm_series/data_output.csv", + "PIPELINE_NAME": "wm_series", + "JOINING_KEY": "", + "TRIM_SPACE": '["series_id","footnote_codes","series_title"]', + "CSV_HEADERS": '["series_id","seasonal","area_code","ownership_code","estimate_code","industry_code","occupation_code","subcell_code","datatype_code","level_code","series_title","footnote_codes","begin_year","begin_period","end_year","end_period"]', + }, + resources={"request_memory": "4G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/bls/wm_series/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="bls.wm_series", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "series_id", "type": "STRING", "mode": "required"}, + {"name": "seasonal", "type": "STRING", "mode": "NULLABLE"}, + {"name": "area_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "ownership_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "estimate_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "industry_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "occupation_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "subcell_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "datatype_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "level_code", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "series_title", "type": "STRING", "mode": "NULLABLE"}, + {"name": "footnote_codes", "type": "STRING", "mode": "NULLABLE"}, + {"name": "begin_year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "begin_period", "type": "STRING", "mode": "NULLABLE"}, + {"name": "end_year", "type": "INTEGER", "mode": "NULLABLE"}, + {"name": "end_period", "type": "STRING", "mode": "NULLABLE"}, + ], + ) + + transform_csv >> load_to_bq