From f0ced963ddeeb58313d3e5c777ea6383bb163863 Mon Sep 17 00:00:00 2001 From: Nicholas Large <84149918+nlarge-google@users.noreply.github.com> Date: Wed, 8 Dec 2021 09:56:02 -0600 Subject: [PATCH] feat: Onboard FDA food dataset (#223) --- .../_images/run_csv_transform_kub/Dockerfile | 21 + .../run_csv_transform_kub/csv_transform.py | 387 ++++++++++++++++++ .../run_csv_transform_kub/requirements.txt | 3 + .../fda_food/_terraform/fda_food_dataset.tf | 26 ++ .../_terraform/food_enforcement_pipeline.tf | 39 ++ .../_terraform/food_events_pipeline.tf | 39 ++ datasets/fda_food/_terraform/provider.tf | 28 ++ datasets/fda_food/_terraform/variables.tf | 23 ++ datasets/fda_food/dataset.yaml | 27 ++ .../food_enforcement/food_enforcement_dag.py | 236 +++++++++++ .../fda_food/food_enforcement/pipeline.yaml | 206 ++++++++++ .../fda_food/food_events/food_events_dag.py | 166 ++++++++ datasets/fda_food/food_events/pipeline.yaml | 155 +++++++ 13 files changed, 1356 insertions(+) create mode 100644 datasets/fda_food/_images/run_csv_transform_kub/Dockerfile create mode 100644 datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py create mode 100644 datasets/fda_food/_images/run_csv_transform_kub/requirements.txt create mode 100644 datasets/fda_food/_terraform/fda_food_dataset.tf create mode 100644 datasets/fda_food/_terraform/food_enforcement_pipeline.tf create mode 100644 datasets/fda_food/_terraform/food_events_pipeline.tf create mode 100644 datasets/fda_food/_terraform/provider.tf create mode 100644 datasets/fda_food/_terraform/variables.tf create mode 100644 datasets/fda_food/dataset.yaml create mode 100644 datasets/fda_food/food_enforcement/food_enforcement_dag.py create mode 100644 datasets/fda_food/food_enforcement/pipeline.yaml create mode 100644 datasets/fda_food/food_events/food_events_dag.py create mode 100644 datasets/fda_food/food_events/pipeline.yaml diff --git a/datasets/fda_food/_images/run_csv_transform_kub/Dockerfile 
b/datasets/fda_food/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..748bc3bec --- /dev/null +++ b/datasets/fda_food/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,21 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.8 +ENV PYTHONUNBUFFERED True +COPY requirements.txt ./ +RUN python3 -m pip install --no-cache-dir -r requirements.txt +WORKDIR /custom +COPY ./csv_transform.py . +CMD ["python3", "csv_transform.py"] diff --git a/datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py b/datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..0fee34bb4 --- /dev/null +++ b/datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,387 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
import datetime
import json
import logging
import os
import pathlib
import shutil
import typing
import zipfile

import pandas as pd

# NOTE: `requests` and `google.cloud.storage` are imported inside the two
# functions that need them (download_file_http / upload_file_to_gcs) so the
# pure-pandas helpers below can be imported and unit-tested on a machine that
# has neither network client installed.

# Lower-cased str() renderings that mean "no value" for a date cell.
_MISSING_DATE_TOKENS = frozenset({"", "nan", "nat", "none", "null"})
# Characters removed from stringified list columns: "['A', 'B']" -> "A, B".
_LIST_PUNCTUATION = str.maketrans("", "", "[]'")
# Size of the pieces written to disk while streaming the download.
_DOWNLOAD_CHUNK_BYTES = 1024 * 1024


def main(
    pipeline: str,
    source_url: str,
    source_file: pathlib.Path,
    target_file: pathlib.Path,
    chunksize: str,
    target_gcs_bucket: str,
    target_gcs_path: str,
    data_names: typing.List[str],
    data_dtypes: dict,
    rename_mappings: dict,
    reorder_headers_list: typing.List[str],
    record_path: str,
    meta: typing.List[str],
) -> None:
    """Download, unpack, flatten, transform and upload one FDA food data set.

    Args:
        pipeline: Either "food events" or "food enforcement"; selects the
            per-chunk transform in process_chunk.
        source_url: URL of the zipped JSON export.
        source_file: Path of the intermediate pipe-delimited CSV.
        target_file: Path of the final comma-delimited CSV.
        chunksize: Number of CSV rows per batch (string, converted with int()).
        target_gcs_bucket: Destination bucket name.
        target_gcs_path: Destination object path inside the bucket.
        data_names: Column names applied when reading the intermediate CSV.
        data_dtypes: Column name -> dtype mapping for the intermediate CSV.
        rename_mappings: Old -> new column names (food events only).
        reorder_headers_list: Final column order.
        record_path: Key of the nested list to flatten, or "" for none.
        meta: Top-level keys carried along when record_path is used.
    """
    # The original always logged "Food Events", even for the enforcement run.
    pipeline_label = pipeline.title()
    logging.info(
        f"Food and Drug Administration (FDA) - {pipeline_label} process started"
    )

    source_file = pathlib.Path(source_file)
    target_file = pathlib.Path(target_file)
    # Create the directories actually used instead of a hard-coded "./files".
    dest_dir = source_file.parent
    dest_dir.mkdir(parents=True, exist_ok=True)
    target_file.parent.mkdir(parents=True, exist_ok=True)

    dest_path = str(dest_dir)
    source_zip_file = os.path.join(dest_path, os.path.split(source_url)[1])
    # Only drop a trailing ".zip"; str.replace would also hit ".zip" mid-name.
    if source_zip_file.endswith(".zip"):
        source_json_file = source_zip_file[: -len(".zip")]
    else:
        source_json_file = source_zip_file

    download_file_http(source_url, source_zip_file, False)
    unpack_file(source_zip_file, dest_path, "zip")
    convert_json_to_csv(
        source_json_file, source_file, record_path=record_path, meta=meta, separator="|"
    )

    process_source_file(
        pipeline,
        source_file,
        target_file,
        data_names,
        data_dtypes,
        int(chunksize),
        rename_mappings,
        reorder_headers_list,
    )

    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

    logging.info(
        f"Food and Drug Administration (FDA) - {pipeline_label} process completed"
    )


def process_source_file(
    pipeline: str,
    source_file: str,
    target_file: str,
    names: list,
    dtypes: dict,
    chunksize: int,
    rename_mappings: dict,
    reorder_headers_list: list,
) -> None:
    """Read the pipe-delimited source in batches and build the target CSV.

    Each batch is transformed by process_chunk, written to a numbered sibling
    of ``target_file`` and appended to ``target_file``; only the first batch
    contributes the header row.
    """
    logging.info(f"Opening batch file {source_file}")
    # splitext keeps the batch name distinct from the target even when the
    # target path has no ".csv" in it (str.replace would return it unchanged).
    target_root, target_ext = os.path.splitext(str(target_file))
    with pd.read_csv(
        source_file,  # path to main source file to load in batches
        engine="python",
        encoding="utf-8",
        quotechar='"',  # string separator, typically double-quotes
        chunksize=chunksize,  # size of batch data, in no. of records
        sep="|",  # data column separator
        header=None,  # names are supplied explicitly below
        names=names,
        skiprows=1,  # skip the header row written by convert_json_to_csv
        dtype=dtypes,
        keep_default_na=True,
        na_values=[" "],
    ) as reader:
        for chunk_number, chunk in enumerate(reader):
            target_file_batch = f"{target_root}-{chunk_number}{target_ext}"
            process_chunk(
                df=chunk,
                target_file_batch=target_file_batch,
                target_file=target_file,
                rename_mappings=rename_mappings,
                reorder_headers_list=reorder_headers_list,
                pipeline=pipeline,
                skip_header=chunk_number != 0,
            )


def process_chunk(
    df: pd.DataFrame,
    target_file_batch: str,
    target_file: str,
    rename_mappings: dict,
    reorder_headers_list: list,
    pipeline: str,
    skip_header: bool = False,
) -> None:
    """Transform one batch, save it, and append it to the target file.

    When ``skip_header`` is False (first batch) the target file is truncated
    before the batch is appended.
    """
    if pipeline == "food events":
        df = process_food_events(df, rename_mappings, reorder_headers_list)
    elif pipeline == "food enforcement":
        df = process_food_enforcement(df, reorder_headers_list)
    else:
        # Unknown pipeline: the batch is written through untransformed.
        logging.info("pipeline was not specified")
    save_to_new_file(df, file_path=str(target_file_batch))
    append_batch_file(target_file_batch, target_file, skip_header, not skip_header)


def process_food_events(
    df: pd.DataFrame, rename_mappings: dict, reorder_headers_list: list
) -> pd.DataFrame:
    """Apply the food-events clean-up steps and return the resulting frame."""
    df = rename_headers(df, rename_mappings)
    df = reorder_headers(df, reorder_headers_list)
    df = format_list_data(df, ["reactions", "outcomes"])
    df = replace_nan_data(df)
    df = trim_whitespace(df)
    df = replace_nulls(
        df,
        [
            "date_started",
            "date_created",
            "products_industry_name",
            "products_industry_code",
        ],
    )
    df = resolve_date_format(
        df, ["date_started", "date_created"], "%Y%m%d", "%Y-%m-%d", True
    )

    return df


def process_food_enforcement(
    df: pd.DataFrame, reorder_headers_list: list
) -> pd.DataFrame:
    """Apply the food-enforcement clean-up steps and return the resulting frame."""
    df = trim_whitespace(df)
    date_col_list = [
        "center_classification_date",
        "report_date",
        "termination_date",
        "recall_initiation_date",
    ]
    df = resolve_date_format(df, date_col_list, "%Y%m%d", "%Y-%m-%d", True)
    df = reorder_headers(df, reorder_headers_list)

    return df


def _is_missing(value: typing.Any) -> bool:
    """Return True for the scalars pandas treats as missing (None/NaN/NaT/NA)."""
    if value is None or value is pd.NaT or value is pd.NA:
        return True
    return isinstance(value, float) and value != value  # NaN != NaN


def replace_nan_data(df: pd.DataFrame) -> pd.DataFrame:
    """Return ``df`` with every missing value replaced by an empty string."""
    logging.info("Replacing NaN data")
    return df.fillna("")


def format_list_data(df: pd.DataFrame, list_data: list) -> pd.DataFrame:
    """Strip ``[``, ``]`` and ``'`` from the stringified-list columns.

    Missing cells are left missing; previously str() turned them into the
    literal text "nan", which then survived into the output file.
    """
    logging.info("Formatting list data")
    for col in list_data:
        df[col] = df[col].apply(
            lambda x: x if _is_missing(x) else str(x).translate(_LIST_PUNCTUATION)
        )

    return df


def replace_nulls(df: pd.DataFrame, col_list: list) -> pd.DataFrame:
    """Blank out cells whose text is "null" (any case) in the given columns."""
    logging.info("Resolving null text in source data")
    for col in col_list:
        df[col] = df[col].apply(lambda x: "" if str(x).lower() == "null" else x)

    return df


def resolve_date_format(
    df: pd.DataFrame,
    date_col_list: list,
    from_format: str,
    to_format: str = "%Y-%m-%d %H:%M:%S",
    is_date: bool = False,
) -> pd.DataFrame:
    """Rewrite each listed column through convert_dt_format, cell by cell."""
    logging.info("Resolving Date Format")
    for col in date_col_list:
        logging.info(f"Resolving datetime on {col}")
        df[col] = df[col].apply(
            lambda x: convert_dt_format(str(x), from_format, to_format, is_date)
        )

    return df


def convert_dt_format(
    dt_str: str, from_format: str, to_format: str, is_date: bool
) -> str:
    """Convert one date/datetime string; return "" when it is missing or unparseable.

    Args:
        dt_str: Raw cell text (surrounding whitespace is ignored).
        from_format: strptime format of the incoming datetime values.
        to_format: strftime format of the result (datetime path only).
        is_date: True for date-only columns. An 8-character value is treated
            as YYYYMMDD and returned with dashes; a 10-character value is
            assumed to be delimited already and is returned unchanged.

    Fixes over the previous version: values with no time part no longer raise
    IndexError, values already in ``from_format`` are converted instead of
    being dropped, the mixed YYYY-MM-DD branch now produces a result, and
    date-only columns never get a " 00:00:00" suffix.
    """
    text = str(dt_str).strip()
    if text.lower() in _MISSING_DATE_TOKENS:
        return ""

    if is_date:
        if len(text) == 10:
            return text
        # Date in YYYYMMDD form: add the dashes.
        return f"{text[:4]}-{text[4:6]}-{text[6:8]}"

    if len(text) == 10:
        # Date with no time portion.
        return f"{text} 00:00:00"

    date_part, _, time_part = text.partition(" ")
    if len(time_part) == 8 and "%S" not in from_format:
        # Value carries HH:MM:SS but from_format only expects HH:MM.
        text = text[:-3]
    if (
        len(date_part.split("-")[0]) == 4
        and len(from_format.strip().split("/")[0]) == 2
    ):
        # Value is YYYY-MM-DD while from_format is slash-delimited: keep the
        # time directives and swap in a matching date directive. This resolves
        # mixed date formats within one file.
        time_format = from_format.strip().partition(" ")[2]
        from_format = f"%Y-%m-%d {time_format}".strip()

    try:
        return datetime.datetime.strptime(text, from_format).strftime(to_format)
    except ValueError:
        logging.warning(f"Unable to parse date value {text!r} using {from_format}")
        return ""


def trim_whitespace(df: pd.DataFrame) -> pd.DataFrame:
    """Strip leading/trailing whitespace from string cells of text columns.

    Only ``str`` cells are touched, so missing values stay missing instead of
    being converted to the literal text "nan"/"None".
    """
    logging.info("Trimming whitespace")
    for col in df.columns:
        col_dtype = df[col].dtype
        if col_dtype == "object" or pd.api.types.is_string_dtype(col_dtype):
            logging.info(f"Trimming whitespace on {col}")
            df[col] = df[col].apply(lambda x: x.strip() if isinstance(x, str) else x)

    return df


def save_to_new_file(df, file_path) -> None:
    """Write ``df`` to ``file_path`` as CSV without the index column."""
    df.to_csv(file_path, index=False)


def append_batch_file(
    batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool
) -> None:
    """Append a batch CSV to the target CSV, then delete the batch file.

    Args:
        batch_file_path: File to copy from; removed afterwards.
        target_file_path: File to copy into.
        skip_header: Drop the first line of the batch before copying.
        truncate_file: Start the target file from empty.
    """
    # Binary copy: bytes (including embedded newlines inside quoted fields)
    # are carried over untouched and no text encoding is involved.
    target_mode = "wb" if truncate_file else "ab"
    with open(batch_file_path, "rb") as batch, open(
        target_file_path, target_mode
    ) as target:
        if skip_header:
            logging.info(
                f"Appending batch file {batch_file_path} to {target_file_path} with skip header"
            )
            batch.readline()  # returns b"" on an empty file instead of raising
        else:
            logging.info(
                f"Appending batch file {batch_file_path} to {target_file_path}"
            )
        shutil.copyfileobj(batch, target)
    if os.path.exists(batch_file_path):
        os.remove(batch_file_path)


def download_file_http(
    source_url: str, source_file: pathlib.Path, continue_on_error: bool = False
) -> None:
    """Stream ``source_url`` to ``source_file``.

    Raises:
        SystemExit: On any requests error when ``continue_on_error`` is False.
    """
    import requests

    logging.info(f"Downloading {source_url} to {source_file}")
    try:
        with requests.get(source_url, stream=True) as response:
            # Without this an HTTP error page would be saved as the payload.
            response.raise_for_status()
            with open(source_file, "wb") as f:
                for chunk in response.iter_content(chunk_size=_DOWNLOAD_CHUNK_BYTES):
                    f.write(chunk)
    except requests.exceptions.RequestException as e:
        # isinstance, not ==: `e` is an instance, so the old equality checks
        # never matched and err_msg was left unbound.
        if isinstance(e, requests.exceptions.HTTPError):
            err_msg = "A HTTP error occurred."
        elif isinstance(e, requests.exceptions.Timeout):
            err_msg = "A HTTP timeout error occurred."
        elif isinstance(e, requests.exceptions.TooManyRedirects):
            err_msg = "Too Many Redirects occurred."
        else:
            err_msg = "A request error occurred."
        if not continue_on_error:
            logging.info(f"{err_msg} Unable to obtain {source_url}")
            raise SystemExit(e)
        logging.info(f"{err_msg} Unable to obtain {source_url}. Continuing execution.")


def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> pd.DataFrame:
    """Return ``df`` with columns renamed according to ``rename_mappings``."""
    df = df.rename(columns=rename_mappings)

    return df


def reorder_headers(df: pd.DataFrame, reorder_headers_list: list) -> pd.DataFrame:
    """Return ``df`` reindexed to exactly the columns in ``reorder_headers_list``."""
    logging.info("Re-ordering Headers")
    df = df.reindex(columns=reorder_headers_list)

    return df


def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> None:
    """Extract ``infile`` into ``dest_path`` when it exists and is a zip archive."""
    if not os.path.exists(infile):
        logging.info(f"{infile} not unpacked because it does not exist.")
        return
    if compression_type != "zip":
        logging.info(
            f"{infile} ignored as it is not compressed or is of unknown compression"
        )
        return
    logging.info(f"Unpacking {infile} to {dest_path}")
    with zipfile.ZipFile(infile, mode="r") as zipf:
        zipf.extractall(dest_path)


def convert_json_to_csv(
    source_file_json: str,
    source_file_csv: str,
    record_path: str,
    meta: list,
    separator: str = "|",
) -> None:
    """Flatten the ``results`` array of a JSON file into a delimited CSV.

    When ``record_path`` is non-empty the nested list under that key is
    exploded with pandas.json_normalize and the ``meta`` keys are carried
    along; otherwise each result becomes one row as-is. Missing values are
    written as empty fields.
    """
    logging.info(f"Converting JSON file {source_file_json} to {source_file_csv}")
    with open(source_file_json.strip(), encoding="utf-8") as f:
        json_data = json.load(f)
    if record_path:
        df = pd.json_normalize(
            json_data["results"],
            record_path=[record_path],
            meta=meta,
            errors="ignore",
        )
    else:
        df = pd.DataFrame(json_data["results"])
    df = df.fillna("")
    df.to_csv(
        source_file_csv, index=False, sep=separator, quotechar='"', encoding="utf-8"
    )


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
    """Upload ``file_path`` to ``gs://<gcs_bucket>/<gcs_path>``."""
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(str(file_path))


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    main(
        pipeline=os.environ["PIPELINE"],
        source_url=os.environ["SOURCE_URL"],
        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
        chunksize=os.environ["CHUNKSIZE"],
        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
        target_gcs_path=os.environ["TARGET_GCS_PATH"],
        data_names=json.loads(os.environ["DATA_NAMES"]),
        data_dtypes=json.loads(os.environ["DATA_DTYPES"]),
        rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]),
        reorder_headers_list=json.loads(os.environ["REORDER_HEADERS"]),
        record_path=os.environ["RECORD_PATH"],
        meta=json.loads(os.environ["META"]),
    )
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_dataset" "fda_food" { + dataset_id = "fda_food" + project = var.project_id + description = "fda_food" +} + +output "bigquery_dataset-fda_food-dataset_id" { + value = google_bigquery_dataset.fda_food.dataset_id +} diff --git a/datasets/fda_food/_terraform/food_enforcement_pipeline.tf b/datasets/fda_food/_terraform/food_enforcement_pipeline.tf new file mode 100644 index 000000000..17a09a44d --- /dev/null +++ b/datasets/fda_food/_terraform/food_enforcement_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "fda_food_food_enforcement" { + project = var.project_id + dataset_id = "fda_food" + table_id = "food_enforcement" + + description = "fda_foodspc" + + + + + depends_on = [ + google_bigquery_dataset.fda_food + ] +} + +output "bigquery_table-fda_food_food_enforcement-table_id" { + value = google_bigquery_table.fda_food_food_enforcement.table_id +} + +output "bigquery_table-fda_food_food_enforcement-id" { + value = google_bigquery_table.fda_food_food_enforcement.id +} diff --git a/datasets/fda_food/_terraform/food_events_pipeline.tf b/datasets/fda_food/_terraform/food_events_pipeline.tf new file mode 100644 index 000000000..ffdd9062f --- /dev/null +++ b/datasets/fda_food/_terraform/food_events_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "fda_food_food_events" { + project = var.project_id + dataset_id = "fda_food" + table_id = "food_events" + + description = "fda_foodspc" + + + + + depends_on = [ + google_bigquery_dataset.fda_food + ] +} + +output "bigquery_table-fda_food_food_events-table_id" { + value = google_bigquery_table.fda_food_food_events.table_id +} + +output "bigquery_table-fda_food_food_events-id" { + value = google_bigquery_table.fda_food_food_events.id +} diff --git a/datasets/fda_food/_terraform/provider.tf b/datasets/fda_food/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/fda_food/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/fda_food/_terraform/variables.tf b/datasets/fda_food/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/fda_food/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/fda_food/dataset.yaml b/datasets/fda_food/dataset.yaml new file mode 100644 index 000000000..e550f4031 --- /dev/null +++ b/datasets/fda_food/dataset.yaml @@ -0,0 +1,27 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + name: fda_food + friendly_name: ~ + description: ~ + dataset_sources: ~ + terms_of_use: ~ + + +resources: + + - type: bigquery_dataset + dataset_id: fda_food + description: fda_food diff --git a/datasets/fda_food/food_enforcement/food_enforcement_dag.py b/datasets/fda_food/food_enforcement/food_enforcement_dag.py new file mode 100644 index 000000000..7f7deb994 --- /dev/null +++ b/datasets/fda_food/food_enforcement/food_enforcement_dag.py @@ -0,0 +1,236 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

# NOTE(review): the values below mirror datasets/fda_food/food_enforcement/pipeline.yaml;
# keep the two files in sync when editing either one.

default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": "2021-03-01",
}

# Object written by the transform pod and read back by the BigQuery load task.
OUTPUT_OBJECT = "data/fda_food/food_enforcement/files/data_output.csv"

# JSON list of the source columns; passed to the pod as both DATA_NAMES and META.
SOURCE_COLUMNS_JSON = (
    '[ "status", "city", "state", "country", "classification",\n'
    ' "openfda", "product_type", "event_id", "recalling_firm", "address_1",\n'
    ' "address_2", "postal_code", "voluntary_mandated", "initial_firm_notification", "distribution_pattern",\n'
    ' "recall_number", "product_description", "product_quantity", "reason_for_recall", "recall_initiation_date",\n'
    ' "center_classification_date", "report_date", "code_info", "more_code_info", "termination_date" ]'
)

# JSON object mapping every source column to the "str" dtype.
SOURCE_DTYPES_JSON = (
    '{ "status": "str", "city": "str", "state": "str", "country": "str", "classification": "str",\n'
    ' "openfda": "str", "product_type": "str", "event_id": "str", "recalling_firm": "str", "address_1": "str",\n'
    ' "address_2": "str", "postal_code": "str", "voluntary_mandated": "str", "initial_firm_notification": "str", "distribution_pattern": "str",\n'
    ' "recall_number": "str", "product_description": "str", "product_quantity": "str", "reason_for_recall": "str", "recall_initiation_date": "str",\n'
    ' "center_classification_date": "str", "report_date": "str", "code_info": "str", "more_code_info": "str", "termination_date": "str" }'
)

# JSON list giving the column order of the output CSV (matches the schema order below).
OUTPUT_COLUMNS_JSON = (
    '[ "classification", "center_classification_date", "report_date", "postal_code", "termination_date",\n'
    ' "recall_initiation_date", "recall_number", "city", "event_id", "distribution_pattern",\n'
    ' "recalling_firm", "voluntary_mandated", "state", "reason_for_recall", "initial_firm_notification",\n'
    ' "status", "product_type", "country", "product_description", "code_info",\n'
    ' "address_1", "address_2", "product_quantity", "more_code_info" ]'
)

# Pin the transform pod to the e2-standard-4 node pool.
NODE_POOL_AFFINITY = {
    "nodeAffinity": {
        "requiredDuringSchedulingIgnoredDuringExecution": {
            "nodeSelectorTerms": [
                {
                    "matchExpressions": [
                        {
                            "key": "cloud.google.com/gke-nodepool",
                            "operator": "In",
                            "values": ["pool-e2-standard-4"],
                        }
                    ]
                }
            ]
        }
    }
}


def _nullable(name, field_type, description=""):
    """Build one NULLABLE BigQuery schema field dictionary."""
    return {
        "name": name,
        "type": field_type,
        "description": description,
        "mode": "NULLABLE",
    }


BQ_SCHEMA = [
    _nullable(
        "classification",
        "STRING",
        "Numerical designation (I, II, or III) that is assigned by FDA to a particular product recall that indicates the relative degree of health hazard. Class I = Dangerous or defective products that predictably could cause serious health problems or death. Examples include: food found to contain botulinum toxin, food with undeclared allergens, a label mix-up on a lifesaving drug, or a defective artificial heart valve. Class II = Products that might cause a temporary health problem, or pose only a slight threat of a serious nature. Example: a drug that is under-strength but that is not used to treat life-threatening situations. Class III = Products that are unlikely to cause any adverse health reaction, but that violate FDA labeling or manufacturing laws. Examples include: a minor container defect and lack of English labeling in a retail food.",
    ),
    _nullable("center_classification_date", "DATE"),
    _nullable(
        "report_date",
        "DATE",
        "Date that the FDA issued the enforcement report for the product recall.",
    ),
    _nullable("postal_code", "STRING"),
    _nullable("termination_date", "DATE"),
    _nullable(
        "recall_initiation_date",
        "DATE",
        "Date that the firm first began notifying the public or their consignees of the recall.",
    ),
    _nullable(
        "recall_number",
        "STRING",
        "A numerical designation assigned by FDA to a specific recall event used for tracking purposes.",
    ),
    _nullable("city", "STRING", "The city in which the recalling firm is located."),
    _nullable(
        "event_id",
        "INTEGER",
        "A numerical designation assigned by FDA to a specific recall event used for tracking purposes.",
    ),
    _nullable(
        "distribution_pattern",
        "STRING",
        "General area of initial distribution such as, “Distributors in 6 states: NY, VA, TX, GA, FL and MA; the Virgin Islands; Canada and Japan”. The term “nationwide” is defined to mean the fifty states or a significant portion. Note that subsequent distribution by the consignees to other parties may not be included.",
    ),
    _nullable(
        "recalling_firm",
        "STRING",
        "The firm that initiates a recall or, in the case of an FDA requested recall or FDA mandated recall, the firm that has primary responsibility for the manufacture and (or) marketing of the product to be recalled.",
    ),
    _nullable(
        "voluntary_mandated",
        "STRING",
        "Describes who initiated the recall. Recalls are almost always voluntary, meaning initiated by a firm. A recall is deemed voluntary when the firm voluntarily removes or corrects marketed products or the FDA requests the marketed products be removed or corrected. A recall is mandated when the firm was ordered by the FDA to remove or correct the marketed products, under section 518(e) of the FD&C Act, National Childhood Vaccine Injury Act of 1986, 21 CFR 1271.440, Infant Formula Act of 1980 and its 1986 amendments, or the Food Safety Modernization Act (FSMA).",
    ),
    _nullable(
        "state", "STRING", "The U.S. state in which the recalling firm is located."
    ),
    _nullable(
        "reason_for_recall",
        "STRING",
        "Information describing how the product is defective and violates the FD&C Act or related statutes.",
    ),
    _nullable(
        "initial_firm_notification",
        "STRING",
        "The method(s) by which the firm initially notified the public or their consignees of a recall. A consignee is a person or firm named in a bill of lading to whom or to whose order the product has or will be delivered.",
    ),
    _nullable(
        "status",
        "STRING",
        "On-Going = A recall which is currently in progress. Completed = The recall action reaches the point at which the firm has actually retrieved and impounded all outstanding product that could reasonably be expected to be recovered, or has completed all product corrections. Terminated = FDA has determined that all reasonable efforts have been made to remove or correct the violative product in accordance with the recall strategy, and proper disposition has been made according to the degree of hazard. Pending = Actions that have been determined to be recalls, but that remain in the process of being classified.",
    ),
    _nullable("product_type", "STRING"),
    _nullable(
        "country", "STRING", "The country in which the recalling firm is located."
    ),
    _nullable(
        "product_description",
        "STRING",
        "Brief description of the product being recalled.",
    ),
    _nullable(
        "code_info",
        "STRING",
        "A list of all lot and/or serial numbers, product numbers, packer or manufacturer numbers, sell or use by dates, etc., which appear on the product or its labeling.",
    ),
    _nullable("address_1", "STRING"),
    _nullable("address_2", "STRING"),
    _nullable(
        "product_quantity",
        "STRING",
        "The amount of defective product subject to recall.",
    ),
    _nullable("more_code_info", "STRING"),
]


with DAG(
    dag_id="fda_food.food_enforcement",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run CSV transform within kubernetes pod
    transform_csv = kubernetes_pod.KubernetesPodOperator(
        task_id="transform_csv",
        name="food_enforcement",
        namespace="default",
        affinity=NODE_POOL_AFFINITY,
        image_pull_policy="Always",
        image="{{ var.json.fda_food.container_registry.run_csv_transform_kub }}",
        env_vars={
            "PIPELINE": "food enforcement",
            "SOURCE_URL": "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001.json.zip",
            "SOURCE_FILE": "files/data.csv",
            "TARGET_FILE": "files/data_output.csv",
            "CHUNKSIZE": "750000",
            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
            "TARGET_GCS_PATH": OUTPUT_OBJECT,
            "DATA_NAMES": SOURCE_COLUMNS_JSON,
            "DATA_DTYPES": SOURCE_DTYPES_JSON,
            "RENAME_MAPPINGS": "{ }",
            "REORDER_HEADERS": OUTPUT_COLUMNS_JSON,
            "RECORD_PATH": "",
            "META": SOURCE_COLUMNS_JSON,
        },
        resources={"limit_memory": "4G", "limit_cpu": "1"},
    )

    # Task to load CSV data to a BigQuery table
    load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
        task_id="load_to_bq",
        bucket="{{ var.value.composer_bucket }}",
        source_objects=[OUTPUT_OBJECT],
        source_format="CSV",
        destination_project_dataset_table="{{ var.json.fda_food.container_registry.food_enforcement_destination_table }}",
        skip_leading_rows=1,
        allow_quoted_newlines=True,
        write_disposition="WRITE_TRUNCATE",
        schema_fields=BQ_SCHEMA,
    )

    transform_csv >> load_to_bq
LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+resources:
+
+  - type: bigquery_table
+    table_id: "food_enforcement"
+    description: "fda_foodspc"  # NOTE(review): reads like a placeholder; confirm the intended table description
+
+dag:
+  airflow_version: 2
+  initialize:
+    dag_id: food_enforcement
+    default_args:
+      owner: "Google"
+      depends_on_past: False
+      start_date: '2021-03-01'
+    max_active_runs: 1  # at most one active run of this DAG
+    schedule_interval: "@daily"
+    catchup: False  # do not backfill the runs between start_date and now
+    default_view: graph
+
+  tasks:
+
+    - operator: "KubernetesPodOperator"
+      description: "Run CSV transform within kubernetes pod"
+
+      args:
+
+        task_id: "transform_csv"
+        name: "food_enforcement"
+        namespace: "default"
+        affinity:  # the pod may only be scheduled on the GKE node pool listed under values
+          nodeAffinity:
+            requiredDuringSchedulingIgnoredDuringExecution:
+              nodeSelectorTerms:
+              - matchExpressions:
+                - key: cloud.google.com/gke-nodepool
+                  operator: In
+                  values:
+                  - "pool-e2-standard-4"
+        image_pull_policy: "Always"
+        image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub }}"  # image reference is read from the Airflow variable "fda_food"
+        env_vars:  # settings handed to the container as environment variables; list/dict values are JSON text
+          PIPELINE: 'food enforcement'
+          SOURCE_URL: "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001.json.zip"  # zipped JSON export from openFDA
+          SOURCE_FILE: "files/data.csv"
+          TARGET_FILE: "files/data_output.csv"
+          CHUNKSIZE: "750000"
+          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
+          TARGET_GCS_PATH: "data/fda_food/food_enforcement/files/data_output.csv"  # same object that load_to_bq reads (source_objects)
+          DATA_NAMES: >-
+            [ "status", "city", "state", "country", "classification",
+             "openfda", "product_type", "event_id", "recalling_firm", "address_1",
+             "address_2", "postal_code", "voluntary_mandated", "initial_firm_notification", "distribution_pattern",
+             "recall_number", "product_description", "product_quantity", "reason_for_recall", "recall_initiation_date",
+             "center_classification_date", "report_date", "code_info", "more_code_info", "termination_date" ]
+          DATA_DTYPES: >-
+            { "status": "str", "city": "str", "state": "str", "country": "str", "classification": "str",
+             "openfda": "str", "product_type": "str", "event_id": "str", "recalling_firm": "str", "address_1": "str",
+             "address_2": "str", "postal_code": "str", "voluntary_mandated": "str", "initial_firm_notification": "str", "distribution_pattern": "str",
+             "recall_number": "str", "product_description": "str", "product_quantity": "str", "reason_for_recall": "str", "recall_initiation_date": "str",
+             "center_classification_date": "str", "report_date": "str", "code_info": "str", "more_code_info": "str", "termination_date": "str" }
+          RENAME_MAPPINGS: >-  # empty mapping: no column is renamed for this pipeline
+            { }
+          REORDER_HEADERS: >-  # same order as schema_fields in load_to_bq; "openfda" is read (DATA_NAMES) but not listed here
+            [ "classification", "center_classification_date", "report_date", "postal_code", "termination_date",
+             "recall_initiation_date", "recall_number", "city", "event_id", "distribution_pattern",
+             "recalling_firm", "voluntary_mandated", "state", "reason_for_recall", "initial_firm_notification",
+             "status", "product_type", "country", "product_description", "code_info",
+             "address_1", "address_2", "product_quantity", "more_code_info" ]
+          RECORD_PATH: ""  # NOTE(review): RECORD_PATH/META look like pandas.json_normalize record_path/meta arguments - confirm in csv_transform.py
+          META: >-
+            [ "status", "city", "state", "country", "classification",
+             "openfda", "product_type", "event_id", "recalling_firm", "address_1",
+             "address_2", "postal_code", "voluntary_mandated", "initial_firm_notification", "distribution_pattern",
+             "recall_number", "product_description", "product_quantity", "reason_for_recall", "recall_initiation_date",
+             "center_classification_date", "report_date", "code_info", "more_code_info", "termination_date" ]
+        resources:
+          limit_memory: "4G"
+          limit_cpu: "1"
+
+    - operator: "GoogleCloudStorageToBigQueryOperator"
+      description: "Task to load CSV data to a BigQuery table"
+
+      args:
+        task_id: "load_to_bq"
+        bucket: "{{ var.value.composer_bucket }}"
+        source_objects: ["data/fda_food/food_enforcement/files/data_output.csv"]
+        source_format: "CSV"
+        destination_project_dataset_table: "{{ var.json.fda_food.container_registry.food_enforcement_destination_table }}"  # NOTE(review): table name is nested under container_registry - confirm the variable layout
+        skip_leading_rows: 1  # skip the first CSV row (expected to be the header)
+        allow_quoted_newlines: True
+        write_disposition: "WRITE_TRUNCATE"  # each run replaces the table contents
+        schema_fields:
+          - "name": "classification"
+            "type": "STRING"
+            "description": "Numerical designation (I, II, or III) that is assigned by FDA to a particular product recall that indicates the relative degree of health hazard. Class I = Dangerous or defective products that predictably could cause serious health problems or death. Examples include: food found to contain botulinum toxin, food with undeclared allergens, a label mix-up on a lifesaving drug, or a defective artificial heart valve. Class II = Products that might cause a temporary health problem, or pose only a slight threat of a serious nature. Example: a drug that is under-strength but that is not used to treat life-threatening situations. Class III = Products that are unlikely to cause any adverse health reaction, but that violate FDA labeling or manufacturing laws. Examples include: a minor container defect and lack of English labeling in a retail food."
+            "mode": "NULLABLE"
+          - "name": "center_classification_date"
+            "type": "DATE"  # NOTE(review): read as "str" in DATA_DTYPES; a CSV load into DATE needs YYYY-MM-DD text - confirm the transform emits it (applies to every DATE column)
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "report_date"
+            "type": "DATE"
+            "description": "Date that the FDA issued the enforcement report for the product recall."
+            "mode": "NULLABLE"
+          - "name": "postal_code"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "termination_date"
+            "type": "DATE"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "recall_initiation_date"
+            "type": "DATE"
+            "description": "Date that the firm first began notifying the public or their consignees of the recall."
+            "mode": "NULLABLE"
+          - "name": "recall_number"
+            "type": "STRING"
+            "description": "A numerical designation assigned by FDA to a specific recall event used for tracking purposes."
+            "mode": "NULLABLE"
+          - "name": "city"
+            "type": "STRING"
+            "description": "The city in which the recalling firm is located."
+            "mode": "NULLABLE"
+          - "name": "event_id"
+            "type": "INTEGER"  # DATA_DTYPES reads this column as "str"; the load declares it INTEGER
+            "description": "A numerical designation assigned by FDA to a specific recall event used for tracking purposes."
+            "mode": "NULLABLE"
+          - "name": "distribution_pattern"
+            "type": "STRING"
+            "description": "General area of initial distribution such as, “Distributors in 6 states: NY, VA, TX, GA, FL and MA; the Virgin Islands; Canada and Japan”. The term “nationwide” is defined to mean the fifty states or a significant portion. Note that subsequent distribution by the consignees to other parties may not be included."
+            "mode": "NULLABLE"
+          - "name": "recalling_firm"
+            "type": "STRING"
+            "description": "The firm that initiates a recall or, in the case of an FDA requested recall or FDA mandated recall, the firm that has primary responsibility for the manufacture and (or) marketing of the product to be recalled."
+            "mode": "NULLABLE"
+          - "name": "voluntary_mandated"
+            "type": "STRING"
+            "description": "Describes who initiated the recall. Recalls are almost always voluntary, meaning initiated by a firm. A recall is deemed voluntary when the firm voluntarily removes or corrects marketed products or the FDA requests the marketed products be removed or corrected. A recall is mandated when the firm was ordered by the FDA to remove or correct the marketed products, under section 518(e) of the FD&C Act, National Childhood Vaccine Injury Act of 1986, 21 CFR 1271.440, Infant Formula Act of 1980 and its 1986 amendments, or the Food Safety Modernization Act (FSMA)."
+            "mode": "NULLABLE"
+          - "name": "state"
+            "type": "STRING"
+            "description": "The U.S. state in which the recalling firm is located."
+            "mode": "NULLABLE"
+          - "name": "reason_for_recall"
+            "type": "STRING"
+            "description": "Information describing how the product is defective and violates the FD&C Act or related statutes."
+            "mode": "NULLABLE"
+          - "name": "initial_firm_notification"
+            "type": "STRING"
+            "description": "The method(s) by which the firm initially notified the public or their consignees of a recall. A consignee is a person or firm named in a bill of lading to whom or to whose order the product has or will be delivered."
+            "mode": "NULLABLE"
+          - "name": "status"
+            "type": "STRING"
+            "description": "On-Going = A recall which is currently in progress. Completed = The recall action reaches the point at which the firm has actually retrieved and impounded all outstanding product that could reasonably be expected to be recovered, or has completed all product corrections. Terminated = FDA has determined that all reasonable efforts have been made to remove or correct the violative product in accordance with the recall strategy, and proper disposition has been made according to the degree of hazard. Pending = Actions that have been determined to be recalls, but that remain in the process of being classified."
+            "mode": "NULLABLE"
+          - "name": "product_type"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "country"
+            "type": "STRING"
+            "description": "The country in which the recalling firm is located."
+            "mode": "NULLABLE"
+          - "name": "product_description"
+            "type": "STRING"
+            "description": "Brief description of the product being recalled."
+            "mode": "NULLABLE"
+          - "name": "code_info"
+            "type": "STRING"
+            "description": "A list of all lot and/or serial numbers, product numbers, packer or manufacturer numbers, sell or use by dates, etc., which appear on the product or its labeling."
+            "mode": "NULLABLE"
+          - "name": "address_1"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "address_2"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "product_quantity"
+            "type": "STRING"
+            "description": "The amount of defective product subject to recall."
+            "mode": "NULLABLE"
+          - "name": "more_code_info"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+
+  graph_paths:
+    - "transform_csv >> load_to_bq"  # the load task runs after the transform task
diff --git a/datasets/fda_food/food_events/food_events_dag.py b/datasets/fda_food/food_events/food_events_dag.py
new file mode 100644
index 000000000..f0058319c
--- /dev/null
+++ b/datasets/fda_food/food_events/food_events_dag.py
@@ -0,0 +1,166 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from airflow import DAG
+from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
+from airflow.providers.google.cloud.transfers import gcs_to_bigquery
+
+default_args = {
+    "owner": "Google",
+    "depends_on_past": False,
+    "start_date": "2021-03-01",
+}
+
+# Daily DAG for the openFDA food adverse-event export: a Kubernetes pod writes a CSV to GCS, which is then loaded into BigQuery.
+with DAG(
+    dag_id="fda_food.food_events",
+    default_args=default_args,
+    max_active_runs=1,  # at most one active run of this DAG
+    schedule_interval="@daily",
+    catchup=False,  # do not backfill the runs between start_date and now
+    default_view="graph",
+) as dag:
+
+    # Run the run_csv_transform_kub image in a Kubernetes pod; the job is configured entirely through env_vars
+    transform_csv = kubernetes_pod.KubernetesPodOperator(
+        task_id="transform_csv",
+        name="food_events",
+        namespace="default",
+        affinity={  # the pod may only be scheduled on the GKE node pool listed under "values"
+            "nodeAffinity": {
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    "values": ["pool-e2-standard-4"],
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        },
+        image_pull_policy="Always",
+        image="{{ var.json.fda_food.container_registry.run_csv_transform_kub }}",  # image reference is read from the Airflow variable "fda_food"
+        env_vars={  # list/dict settings are passed as JSON text
+            "PIPELINE": "food events",
+            "SOURCE_URL": "https://download.open.fda.gov/food/event/food-event-0001-of-0001.json.zip",
+            "SOURCE_FILE": "files/data.csv",
+            "TARGET_FILE": "files/data_output.csv",
+            "CHUNKSIZE": "750000",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/fda_food/food_events/files/data_output.csv",  # same object that load_to_bq reads below
+            "DATA_NAMES": '[ "role", "name_brand", "industry_code", "industry_name", "report_number",\n "outcomes", "date_created", "reactions", "date_started", "consumer.age",\n "consumer.age_unit", "consumer.gender" ]',
+            "DATA_DTYPES": '{ "role": "str", "name_brand": "str", "industry_code": "str", "industry_name": "str", "report_number": "str",\n "outcomes": "str", "date_created": "str", "reactions": "str", "date_started": "str", "consumer.age": "float64",\n "consumer.age_unit": "str", "consumer.gender": "str" }',
+            "RENAME_MAPPINGS": '{ "report_number": "report_number", "reactions": "reactions", "outcomes": "outcomes", "name_brand": "products_brand_name", "industry_code": "products_industry_code",\n "role": "products_role", "industry_name": "products_industry_name", "date_created": "date_created", "date_started": "date_started", "consumer.gender": "consumer_gender",\n "consumer.age": "consumer_age", "consumer.age_unit": "consumer_age_unit" }',
+            "REORDER_HEADERS": '[ "report_number", "reactions", "outcomes", "products_brand_name", "products_industry_code",\n "products_role", "products_industry_name", "date_created", "date_started", "consumer_gender",\n "consumer_age", "consumer_age_unit" ]',
+            "RECORD_PATH": "products",  # NOTE(review): RECORD_PATH/META look like pandas.json_normalize record_path/meta arguments - confirm in csv_transform.py
+            "META": '[\n "report_number", "outcomes", "date_created", "reactions", "date_started",\n ["consumer", "age"], ["consumer", "age_unit"], ["consumer", "gender"]\n]',
+        },
+        resources={"limit_memory": "8G", "limit_cpu": "3"},
+    )
+
+    # Load the CSV written by transform_csv from the Composer bucket into BigQuery, replacing the table contents on every run
+    load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
+        task_id="load_to_bq",
+        bucket="{{ var.value.composer_bucket }}",
+        source_objects=["data/fda_food/food_events/files/data_output.csv"],
+        source_format="CSV",
+        destination_project_dataset_table="{{ var.json.fda_food.container_registry.food_events_destination_table }}",  # NOTE(review): table name is nested under container_registry - confirm the variable layout
+        skip_leading_rows=1,  # skip the first CSV row (expected to be the header)
+        allow_quoted_newlines=True,
+        field_delimiter=",",
+        quote_character='"',
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[  # column order matches REORDER_HEADERS in transform_csv
+            {
+                "name": "report_number",
+                "type": "STRING",
+                "description": "The report number",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "reactions",
+                "type": "STRING",
+                "description": "Information on the reactions or symptoms experienced by the individual involved",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "outcomes",
+                "type": "STRING",
+                "description": "Information on known outcomes or consequences of the adverse event. For more info, refer: https://open.fda.gov/food/event/reference/",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "products_brand_name",
+                "type": "STRING",
+                "description": "The reported brand name of the product.",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "products_industry_code",
+                "type": "STRING",
+                "description": "The FDA industry code for the product. Results in this endpoint are generally limited to products tagged with industry codes related to human food and nutritional supplements or cosmetics. For more info, refer: https://open.fda.gov/food/event/reference/",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "products_role",
+                "type": "STRING",
+                "description": "",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "products_industry_name",
+                "type": "STRING",
+                "description": "The FDA industry name associated with the product.",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "date_created",
+                "type": "DATE",  # NOTE(review): read as "str" in DATA_DTYPES; a CSV load into DATE needs YYYY-MM-DD text - confirm the transform emits it
+                "description": "Date the report was received by FDA.",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "date_started",
+                "type": "DATE",
+                "description": "Date of the adverse event (when it was considered to have started).",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "consumer_gender",
+                "type": "STRING",
+                "description": "The reported gender of the consumer. Female = Female Male = Male Not Available = Unknown",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "consumer_age",
+                "type": "FLOAT",
+                "description": "The reported age of the consumer at the time of the adverse event report, expressed in the unit in the field age_unit",
+                "mode": "NULLABLE",
+            },
+            {
+                "name": "consumer_age_unit",
+                "type": "STRING",
+                "description": "Encodes the unit in which the age of the consumer is expressed. Day(s) = age is expressed in days Week(s) = age is expressed in weeks Month(s) = age is expressed in months Year(s) = age is expressed in years Decade(s) = age is expressed in decades Not Available = Unknown",
+                "mode": "NULLABLE",
+            },
+        ],
+    )
+
+    transform_csv >> load_to_bq  # the load task runs after the transform task
diff --git a/datasets/fda_food/food_events/pipeline.yaml b/datasets/fda_food/food_events/pipeline.yaml
new file mode 100644
index 000000000..57a5dd8d4
--- /dev/null
+++ b/datasets/fda_food/food_events/pipeline.yaml
@@ -0,0 +1,155 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+---
+resources:
+
+  - type: bigquery_table
+    table_id: "food_events"
+    description: "fda_foodspc"  # NOTE(review): reads like a placeholder; confirm the intended table description
+
+dag:
+  airflow_version: 2
+  initialize:
+    dag_id: food_events  # food_events_dag.py in this patch uses the dataset-prefixed id "fda_food.food_events"
+    default_args:
+      owner: "Google"
+      depends_on_past: False
+      start_date: '2021-03-01'
+    max_active_runs: 1  # at most one active run of this DAG
+    schedule_interval: "@daily"
+    catchup: False  # do not backfill the runs between start_date and now
+    default_view: graph
+
+  tasks:
+
+    - operator: "KubernetesPodOperator"
+      description: "Run CSV transform within kubernetes pod"
+
+      args:
+
+        task_id: "transform_csv"
+        name: "food_events"
+        namespace: "default"
+        affinity:  # the pod may only be scheduled on the GKE node pool listed under values
+          nodeAffinity:
+            requiredDuringSchedulingIgnoredDuringExecution:
+              nodeSelectorTerms:
+              - matchExpressions:
+                - key: cloud.google.com/gke-nodepool
+                  operator: In
+                  values:
+                  - "pool-e2-standard-4"
+        image_pull_policy: "Always"
+        image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub }}"  # image reference is read from the Airflow variable "fda_food"
+        env_vars:  # settings handed to the container as environment variables; list/dict values are JSON text
+          PIPELINE: "food events"
+          SOURCE_URL: "https://download.open.fda.gov/food/event/food-event-0001-of-0001.json.zip"  # zipped JSON export from openFDA
+          SOURCE_FILE: "files/data.csv"
+          TARGET_FILE: "files/data_output.csv"
+          CHUNKSIZE: "750000"
+          TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}"
+          TARGET_GCS_PATH: "data/fda_food/food_events/files/data_output.csv"  # same object that load_to_bq reads (source_objects)
+          DATA_NAMES: >-
+            [ "role", "name_brand", "industry_code", "industry_name", "report_number",
+             "outcomes", "date_created", "reactions", "date_started", "consumer.age",
+             "consumer.age_unit", "consumer.gender" ]
+          DATA_DTYPES: >-
+            { "role": "str", "name_brand": "str", "industry_code": "str", "industry_name": "str", "report_number": "str",
+             "outcomes": "str", "date_created": "str", "reactions": "str", "date_started": "str", "consumer.age": "float64",
+             "consumer.age_unit": "str", "consumer.gender": "str" }
+          RENAME_MAPPINGS: >-  # maps each DATA_NAMES entry to its BigQuery column name (several entries are identity mappings)
+            { "report_number": "report_number", "reactions": "reactions", "outcomes": "outcomes", "name_brand": "products_brand_name", "industry_code": "products_industry_code",
+             "role": "products_role", "industry_name": "products_industry_name", "date_created": "date_created", "date_started": "date_started", "consumer.gender": "consumer_gender",
+             "consumer.age": "consumer_age", "consumer.age_unit": "consumer_age_unit" }
+          REORDER_HEADERS: >-  # same order as schema_fields in load_to_bq
+            [ "report_number", "reactions", "outcomes", "products_brand_name", "products_industry_code",
+             "products_role", "products_industry_name", "date_created", "date_started", "consumer_gender",
+             "consumer_age", "consumer_age_unit" ]
+          RECORD_PATH: "products"  # NOTE(review): RECORD_PATH/META look like pandas.json_normalize record_path/meta arguments - confirm in csv_transform.py
+          META: >-
+            [
+             "report_number", "outcomes", "date_created", "reactions", "date_started",
+             ["consumer", "age"], ["consumer", "age_unit"], ["consumer", "gender"]
+            ]
+        resources:
+          limit_memory: "8G"
+          limit_cpu: "3"
+
+    - operator: "GoogleCloudStorageToBigQueryOperator"
+      description: "Task to load CSV data to a BigQuery table"
+
+      args:
+        task_id: "load_to_bq"
+        bucket: "{{ var.value.composer_bucket }}"
+        source_objects: ["data/fda_food/food_events/files/data_output.csv"]
+        source_format: "CSV"
+        destination_project_dataset_table: "{{ var.json.fda_food.container_registry.food_events_destination_table }}"  # NOTE(review): table name is nested under container_registry - confirm the variable layout
+        skip_leading_rows: 1  # skip the first CSV row (expected to be the header)
+        allow_quoted_newlines: True
+        field_delimiter: ","
+        quote_character: "\""
+        write_disposition: "WRITE_TRUNCATE"  # each run replaces the table contents
+        schema_fields:
+          - "name": "report_number"
+            "type": "STRING"
+            "description": "The report number"
+            "mode": "NULLABLE"
+          - "name": "reactions"
+            "type": "STRING"
+            "description": "Information on the reactions or symptoms experienced by the individual involved"
+            "mode": "NULLABLE"
+          - "name": "outcomes"
+            "type": "STRING"
+            "description": "Information on known outcomes or consequences of the adverse event. For more info, refer: https://open.fda.gov/food/event/reference/"
+            "mode": "NULLABLE"
+          - "name": "products_brand_name"
+            "type": "STRING"
+            "description": "The reported brand name of the product."
+            "mode": "NULLABLE"
+          - "name": "products_industry_code"
+            "type": "STRING"
+            "description": "The FDA industry code for the product. Results in this endpoint are generally limited to products tagged with industry codes related to human food and nutritional supplements or cosmetics. For more info, refer: https://open.fda.gov/food/event/reference/"
+            "mode": "NULLABLE"
+          - "name": "products_role"
+            "type": "STRING"
+            "description": ""
+            "mode": "NULLABLE"
+          - "name": "products_industry_name"
+            "type": "STRING"
+            "description": "The FDA industry name associated with the product."
+            "mode": "NULLABLE"
+          - "name": "date_created"
+            "type": "DATE"  # NOTE(review): read as "str" in DATA_DTYPES; a CSV load into DATE needs YYYY-MM-DD text - confirm the transform emits it
+            "description": "Date the report was received by FDA."
+            "mode": "NULLABLE"
+          - "name": "date_started"
+            "type": "DATE"
+            "description": "Date of the adverse event (when it was considered to have started)."
+            "mode": "NULLABLE"
+          - "name": "consumer_gender"
+            "type": "STRING"
+            "description": "The reported gender of the consumer. Female = Female Male = Male Not Available = Unknown"
+            "mode": "NULLABLE"
+          - "name": "consumer_age"
+            "type": "FLOAT"
+            "description": "The reported age of the consumer at the time of the adverse event report, expressed in the unit in the field age_unit"
+            "mode": "NULLABLE"
+          - "name": "consumer_age_unit"
+            "type": "STRING"
+            "description": "Encodes the unit in which the age of the consumer is expressed. Day(s) = age is expressed in days Week(s) = age is expressed in weeks Month(s) = age is expressed in months Year(s) = age is expressed in years Decade(s) = age is expressed in decades Not Available = Unknown"
+            "mode": "NULLABLE"
+
+  graph_paths:
+    - "transform_csv >> load_to_bq"  # the load task runs after the transform task