From 8756a5c768cef6bb11dcc50c38a924a31a719534 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Fri, 29 Oct 2021 22:29:12 +0000 Subject: [PATCH 1/7] feat: Added fda_food.food_enforcement --- .../Dockerfile | 21 ++ .../csv_transform.py | 287 ++++++++++++++++++ .../requirements.txt | 5 + .../fda_food/_terraform/fda_food_dataset.tf | 26 ++ .../_terraform/food_enforcement_pipeline.tf | 39 +++ datasets/fda_food/_terraform/provider.tf | 28 ++ datasets/fda_food/_terraform/variables.tf | 23 ++ datasets/fda_food/dataset.yaml | 27 ++ .../food_enforcement/food_enforcement_dag.py | 231 ++++++++++++++ .../fda_food/food_enforcement/pipeline.yaml | 190 ++++++++++++ 10 files changed, 877 insertions(+) create mode 100644 datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/Dockerfile create mode 100644 datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py create mode 100644 datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/requirements.txt create mode 100644 datasets/fda_food/_terraform/fda_food_dataset.tf create mode 100644 datasets/fda_food/_terraform/food_enforcement_pipeline.tf create mode 100644 datasets/fda_food/_terraform/provider.tf create mode 100644 datasets/fda_food/_terraform/variables.tf create mode 100644 datasets/fda_food/dataset.yaml create mode 100644 datasets/fda_food/food_enforcement/food_enforcement_dag.py create mode 100644 datasets/fda_food/food_enforcement/pipeline.yaml diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/Dockerfile b/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/Dockerfile new file mode 100644 index 000000000..748bc3bec --- /dev/null +++ b/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/Dockerfile @@ -0,0 +1,21 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.8 +ENV PYTHONUNBUFFERED True +COPY requirements.txt ./ +RUN python3 -m pip install --no-cache-dir -r requirements.txt +WORKDIR /custom +COPY ./csv_transform.py . +CMD ["python3", "csv_transform.py"] diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py b/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py new file mode 100644 index 000000000..59ed1b29a --- /dev/null +++ b/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py @@ -0,0 +1,287 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
# ===== file: datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py =====
# (reconstructed from a whitespace-mangled patch; Apache-2.0 header precedes this span)

import datetime
import json
import logging
import os
import pathlib
import typing
import zipfile  # fix: original aliased this as `zip`, shadowing the builtin

import pandas as pd


def main(
    source_url: str,
    source_file: pathlib.Path,
    target_file: pathlib.Path,
    chunksize: str,
    target_gcs_bucket: str,
    target_gcs_path: str,
    data_names: typing.List[str],
    data_dtypes: dict,
) -> None:
    """Transform the FDA food-enforcement source file in batches and upload to GCS.

    All parameters arrive as strings via environment variables (see __main__);
    `chunksize` is converted to int here.
    """
    logging.info("Food and Drug Administration (FDA) - Food Enforcement process started")

    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)
    dest_path = os.path.split(source_file)[0]
    source_zip_file = dest_path + "/" + os.path.split(source_url)[1]
    source_json_file = source_zip_file.replace(".zip", "")  # noqa: F841 — used once download is enabled

    # NOTE(review): the download/unpack/convert steps are commented out in this
    # revision of the patch series, so `source_file` must already exist when the
    # container runs — confirm a later patch re-enables these.
    # download_file_http(source_url, source_zip_file, False)
    # unpack_file(source_zip_file, dest_path, "zip")
    # convert_json_to_csv(source_json_file, source_file)

    process_source_file(
        source_file, target_file, data_names, data_dtypes, int(chunksize)
    )

    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

    logging.info("Food and Drug Administration (FDA) - Food Enforcement process completed")


def process_source_file(
    source_file: str,
    target_file: str,
    names: list,
    dtypes: dict,
    chunksize: int,
) -> None:
    """Read the pipe-delimited source file in batches and process each batch."""
    logging.info(f"Opening batch file {source_file}")
    with pd.read_csv(
        source_file,
        engine="python",
        encoding="utf-8",
        quotechar='"',
        chunksize=chunksize,  # number of records per batch
        sep="|",
        header=None,  # the file's own header row is skipped below; `names` is used instead
        names=names,
        skiprows=1,
        dtype=dtypes,
        keep_default_na=True,
        na_values=[" "],
    ) as reader:
        for chunk_number, chunk in enumerate(reader):
            target_file_batch = str(target_file).replace(
                ".csv", "-" + str(chunk_number) + ".csv"
            )
            # Fix: the original concatenated each chunk into a fresh empty
            # DataFrame — a pointless full copy; the chunk already is one.
            process_chunk(
                chunk, target_file_batch, target_file, (chunk_number != 0)
            )


def process_chunk(
    df: pd.DataFrame, target_file_batch: str, target_file: str, skip_header: bool
) -> None:
    """Clean one batch and append it to the consolidated target file."""
    df = trim_whitespace(df)
    date_col_list = [
        "center_classification_date",
        "report_date",
        "termination_date",
        "recall_initiation_date",
    ]
    df = resolve_date_format(df, date_col_list, "%Y%m%d", "%Y-%m-%d", True)
    df = reorder_headers(df)
    save_to_new_file(df, file_path=str(target_file_batch))
    # The first batch (skip_header False) truncates the target; later batches
    # append with their header row dropped.
    append_batch_file(target_file_batch, target_file, skip_header, not skip_header)


def resolve_date_format(
    df: pd.DataFrame,
    date_col_list: list,
    from_format: str,
    to_format: str = "%Y-%m-%d %H:%M:%S",
    is_date: bool = False,
) -> pd.DataFrame:
    """Normalize every column listed in `date_col_list` to `to_format`."""
    logging.info("Resolving Date Format")
    for col in date_col_list:
        logging.info(f"Resolving datetime on {col}")
        df[col] = df[col].apply(
            lambda x: convert_dt_format(str(x), from_format, to_format, is_date)
        )
    return df


def convert_dt_format(
    dt_str: str, from_format: str, to_format: str, is_date: bool
) -> str:
    """Convert one date/datetime string; return "" for blanks, NaN or NaT.

    Fixes over the original:
      * guards the `split(" ")[1]` access, which raised IndexError for any
        value without a time component;
      * the mixed-format branch (data in YYYY-MM-DD while from_format expects
        MM/DD/YYYY) now actually parses and returns the converted value — the
        original adjusted from_format but fell through returning "".
    """
    dt_str = dt_str.strip()
    if not dt_str or dt_str.lower() in ("nan", "nat"):
        return ""
    if len(dt_str) == 10:
        # date-only value such as 2021-10-29: append a midnight time component
        return dt_str + " 00:00:00"
    if is_date and len(dt_str) >= 8:
        # bare YYYYMMDD: insert dashes
        return dt_str[:4] + "-" + dt_str[4:6] + "-" + dt_str[6:8]
    parts = dt_str.split(" ")
    if len(parts) > 1 and len(parts[1]) == 8:
        # HH:MM:SS time portion: drop the seconds before parsing
        return datetime.datetime.strptime(dt_str[:-3], from_format).strftime(to_format)
    if (
        len(parts[0].split("-")[0]) == 4
        and len(from_format.split("/")[0]) == 2
        and " " in from_format
    ):
        # Data is YYYY-MM-DD but from_format is MM/DD/YYYY-style: adapt the
        # format instead of failing (handles mixed date formats in one file).
        from_format = "%Y-%m-%d " + from_format.split(" ")[1]
        return datetime.datetime.strptime(dt_str, from_format).strftime(to_format)
    return ""


def trim_whitespace(df: pd.DataFrame) -> pd.DataFrame:
    """Strip leading/trailing whitespace from every object (string) column."""
    logging.info("Trimming whitespace")
    for col in df.columns:
        if df[col].dtype == "object":
            logging.info(f"Trimming whitespace on {col}")
            df[col] = df[col].apply(lambda x: str(x).strip())
    return df


def reorder_headers(df: pd.DataFrame) -> pd.DataFrame:
    """Project the columns into the order expected by the BigQuery schema."""
    logging.info("Re-ordering Headers")
    return df[
        [
            "classification",
            "center_classification_date",
            "report_date",
            "postal_code",
            "termination_date",
            "recall_initiation_date",
            "recall_number",
            "city",
            "event_id",
            "distribution_pattern",
            "recalling_firm",
            "voluntary_mandated",
            "state",
            "reason_for_recall",
            "initial_firm_notification",
            "status",
            "product_type",
            "country",
            "product_description",
            "code_info",
            "address_1",
            "address_2",
            "product_quantity",
            "more_code_info",
        ]
    ]


def save_to_new_file(df: pd.DataFrame, file_path: str) -> None:
    """Write one batch DataFrame to CSV without the index."""
    df.to_csv(file_path, index=False)


def append_batch_file(
    batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool
) -> None:
    """Append one batch CSV to the consolidated file, then delete the batch.

    Fix: file handles are now closed via context managers; the original bound
    `open(...).close()`'s None return value and leaked both handles on error.
    """
    with open(batch_file_path, "r") as data_file:
        if truncate_file:
            # truncate the consolidated file before the first batch
            open(target_file_path, "w+").close()
        with open(target_file_path, "a+") as target_file:
            if skip_header:
                logging.info(
                    f"Appending batch file {batch_file_path} to {target_file_path} with skip header"
                )
                next(data_file)  # drop the batch's header row
            else:
                logging.info(
                    f"Appending batch file {batch_file_path} to {target_file_path}"
                )
            target_file.write(data_file.read())
    if os.path.exists(batch_file_path):
        os.remove(batch_file_path)


def download_file_http(
    source_url: str, source_file: pathlib.Path, continue_on_error: bool = False
) -> None:
    """Stream `source_url` to `source_file`; optionally continue on request errors."""
    import requests  # deferred: not needed by the pure-transform helpers

    logging.info(f"Downloading {source_url} to {source_file}")
    try:
        src_file = requests.get(source_url, stream=True)
        with open(source_file, "wb") as f:
            for chunk in src_file:
                f.write(chunk)
    except requests.exceptions.RequestException as e:
        # Fix: the original compared the exception INSTANCE to exception
        # CLASSES with `==` — always False, which left err_msg unbound.
        if isinstance(e, requests.exceptions.HTTPError):
            err_msg = "A HTTP error occurred."
        elif isinstance(e, requests.exceptions.Timeout):
            err_msg = "A HTTP timeout error occurred."
        elif isinstance(e, requests.exceptions.TooManyRedirects):
            err_msg = "Too Many Redirects occurred."
        else:
            err_msg = "A request error occurred."
        if not continue_on_error:
            logging.info(f"{err_msg} Unable to obtain {source_url}")
            raise SystemExit(e)
        logging.info(
            f"{err_msg} Unable to obtain {source_url}. Continuing execution."
        )


def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> None:
    """Extract a zip archive into dest_path; other types are logged and skipped."""
    if not os.path.exists(infile):
        logging.info(f"{infile} not unpacked because it does not exist.")
        return
    if compression_type == "zip":
        logging.info(f"Unpacking {infile} to {dest_path}")
        with zipfile.ZipFile(infile, mode="r") as zipf:
            zipf.extractall(dest_path)
    else:
        logging.info(
            f"{infile} ignored as it is not compressed or is of unknown compression"
        )


def convert_json_to_csv(source_file_json: str, source_file_csv: str) -> None:
    """Flatten the `results` array of an open.fda JSON payload to pipe-delimited CSV."""
    logging.info(f"Converting JSON file {source_file_json} to {source_file_csv}")
    # Fix: close the JSON file deterministically (the original leaked the handle).
    with open(source_file_json.strip()) as f:
        json_data = json.load(f)
    df = pd.DataFrame(json_data["results"])
    df.to_csv(source_file_csv, index=False, sep="|", quotechar='"', encoding="utf-8")


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
    """Upload the consolidated CSV to gs://{gcs_bucket}/{gcs_path}."""
    from google.cloud import storage  # deferred: GCP client only needed at upload time

    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(file_path)


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    main(
        source_url=os.environ["SOURCE_URL"],
        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
        chunksize=os.environ["CHUNKSIZE"],
        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
        target_gcs_path=os.environ["TARGET_GCS_PATH"],
        data_names=json.loads(os.environ["DATA_NAMES"]),
        data_dtypes=json.loads(os.environ["DATA_DTYPES"]),
    )

# ===== file: datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/requirements.txt =====
# (content of the adjacent file in this patch span)
#   requests
#   numpy
#   pandas
#   google-cloud-storage
#   gsutil
# ===== file: datasets/fda_food/_terraform/fda_food_dataset.tf =====
/**
 * Copyright 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */


# BigQuery dataset that holds every fda_food pipeline table.
resource "google_bigquery_dataset" "fda_food" {
  dataset_id  = "fda_food"
  project     = var.project_id
  description = "fda_food"
}

output "bigquery_dataset-fda_food-dataset_id" {
  value = google_bigquery_dataset.fda_food.dataset_id
}

# ===== file: datasets/fda_food/_terraform/food_enforcement_pipeline.tf =====
/* Apache-2.0 header — identical to fda_food_dataset.tf above. */

# Destination table for the food_enforcement pipeline; schema is managed by
# the load job (see the DAG), not by Terraform.
resource "google_bigquery_table" "fda_food_food_enforcement" {
  project    = var.project_id
  dataset_id = "fda_food"
  table_id   = "food_enforcement"

  # NOTE(review): "fda_foodspc" looks like a generator artifact — confirm the
  # intended description before release.
  description = "fda_foodspc"

  depends_on = [
    google_bigquery_dataset.fda_food
  ]
}

output "bigquery_table-fda_food_food_enforcement-table_id" {
  value = google_bigquery_table.fda_food_food_enforcement.table_id
}

output "bigquery_table-fda_food_food_enforcement-id" {
  value = google_bigquery_table.fda_food_food_enforcement.id
}

# ===== file: datasets/fda_food/_terraform/provider.tf =====
/* Apache-2.0 header — identical to fda_food_dataset.tf above. */

provider "google" {
  project                     = var.project_id
  impersonate_service_account = var.impersonating_acct
  region                      = var.region
}

data "google_client_openid_userinfo" "me" {}

output "impersonating-account" {
  value = data.google_client_openid_userinfo.me.email
}

# ===== file: datasets/fda_food/_terraform/variables.tf =====
/* Apache-2.0 header — identical to fda_food_dataset.tf above. */

variable "project_id" {}
variable "bucket_name_prefix" {}
variable "impersonating_acct" {}
variable "region" {}
variable "env" {}

# ===== file: datasets/fda_food/dataset.yaml =====
# Apache-2.0 header — identical wording to the Terraform files above.

# dataset:
#   name: fda_food
#   friendly_name: ~
#   description: ~
#   dataset_sources: ~
#   terms_of_use: ~
#
# resources:
#   - type: bigquery_dataset
#     dataset_id: fda_food
#     description: fda_food

# ===== file: datasets/fda_food/food_enforcement/food_enforcement_dag.py =====
# Apache-2.0 license header (full text as above); the DAG code itself begins
# in the next span of the patch.
# ===== file: datasets/fda_food/food_enforcement/food_enforcement_dag.py (body) =====
# Airflow 2 DAG: transform the open.fda food-enforcement extract in a GKE pod,
# then load the resulting CSV into BigQuery.


from airflow import DAG
from airflow.providers.cncf.kubernetes.operators import kubernetes_pod
from airflow.providers.google.cloud.transfers import gcs_to_bigquery

default_args = {
    "owner": "Google",
    "depends_on_past": False,
    "start_date": "2021-03-01",
}


with DAG(
    dag_id="fda_food.food_enforcement",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run CSV transform within kubernetes pod
    transform_csv = kubernetes_pod.KubernetesPodOperator(
        task_id="transform_csv",
        name="food_enforcement",
        namespace="default",
        # Pin the pod to the dedicated e2-standard-4 node pool.
        affinity={
            "nodeAffinity": {
                "requiredDuringSchedulingIgnoredDuringExecution": {
                    "nodeSelectorTerms": [
                        {
                            "matchExpressions": [
                                {
                                    "key": "cloud.google.com/gke-nodepool",
                                    "operator": "In",
                                    "values": ["pool-e2-standard-4"],
                                }
                            ]
                        }
                    ]
                }
            }
        },
        image_pull_policy="Always",
        image="{{ var.json.fda_food.container_registry.run_csv_transform_kub_food_enforcement }}",
        env_vars={
            "SOURCE_URL": "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001",
            "SOURCE_FILE": "files/data.csv",
            "TARGET_FILE": "files/data_output.csv",
            "CHUNKSIZE": "2500000",
            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
            "TARGET_GCS_PATH": "data/fda_food/food_enforcement/data_output.csv",
            "DATA_NAMES": '[ "country", "city", "address_1", "reason_for_recall", "address_2",\n "product_quantity", "code_info", "center_classification_date", "distribution_pattern", "state",\n "product_description", "report_date", "classification", "openfda", "recalling_firm",\n "recall_number", "initial_firm_notification", "product_type", "event_id", "termination_date",\n "recall_initiation_date", "postal_code", "voluntary_mandated", "status", "more_code_info" ]',
            "DATA_DTYPES": '{ "country": "str", "city": "str", "address_1": "str", "reason_for_recall": "str", "address_2": "str",\n "product_quantity": "str", "code_info": "str", "center_classification_date": "str", "distribution_pattern": "str", "state": "str",\n "product_description": "str", "report_date": "str", "classification": "str", "openfda": "str", "recalling_firm": "str",\n "recall_number": "str", "initial_firm_notification": "str", "product_type": "str", "event_id": "str", "termination_date": "str",\n "recall_initiation_date": "str", "postal_code": "str", "voluntary_mandated": "str", "status" : "str", "more_code_info": "str" }',
        },
        resources={"limit_memory": "8G", "limit_cpu": "3"},
    )

    # Task to load CSV data to a BigQuery table
    load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator(
        task_id="load_to_bq",
        bucket="{{ var.value.composer_bucket }}",
        source_objects=["data/fda_food/food_enforcement/data_output.csv"],
        source_format="CSV",
        # Fix: the original template was
        #   "{{ var.value.container_registry.food_enforcement_destination_table }}",
        # which performs a plain Airflow Variable lookup of the literal name
        # "container_registry.food_enforcement_destination_table". Every other
        # container_registry value in this DAG resolves through the
        # var.json.fda_food JSON Variable, so use the same path here.
        destination_project_dataset_table="{{ var.json.fda_food.container_registry.food_enforcement_destination_table }}",
        skip_leading_rows=1,
        allow_quoted_newlines=True,
        write_disposition="WRITE_TRUNCATE",
        schema_fields=[
            {
                "name": "classification",
                "type": "STRING",
                "description": "Numerical designation (I, II, or III) that is assigned by FDA to a particular product recall that indicates the relative degree of health hazard. Class I = Dangerous or defective products that predictably could cause serious health problems or death. Examples include: food found to contain botulinum toxin, food with undeclared allergens, a label mix-up on a lifesaving drug, or a defective artificial heart valve. Class II = Products that might cause a temporary health problem, or pose only a slight threat of a serious nature. Example: a drug that is under-strength but that is not used to treat life-threatening situations. Class III = Products that are unlikely to cause any adverse health reaction, but that violate FDA labeling or manufacturing laws. Examples include: a minor container defect and lack of English labeling in a retail food.",
                "mode": "NULLABLE",
            },
            {
                "name": "center_classification_date",
                "type": "DATE",
                "description": "",
                "mode": "NULLABLE",
            },
            {
                "name": "report_date",
                "type": "DATE",
                "description": "Date that the FDA issued the enforcement report for the product recall.",
                "mode": "NULLABLE",
            },
            {
                "name": "postal_code",
                "type": "STRING",
                "description": "",
                "mode": "NULLABLE",
            },
            {
                "name": "termination_date",
                "type": "DATE",
                "description": "",
                "mode": "NULLABLE",
            },
            {
                "name": "recall_initiation_date",
                "type": "DATE",
                "description": "Date that the firm first began notifying the public or their consignees of the recall.",
                "mode": "NULLABLE",
            },
            {
                "name": "recall_number",
                "type": "STRING",
                "description": "A numerical designation assigned by FDA to a specific recall event used for tracking purposes.",
                "mode": "NULLABLE",
            },
            {
                "name": "city",
                "type": "STRING",
                "description": "The city in which the recalling firm is located.",
                "mode": "NULLABLE",
            },
            {
                "name": "event_id",
                "type": "INTEGER",
                "description": "A numerical designation assigned by FDA to a specific recall event used for tracking purposes.",
                "mode": "NULLABLE",
            },
            {
                "name": "distribution_pattern",
                "type": "STRING",
                "description": "General area of initial distribution such as, “Distributors in 6 states: NY, VA, TX, GA, FL and MA; the Virgin Islands; Canada and Japan”. The term “nationwide” is defined to mean the fifty states or a significant portion. Note that subsequent distribution by the consignees to other parties may not be included.",
                "mode": "NULLABLE",
            },
            {
                "name": "recalling_firm",
                "type": "STRING",
                "description": "The firm that initiates a recall or, in the case of an FDA requested recall or FDA mandated recall, the firm that has primary responsibility for the manufacture and (or) marketing of the product to be recalled.",
                "mode": "NULLABLE",
            },
            {
                "name": "voluntary_mandated",
                "type": "STRING",
                "description": "Describes who initiated the recall. Recalls are almost always voluntary, meaning initiated by a firm. A recall is deemed voluntary when the firm voluntarily removes or corrects marketed products or the FDA requests the marketed products be removed or corrected. A recall is mandated when the firm was ordered by the FDA to remove or correct the marketed products, under section 518(e) of the FD&C Act, National Childhood Vaccine Injury Act of 1986, 21 CFR 1271.440, Infant Formula Act of 1980 and its 1986 amendments, or the Food Safety Modernization Act (FSMA).",
                "mode": "NULLABLE",
            },
            {
                "name": "state",
                "type": "STRING",
                "description": "The U.S. state in which the recalling firm is located.",
                "mode": "NULLABLE",
            },
            {
                "name": "reason_for_recall",
                "type": "STRING",
                "description": "Information describing how the product is defective and violates the FD&C Act or related statutes.",
                "mode": "NULLABLE",
            },
            {
                "name": "initial_firm_notification",
                "type": "STRING",
                "description": "The method(s) by which the firm initially notified the public or their consignees of a recall. A consignee is a person or firm named in a bill of lading to whom or to whose order the product has or will be delivered.",
                "mode": "NULLABLE",
            },
            {
                "name": "status",
                "type": "STRING",
                "description": "On-Going = A recall which is currently in progress. Completed = The recall action reaches the point at which the firm has actually retrieved and impounded all outstanding product that could reasonably be expected to be recovered, or has completed all product corrections. Terminated = FDA has determined that all reasonable efforts have been made to remove or correct the violative product in accordance with the recall strategy, and proper disposition has been made according to the degree of hazard. Pending = Actions that have been determined to be recalls, but that remain in the process of being classified.",
                "mode": "NULLABLE",
            },
            {
                "name": "product_type",
                "type": "STRING",
                "description": "",
                "mode": "NULLABLE",
            },
            {
                "name": "country",
                "type": "STRING",
                "description": "The country in which the recalling firm is located.",
                "mode": "NULLABLE",
            },
            {
                "name": "product_description",
                "type": "STRING",
                "description": "Brief description of the product being recalled.",
                "mode": "NULLABLE",
            },
            {
                "name": "code_info",
                "type": "STRING",
                "description": "A list of all lot and/or serial numbers, product numbers, packer or manufacturer numbers, sell or use by dates, etc., which appear on the product or its labeling.",
                "mode": "NULLABLE",
            },
            {
                "name": "address_1",
                "type": "STRING",
                "description": "",
                "mode": "NULLABLE",
            },
            {
                "name": "address_2",
                "type": "STRING",
                "description": "",
                "mode": "NULLABLE",
            },
            {
                "name": "product_quantity",
                "type": "STRING",
                "description": "The amount of defective product subject to recall.",
                "mode": "NULLABLE",
            },
            {
                "name": "more_code_info",
                "type": "STRING",
                "description": "",
                "mode": "NULLABLE",
            },
        ],
    )

    transform_csv >> load_to_bq

# ===== file: datasets/fda_food/food_enforcement/pipeline.yaml begins here in the
# original patch (Apache-2.0 header); its body continues in the following spans. =====
LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: "food_enforcement" + description: "fda_foodspc" + +dag: + airflow_version: 2 + initialize: + dag_id: food_enforcement + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "transform_csv" + name: "food_enforcement" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub_food_enforcement }}" + env_vars: + SOURCE_URL: "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001" + SOURCE_FILE: "files/data.csv" + TARGET_FILE: "files/data_output.csv" + CHUNKSIZE: "2500000" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/fda_food/food_enforcement/data_output.csv" + DATA_NAMES: >- + [ "country", "city", "address_1", "reason_for_recall", "address_2", + "product_quantity", "code_info", "center_classification_date", "distribution_pattern", "state", + "product_description", "report_date", 
"classification", "openfda", "recalling_firm", + "recall_number", "initial_firm_notification", "product_type", "event_id", "termination_date", + "recall_initiation_date", "postal_code", "voluntary_mandated", "status", "more_code_info" ] + DATA_DTYPES: >- + { "country": "str", "city": "str", "address_1": "str", "reason_for_recall": "str", "address_2": "str", + "product_quantity": "str", "code_info": "str", "center_classification_date": "str", "distribution_pattern": "str", "state": "str", + "product_description": "str", "report_date": "str", "classification": "str", "openfda": "str", "recalling_firm": "str", + "recall_number": "str", "initial_firm_notification": "str", "product_type": "str", "event_id": "str", "termination_date": "str", + "recall_initiation_date": "str", "postal_code": "str", "voluntary_mandated": "str", "status" : "str", "more_code_info": "str" } + resources: + limit_memory: "8G" + limit_cpu: "3" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/fda_food/food_enforcement/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "{{ var.value.container_registry.food_enforcement_destination_table }}" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - "name" : "classification" + "type" : "STRING" + "description" : "Numerical designation (I, II, or III) that is assigned by FDA to a particular product recall that indicates the relative degree of health hazard. Class I = Dangerous or defective products that predictably could cause serious health problems or death. Examples include: food found to contain botulinum toxin, food with undeclared allergens, a label mix-up on a lifesaving drug, or a defective artificial heart valve. 
Class II = Products that might cause a temporary health problem, or pose only a slight threat of a serious nature. Example: a drug that is under-strength but that is not used to treat life-threatening situations. Class III = Products that are unlikely to cause any adverse health reaction, but that violate FDA labeling or manufacturing laws. Examples include: a minor container defect and lack of English labeling in a retail food." + "mode" : "NULLABLE" + - "name" : "center_classification_date" + "type" : "DATE" + "description" : "" + "mode" : "NULLABLE" + - "name" : "report_date" + "type" : "DATE" + "description" : "Date that the FDA issued the enforcement report for the product recall." + "mode" : "NULLABLE" + - "name" : "postal_code" + "type" : "STRING" + "description" : "" + "mode" : "NULLABLE" + - "name" : "termination_date" + "type" : "DATE" + "description" : "" + "mode" : "NULLABLE" + - "name" : "recall_initiation_date" + "type" : "DATE" + "description" : "Date that the firm first began notifying the public or their consignees of the recall." + "mode" : "NULLABLE" + - "name" : "recall_number" + "type" : "STRING" + "description" : "A numerical designation assigned by FDA to a specific recall event used for tracking purposes." + "mode" : "NULLABLE" + - "name" : "city" + "type" : "STRING" + "description" : "The city in which the recalling firm is located." + "mode" : "NULLABLE" + - "name" : "event_id" + "type" : "INTEGER" + "description" : "A numerical designation assigned by FDA to a specific recall event used for tracking purposes." + "mode" : "NULLABLE" + - "name" : "distribution_pattern" + "type" : "STRING" + "description" : "General area of initial distribution such as, “Distributors in 6 states: NY, VA, TX, GA, FL and MA; the Virgin Islands; Canada and Japan”. The term “nationwide” is defined to mean the fifty states or a significant portion. Note that subsequent distribution by the consignees to other parties may not be included." 
+ "mode" : "NULLABLE" + - "name" : "recalling_firm" + "type" : "STRING" + "description" : "The firm that initiates a recall or, in the case of an FDA requested recall or FDA mandated recall, the firm that has primary responsibility for the manufacture and (or) marketing of the product to be recalled." + "mode" : "NULLABLE" + - "name" : "voluntary_mandated" + "type" : "STRING" + "description" : "Describes who initiated the recall. Recalls are almost always voluntary, meaning initiated by a firm. A recall is deemed voluntary when the firm voluntarily removes or corrects marketed products or the FDA requests the marketed products be removed or corrected. A recall is mandated when the firm was ordered by the FDA to remove or correct the marketed products, under section 518(e) of the FD&C Act, National Childhood Vaccine Injury Act of 1986, 21 CFR 1271.440, Infant Formula Act of 1980 and its 1986 amendments, or the Food Safety Modernization Act (FSMA)." + "mode" : "NULLABLE" + - "name" : "state" + "type" : "STRING" + "description" : "The U.S. state in which the recalling firm is located." + "mode" : "NULLABLE" + - "name" : "reason_for_recall" + "type" : "STRING" + "description" : "Information describing how the product is defective and violates the FD&C Act or related statutes." + "mode" : "NULLABLE" + - "name" : "initial_firm_notification" + "type" : "STRING" + "description" : "The method(s) by which the firm initially notified the public or their consignees of a recall. A consignee is a person or firm named in a bill of lading to whom or to whose order the product has or will be delivered." + "mode" : "NULLABLE" + - "name" : "status" + "type" : "STRING" + "description" : "On-Going = A recall which is currently in progress. Completed = The recall action reaches the point at which the firm has actually retrieved and impounded all outstanding product that could reasonably be expected to be recovered, or has completed all product corrections. 
Terminated = FDA has determined that all reasonable efforts have been made to remove or correct the violative product in accordance with the recall strategy, and proper disposition has been made according to the degree of hazard. Pending = Actions that have been determined to be recalls, but that remain in the process of being classified." + "mode" : "NULLABLE" + - "name" : "product_type" + "type" : "STRING" + "description" : "" + "mode" : "NULLABLE" + - "name" : "country" + "type" : "STRING" + "description" : "The country in which the recalling firm is located." + "mode" : "NULLABLE" + - "name" : "product_description" + "type" : "STRING" + "description" : "Brief description of the product being recalled." + "mode" : "NULLABLE" + - "name" : "code_info" + "type" : "STRING" + "description" : "A list of all lot and/or serial numbers, product numbers, packer or manufacturer numbers, sell or use by dates, etc., which appear on the product or its labeling." + "mode" : "NULLABLE" + - "name" : "address_1" + "type" : "STRING" + "description" : "" + "mode" : "NULLABLE" + - "name" : "address_2" + "type" : "STRING" + "description" : "" + "mode" : "NULLABLE" + - "name" : "product_quantity" + "type" : "STRING" + "description" : "The amount of defective product subject to recall." 
+ "mode" : "NULLABLE" + - "name" : "more_code_info" + "type" : "STRING" + "description" : "" + "mode" : "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" From 79bca5cd33d16d6ca8c962dcba517265e6caca31 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Tue, 2 Nov 2021 00:33:40 +0000 Subject: [PATCH 2/7] fix: Food Enforcement now works in AF --- .../csv_transform.py | 77 ++++--- .../food_enforcement/food_enforcement_dag.py | 14 +- .../fda_food/food_enforcement/pipeline.yaml | 206 +++++++++--------- 3 files changed, 159 insertions(+), 138 deletions(-) diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py b/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py index 59ed1b29a..ff097680c 100644 --- a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py +++ b/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py @@ -35,37 +35,39 @@ def main( target_gcs_bucket: str, target_gcs_path: str, data_names: typing.List[str], - data_dtypes: dict + data_dtypes: dict, ) -> None: - logging.info("Food and Drug Administration (FDA) - Food Enforcement process started") + logging.info( + "Food and Drug Administration (FDA) - Food Enforcement process started" + ) pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - # source_file_json = str(source_file).replace(".csv", "") + "_status.json" dest_path = os.path.split(source_file)[0] source_zip_file = dest_path + "/" + os.path.split(source_url)[1] source_json_file = source_zip_file.replace(".zip", "") - # source_csv_file = source_json_file.replace(".json", ".csv") - # download_file_http(source_url, source_zip_file, False) - # unpack_file(source_zip_file, dest_path, "zip") - # convert_json_to_csv(source_json_file, source_file) + download_file_http(source_url, source_zip_file, False) + unpack_file(source_zip_file, dest_path, "zip") + convert_json_to_csv(source_json_file, source_file) 
process_source_file( - source_file, target_file, data_names, data_dtypes, int(chunksize) #, key_list=[] + source_file, + target_file, + data_names, + data_dtypes, + int(chunksize), # , key_list=[] ) upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - logging.info("Food and Drug Administration (FDA) - Food Enforcement process completed") + logging.info( + "Food and Drug Administration (FDA) - Food Enforcement process completed" + ) + def process_source_file( - source_file: str, - target_file: str, - names: list, - dtypes: dict, - chunksize: int, - # key_list: list, + source_file: str, target_file: str, names: list, dtypes: dict, chunksize: int ) -> None: logging.info(f"Opening batch file {source_file}") with pd.read_csv( @@ -90,41 +92,62 @@ def process_source_file( df = pd.DataFrame() df = pd.concat([df, chunk]) process_chunk( - df, target_file_batch, target_file, (not chunk_number == 0) #, key_list + df, + target_file_batch, + target_file, + (not chunk_number == 0), # , key_list ) - def process_chunk( df: pd.DataFrame, target_file_batch: str, target_file: str, skip_header: bool ) -> None: df = trim_whitespace(df) - date_col_list = ["center_classification_date", "report_date", "termination_date", "recall_initiation_date"] + date_col_list = [ + "center_classification_date", + "report_date", + "termination_date", + "recall_initiation_date", + ] df = resolve_date_format(df, date_col_list, "%Y%m%d", "%Y-%m-%d", True) df = reorder_headers(df) save_to_new_file(df, file_path=str(target_file_batch)) append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) -def resolve_date_format(df: pd.DataFrame, date_col_list: list, from_format: str, to_format: str="%Y-%m-%d %H:%M:%S", is_date: bool=False) -> pd.DataFrame: +def resolve_date_format( + df: pd.DataFrame, + date_col_list: list, + from_format: str, + to_format: str = "%Y-%m-%d %H:%M:%S", + is_date: bool = False, +) -> pd.DataFrame: logging.info("Resolving Date Format") for col in 
date_col_list: - logging.info(f"Resolving datetime on {col}") - df[col] = df[col].apply(lambda x: convert_dt_format(str(x), from_format, to_format, is_date)) + logging.info(f"Resolving datetime on {col}") + df[col] = df[col].apply( + lambda x: convert_dt_format(str(x), from_format, to_format, is_date) + ) return df -def convert_dt_format(dt_str: str, from_format: str, to_format: str, is_date: bool) -> str: +def convert_dt_format( + dt_str: str, from_format: str, to_format: str, is_date: bool +) -> str: rtnval = "" if not dt_str or str(dt_str).lower() == "nan" or str(dt_str).lower() == "nat": rtnval = "" elif len(dt_str.strip()) == 10: # if there is no time format rtnval = dt_str + " 00:00:00" - elif (is_date): # and from_format == "%Y%m%d" and to_format == "%Y-%m-%d") or (len(dt_str.strip()) == 8): + elif ( + is_date + ): # and from_format == "%Y%m%d" and to_format == "%Y-%m-%d") or (len(dt_str.strip()) == 8): # if there is only a date in YYYYMMDD format then add dashes - rtnval = dt_str.strip()[:4] + "-" + dt_str.strip()[4:6] + "-" + dt_str.strip()[6:8] + rtnval = ( + dt_str.strip()[:4] + "-" + dt_str.strip()[4:6] + "-" + dt_str.strip()[6:8] + ) elif len(dt_str.strip().split(" ")[1]) == 8: # if format of time portion is 00:00:00 then use 00:00 format dt_str = dt_str[:-3] @@ -181,7 +204,7 @@ def reorder_headers(df: pd.DataFrame) -> pd.DataFrame: "address_1", "address_2", "product_quantity", - "more_code_info" + "more_code_info", ] ] @@ -253,9 +276,7 @@ def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> N logging.info(f"{infile} not unpacked because it does not exist.") -def convert_json_to_csv( - source_file_json: str, source_file_csv: str -) -> None: +def convert_json_to_csv(source_file_json: str, source_file_csv: str) -> None: logging.info(f"Converting JSON file {source_file_json} to {source_file_csv}") f = open( source_file_json.strip(), diff --git a/datasets/fda_food/food_enforcement/food_enforcement_dag.py 
b/datasets/fda_food/food_enforcement/food_enforcement_dag.py index 8be0c92fe..635d2c0a1 100644 --- a/datasets/fda_food/food_enforcement/food_enforcement_dag.py +++ b/datasets/fda_food/food_enforcement/food_enforcement_dag.py @@ -58,14 +58,14 @@ image_pull_policy="Always", image="{{ var.json.fda_food.container_registry.run_csv_transform_kub_food_enforcement }}", env_vars={ - "SOURCE_URL": "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001", + "SOURCE_URL": "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001.json.zip", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", - "CHUNKSIZE": "2500000", + "CHUNKSIZE": "750000", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", - "TARGET_GCS_PATH": "data/fda_food/food_enforcement/data_output.csv", - "DATA_NAMES": '[ "country", "city", "address_1", "reason_for_recall", "address_2",\n "product_quantity", "code_info", "center_classification_date", "distribution_pattern", "state",\n "product_description", "report_date", "classification", "openfda", "recalling_firm",\n "recall_number", "initial_firm_notification", "product_type", "event_id", "termination_date",\n "recall_initiation_date", "postal_code", "voluntary_mandated", "status", "more_code_info" ]', - "DATA_DTYPES": '{ "country": "str", "city": "str", "address_1": "str", "reason_for_recall": "str", "address_2": "str",\n "product_quantity": "str", "code_info": "str", "center_classification_date": "str", "distribution_pattern": "str", "state": "str",\n "product_description": "str", "report_date": "str", "classification": "str", "openfda": "str", "recalling_firm": "str",\n "recall_number": "str", "initial_firm_notification": "str", "product_type": "str", "event_id": "str", "termination_date": "str",\n "recall_initiation_date": "str", "postal_code": "str", "voluntary_mandated": "str", "status" : "str", "more_code_info": "str" }', + "TARGET_GCS_PATH": 
"data/fda_food/food_enforcement/files/data_output.csv", + "DATA_NAMES": '[ "country", "city", "address_1", "reason_for_recall", "address_2",\n "product_quantity", "code_info", "center_classification_date", "distribution_pattern", "state",\n "product_description", "report_date", "classification", "openfda", "recalling_firm",\n "recall_number", "initial_firm_notification", "product_type", "event_id", "termination_date",\n "more_code_info", "recall_initiation_date", "postal_code", "voluntary_mandated", "status" ]', + "DATA_DTYPES": '{ "country": "str", "city": "str", "address_1": "str", "reason_for_recall": "str", "address_2": "str",\n "product_quantity": "str", "code_info": "str", "center_classification_date": "str", "distribution_pattern": "str", "state": "str",\n "product_description": "str", "report_date": "str", "classification": "str", "openfda": "str", "recalling_firm": "str",\n "recall_number": "str", "initial_firm_notification": "str", "product_type": "str", "event_id": "str", "termination_date": "str",\n "more_code_info": "str", "recall_initiation_date": "str", "postal_code": "str", "voluntary_mandated": "str", "status": "str" }', }, resources={"limit_memory": "8G", "limit_cpu": "3"}, ) @@ -74,9 +74,9 @@ load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( task_id="load_to_bq", bucket="{{ var.value.composer_bucket }}", - source_objects=["data/fda_food/food_enforcement/data_output.csv"], + source_objects=["data/fda_food/food_enforcement/files/data_output.csv"], source_format="CSV", - destination_project_dataset_table="{{ var.value.container_registry.food_enforcement_destination_table }}", + destination_project_dataset_table="{{ var.json.fda_food.container_registry.food_enforcement_destination_table }}", skip_leading_rows=1, allow_quoted_newlines=True, write_disposition="WRITE_TRUNCATE", diff --git a/datasets/fda_food/food_enforcement/pipeline.yaml b/datasets/fda_food/food_enforcement/pipeline.yaml index 9f713e69f..1fd46ef8b 100644 --- 
a/datasets/fda_food/food_enforcement/pipeline.yaml +++ b/datasets/fda_food/food_enforcement/pipeline.yaml @@ -54,24 +54,24 @@ dag: image_pull_policy: "Always" image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub_food_enforcement }}" env_vars: - SOURCE_URL: "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001" + SOURCE_URL: "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001.json.zip" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" - CHUNKSIZE: "2500000" + CHUNKSIZE: "750000" TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" - TARGET_GCS_PATH: "data/fda_food/food_enforcement/data_output.csv" + TARGET_GCS_PATH: "data/fda_food/food_enforcement/files/data_output.csv" DATA_NAMES: >- [ "country", "city", "address_1", "reason_for_recall", "address_2", "product_quantity", "code_info", "center_classification_date", "distribution_pattern", "state", "product_description", "report_date", "classification", "openfda", "recalling_firm", "recall_number", "initial_firm_notification", "product_type", "event_id", "termination_date", - "recall_initiation_date", "postal_code", "voluntary_mandated", "status", "more_code_info" ] + "more_code_info", "recall_initiation_date", "postal_code", "voluntary_mandated", "status" ] DATA_DTYPES: >- { "country": "str", "city": "str", "address_1": "str", "reason_for_recall": "str", "address_2": "str", "product_quantity": "str", "code_info": "str", "center_classification_date": "str", "distribution_pattern": "str", "state": "str", "product_description": "str", "report_date": "str", "classification": "str", "openfda": "str", "recalling_firm": "str", "recall_number": "str", "initial_firm_notification": "str", "product_type": "str", "event_id": "str", "termination_date": "str", - "recall_initiation_date": "str", "postal_code": "str", "voluntary_mandated": "str", "status" : "str", "more_code_info": "str" } + "more_code_info": "str", "recall_initiation_date": "str", 
"postal_code": "str", "voluntary_mandated": "str", "status": "str" } resources: limit_memory: "8G" limit_cpu: "3" @@ -82,109 +82,109 @@ dag: args: task_id: "load_to_bq" bucket: "{{ var.value.composer_bucket }}" - source_objects: ["data/fda_food/food_enforcement/data_output.csv"] + source_objects: ["data/fda_food/food_enforcement/files/data_output.csv"] source_format: "CSV" - destination_project_dataset_table: "{{ var.value.container_registry.food_enforcement_destination_table }}" + destination_project_dataset_table: "{{ var.json.fda_food.container_registry.food_enforcement_destination_table }}" skip_leading_rows: 1 allow_quoted_newlines: True write_disposition: "WRITE_TRUNCATE" schema_fields: - - "name" : "classification" - "type" : "STRING" - "description" : "Numerical designation (I, II, or III) that is assigned by FDA to a particular product recall that indicates the relative degree of health hazard. Class I = Dangerous or defective products that predictably could cause serious health problems or death. Examples include: food found to contain botulinum toxin, food with undeclared allergens, a label mix-up on a lifesaving drug, or a defective artificial heart valve. Class II = Products that might cause a temporary health problem, or pose only a slight threat of a serious nature. Example: a drug that is under-strength but that is not used to treat life-threatening situations. Class III = Products that are unlikely to cause any adverse health reaction, but that violate FDA labeling or manufacturing laws. Examples include: a minor container defect and lack of English labeling in a retail food." - "mode" : "NULLABLE" - - "name" : "center_classification_date" - "type" : "DATE" - "description" : "" - "mode" : "NULLABLE" - - "name" : "report_date" - "type" : "DATE" - "description" : "Date that the FDA issued the enforcement report for the product recall." 
- "mode" : "NULLABLE" - - "name" : "postal_code" - "type" : "STRING" - "description" : "" - "mode" : "NULLABLE" - - "name" : "termination_date" - "type" : "DATE" - "description" : "" - "mode" : "NULLABLE" - - "name" : "recall_initiation_date" - "type" : "DATE" - "description" : "Date that the firm first began notifying the public or their consignees of the recall." - "mode" : "NULLABLE" - - "name" : "recall_number" - "type" : "STRING" - "description" : "A numerical designation assigned by FDA to a specific recall event used for tracking purposes." - "mode" : "NULLABLE" - - "name" : "city" - "type" : "STRING" - "description" : "The city in which the recalling firm is located." - "mode" : "NULLABLE" - - "name" : "event_id" - "type" : "INTEGER" - "description" : "A numerical designation assigned by FDA to a specific recall event used for tracking purposes." - "mode" : "NULLABLE" - - "name" : "distribution_pattern" - "type" : "STRING" - "description" : "General area of initial distribution such as, “Distributors in 6 states: NY, VA, TX, GA, FL and MA; the Virgin Islands; Canada and Japan”. The term “nationwide” is defined to mean the fifty states or a significant portion. Note that subsequent distribution by the consignees to other parties may not be included." - "mode" : "NULLABLE" - - "name" : "recalling_firm" - "type" : "STRING" - "description" : "The firm that initiates a recall or, in the case of an FDA requested recall or FDA mandated recall, the firm that has primary responsibility for the manufacture and (or) marketing of the product to be recalled." - "mode" : "NULLABLE" - - "name" : "voluntary_mandated" - "type" : "STRING" - "description" : "Describes who initiated the recall. Recalls are almost always voluntary, meaning initiated by a firm. A recall is deemed voluntary when the firm voluntarily removes or corrects marketed products or the FDA requests the marketed products be removed or corrected. 
A recall is mandated when the firm was ordered by the FDA to remove or correct the marketed products, under section 518(e) of the FD&C Act, National Childhood Vaccine Injury Act of 1986, 21 CFR 1271.440, Infant Formula Act of 1980 and its 1986 amendments, or the Food Safety Modernization Act (FSMA)." - "mode" : "NULLABLE" - - "name" : "state" - "type" : "STRING" - "description" : "The U.S. state in which the recalling firm is located." - "mode" : "NULLABLE" - - "name" : "reason_for_recall" - "type" : "STRING" - "description" : "Information describing how the product is defective and violates the FD&C Act or related statutes." - "mode" : "NULLABLE" - - "name" : "initial_firm_notification" - "type" : "STRING" - "description" : "The method(s) by which the firm initially notified the public or their consignees of a recall. A consignee is a person or firm named in a bill of lading to whom or to whose order the product has or will be delivered." - "mode" : "NULLABLE" - - "name" : "status" - "type" : "STRING" - "description" : "On-Going = A recall which is currently in progress. Completed = The recall action reaches the point at which the firm has actually retrieved and impounded all outstanding product that could reasonably be expected to be recovered, or has completed all product corrections. Terminated = FDA has determined that all reasonable efforts have been made to remove or correct the violative product in accordance with the recall strategy, and proper disposition has been made according to the degree of hazard. Pending = Actions that have been determined to be recalls, but that remain in the process of being classified." - "mode" : "NULLABLE" - - "name" : "product_type" - "type" : "STRING" - "description" : "" - "mode" : "NULLABLE" - - "name" : "country" - "type" : "STRING" - "description" : "The country in which the recalling firm is located." 
- "mode" : "NULLABLE" - - "name" : "product_description" - "type" : "STRING" - "description" : "Brief description of the product being recalled." - "mode" : "NULLABLE" - - "name" : "code_info" - "type" : "STRING" - "description" : "A list of all lot and/or serial numbers, product numbers, packer or manufacturer numbers, sell or use by dates, etc., which appear on the product or its labeling." - "mode" : "NULLABLE" - - "name" : "address_1" - "type" : "STRING" - "description" : "" - "mode" : "NULLABLE" - - "name" : "address_2" - "type" : "STRING" - "description" : "" - "mode" : "NULLABLE" - - "name" : "product_quantity" - "type" : "STRING" - "description" : "The amount of defective product subject to recall." - "mode" : "NULLABLE" - - "name" : "more_code_info" - "type" : "STRING" - "description" : "" - "mode" : "NULLABLE" + - "name": "classification" + "type": "STRING" + "description": "Numerical designation (I, II, or III) that is assigned by FDA to a particular product recall that indicates the relative degree of health hazard. Class I = Dangerous or defective products that predictably could cause serious health problems or death. Examples include: food found to contain botulinum toxin, food with undeclared allergens, a label mix-up on a lifesaving drug, or a defective artificial heart valve. Class II = Products that might cause a temporary health problem, or pose only a slight threat of a serious nature. Example: a drug that is under-strength but that is not used to treat life-threatening situations. Class III = Products that are unlikely to cause any adverse health reaction, but that violate FDA labeling or manufacturing laws. Examples include: a minor container defect and lack of English labeling in a retail food." 
+ "mode": "NULLABLE" + - "name": "center_classification_date" + "type": "DATE" + "description": "" + "mode": "NULLABLE" + - "name": "report_date" + "type": "DATE" + "description": "Date that the FDA issued the enforcement report for the product recall." + "mode": "NULLABLE" + - "name": "postal_code" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "termination_date" + "type": "DATE" + "description": "" + "mode": "NULLABLE" + - "name": "recall_initiation_date" + "type": "DATE" + "description": "Date that the firm first began notifying the public or their consignees of the recall." + "mode": "NULLABLE" + - "name": "recall_number" + "type": "STRING" + "description": "A numerical designation assigned by FDA to a specific recall event used for tracking purposes." + "mode": "NULLABLE" + - "name": "city" + "type": "STRING" + "description": "The city in which the recalling firm is located." + "mode": "NULLABLE" + - "name": "event_id" + "type": "INTEGER" + "description": "A numerical designation assigned by FDA to a specific recall event used for tracking purposes." + "mode": "NULLABLE" + - "name": "distribution_pattern" + "type": "STRING" + "description": "General area of initial distribution such as, “Distributors in 6 states: NY, VA, TX, GA, FL and MA; the Virgin Islands; Canada and Japan”. The term “nationwide” is defined to mean the fifty states or a significant portion. Note that subsequent distribution by the consignees to other parties may not be included." + "mode": "NULLABLE" + - "name": "recalling_firm" + "type": "STRING" + "description": "The firm that initiates a recall or, in the case of an FDA requested recall or FDA mandated recall, the firm that has primary responsibility for the manufacture and (or) marketing of the product to be recalled." + "mode": "NULLABLE" + - "name": "voluntary_mandated" + "type": "STRING" + "description": "Describes who initiated the recall. Recalls are almost always voluntary, meaning initiated by a firm. 
A recall is deemed voluntary when the firm voluntarily removes or corrects marketed products or the FDA requests the marketed products be removed or corrected. A recall is mandated when the firm was ordered by the FDA to remove or correct the marketed products, under section 518(e) of the FD&C Act, National Childhood Vaccine Injury Act of 1986, 21 CFR 1271.440, Infant Formula Act of 1980 and its 1986 amendments, or the Food Safety Modernization Act (FSMA)." + "mode": "NULLABLE" + - "name": "state" + "type": "STRING" + "description": "The U.S. state in which the recalling firm is located." + "mode": "NULLABLE" + - "name": "reason_for_recall" + "type": "STRING" + "description": "Information describing how the product is defective and violates the FD&C Act or related statutes." + "mode": "NULLABLE" + - "name": "initial_firm_notification" + "type": "STRING" + "description": "The method(s) by which the firm initially notified the public or their consignees of a recall. A consignee is a person or firm named in a bill of lading to whom or to whose order the product has or will be delivered." + "mode": "NULLABLE" + - "name": "status" + "type": "STRING" + "description": "On-Going = A recall which is currently in progress. Completed = The recall action reaches the point at which the firm has actually retrieved and impounded all outstanding product that could reasonably be expected to be recovered, or has completed all product corrections. Terminated = FDA has determined that all reasonable efforts have been made to remove or correct the violative product in accordance with the recall strategy, and proper disposition has been made according to the degree of hazard. Pending = Actions that have been determined to be recalls, but that remain in the process of being classified." 
+ "mode": "NULLABLE" + - "name": "product_type" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "country" + "type": "STRING" + "description": "The country in which the recalling firm is located." + "mode": "NULLABLE" + - "name": "product_description" + "type": "STRING" + "description": "Brief description of the product being recalled." + "mode": "NULLABLE" + - "name": "code_info" + "type": "STRING" + "description": "A list of all lot and/or serial numbers, product numbers, packer or manufacturer numbers, sell or use by dates, etc., which appear on the product or its labeling." + "mode": "NULLABLE" + - "name": "address_1" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "address_2" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "product_quantity" + "type": "STRING" + "description": "The amount of defective product subject to recall." + "mode": "NULLABLE" + - "name": "more_code_info" + "type": "STRING" + "description": "" + "mode": "NULLABLE" graph_paths: - "transform_csv >> load_to_bq" From 2546c88abb70895cf36749e237149c89c06d75fa Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Wed, 3 Nov 2021 03:17:36 +0000 Subject: [PATCH 3/7] feat: Added fda_food food_events. 
--- .../requirements.txt | 2 - .../Dockerfile | 21 ++ .../csv_transform.py | 342 ++++++++++++++++++ .../requirements.txt | 3 + .../fda_food/food_events/food_events_dag.py | 160 ++++++++ datasets/fda_food/food_events/pipeline.yaml | 142 ++++++++ 6 files changed, 668 insertions(+), 2 deletions(-) create mode 100644 datasets/fda_food/_images/run_csv_transform_kub_food_events/Dockerfile create mode 100644 datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py create mode 100644 datasets/fda_food/_images/run_csv_transform_kub_food_events/requirements.txt create mode 100644 datasets/fda_food/food_events/food_events_dag.py create mode 100644 datasets/fda_food/food_events/pipeline.yaml diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/requirements.txt b/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/requirements.txt index 88bfd2aba..f36704793 100644 --- a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/requirements.txt +++ b/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/requirements.txt @@ -1,5 +1,3 @@ requests -numpy pandas google-cloud-storage -gsutil diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_events/Dockerfile b/datasets/fda_food/_images/run_csv_transform_kub_food_events/Dockerfile new file mode 100644 index 000000000..748bc3bec --- /dev/null +++ b/datasets/fda_food/_images/run_csv_transform_kub_food_events/Dockerfile @@ -0,0 +1,21 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +FROM python:3.8 +ENV PYTHONUNBUFFERED True +COPY requirements.txt ./ +RUN python3 -m pip install --no-cache-dir -r requirements.txt +WORKDIR /custom +COPY ./csv_transform.py . +CMD ["python3", "csv_transform.py"] diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py b/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py new file mode 100644 index 000000000..98036e816 --- /dev/null +++ b/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py @@ -0,0 +1,342 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import datetime +import json +import logging +import os +import pathlib +import typing +import zipfile as zip + +import pandas as pd +import requests +from google.cloud import storage + + +def main( + source_url: str, + source_file: pathlib.Path, + target_file: pathlib.Path, + chunksize: str, + target_gcs_bucket: str, + target_gcs_path: str, + data_names: typing.List[str], + data_dtypes: dict, + rename_mappings: dict +) -> None: + + logging.info( + "Food and Drug Administration (FDA) - Food Events process started" + ) + + pathlib.Path("./files").mkdir(parents=True, exist_ok=True) + dest_path = os.path.split(source_file)[0] + source_zip_file = dest_path + "/" + os.path.split(source_url)[1] + source_json_file = source_zip_file.replace(".zip", "") + + download_file_http(source_url, source_zip_file, False) + unpack_file(source_zip_file, dest_path, "zip") + convert_json_to_csv(source_json_file, source_file, separator='|') + + process_source_file( + source_file, + target_file, + data_names, + data_dtypes, + int(chunksize), + rename_mappings + ) + + upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) + + logging.info( + "Food and Drug Administration (FDA) - Food Events process completed" + ) + + +def process_source_file( + source_file: str, target_file: str, names: list, dtypes: dict, chunksize: int, rename_mappings: dict +) -> None: + logging.info(f"Opening batch file {source_file}") + with pd.read_csv( + source_file, # path to main source file to load in batches + engine="python", + encoding="utf-8", + quotechar='"', # string separator, typically double-quotes + chunksize=chunksize, # size of batch data, in no. 
of records + sep="|", # data column separator, typically "," + header=None, # use when the data file does not contain a header + names=names, + skiprows=1, + dtype=dtypes, + keep_default_na=True, + na_values=[" "] + ) as reader: + for chunk_number, chunk in enumerate(reader): + target_file_batch = str(target_file).replace( + ".csv", "-" + str(chunk_number) + ".csv" + ) + df = pd.DataFrame() + df = pd.concat([df, chunk]) + process_chunk( + df=df, + target_file_batch=target_file_batch, + target_file=target_file, + rename_mappings=rename_mappings, + skip_header=(not chunk_number == 0) + ) + + +def process_chunk( + df: pd.DataFrame, target_file_batch: str, target_file: str, rename_mappings: dict, skip_header: bool=False +) -> None: + + df = rename_headers(df, rename_mappings) + list_data = [ 'reactions', 'outcomes' ] + df = format_list_data(df, list_data) + # import pdb;pdb.set_trace() + df = trim_whitespace(df) + col_list = [ + "date_started", + "date_created", + "products_industry_name", + "products_industry_code" + ] + df = replace_nulls(df, col_list) + date_col_list = [ + "date_started", + "date_created" + ] + df = resolve_date_format(df, date_col_list, "%Y%m%d", "%Y-%m-%d", True) + save_to_new_file(df, file_path=str(target_file_batch)) + append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) + +def format_list_data(df: pd.DataFrame, list_data: list) -> pd.DataFrame: + logging.info("Formatting list data") + for col in list_data: + df[col] = df[col].apply(lambda x: str(x).replace("[", "").replace("]", "").replace("'", "")) + + return df + +def replace_nulls(df: pd.DataFrame, col_list: list) -> pd.DataFrame: + logging.info("Resolving null text in source data") + for col in col_list: + df[col] = df[col].apply( + lambda x: "" if str(x).lower() == "null" else x + ) + + return df + +def resolve_date_format( + df: pd.DataFrame, + date_col_list: list, + from_format: str, + to_format: str = "%Y-%m-%d %H:%M:%S", + is_date: bool = False, +) -> 
pd.DataFrame: + logging.info("Resolving Date Format") + for col in date_col_list: + logging.info(f"Resolving datetime on {col}") + df[col] = df[col].apply( + lambda x: convert_dt_format(str(x), from_format, to_format, is_date) + ) + + return df + + +def convert_dt_format( + dt_str: str, from_format: str, to_format: str, is_date: bool +) -> str: + rtnval = "" + if not dt_str or str(dt_str).lower() == "nan" or str(dt_str).lower() == "nat": + rtnval = "" + elif len(dt_str.strip()) == 10: + # if there is no time format + rtnval = dt_str + " 00:00:00" + elif ( + is_date + ): # and from_format == "%Y%m%d" and to_format == "%Y-%m-%d") or (len(dt_str.strip()) == 8): + # if there is only a date in YYYYMMDD format then add dashes + rtnval = ( + dt_str.strip()[:4] + "-" + dt_str.strip()[4:6] + "-" + dt_str.strip()[6:8] + ) + elif len(dt_str.strip().split(" ")[1]) == 8: + # if format of time portion is 00:00:00 then use 00:00 format + dt_str = dt_str[:-3] + rtnval = datetime.datetime.strptime(dt_str, from_format).strftime(to_format) + elif (len(dt_str.strip().split("-")[0]) == 4) and ( + len(from_format.strip().split("/")[0]) == 2 + ): + # if the format of the date portion of the data is in YYYY-MM-DD format + # and from_format is in MM-DD-YYYY then resolve this by modifying the from_format + # to use the YYYY-MM-DD. 
This resolves mixed date formats in files + from_format = "%Y-%m-%d " + from_format.strip().split(" ")[1] + else: + dt_str = "" + + # return datetime.datetime.strptime(dt_str, from_format).strftime("%Y-%m-%d %H:%M:%S") + return rtnval + + +def trim_whitespace(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Trimming whitespace") + for col in df.columns: + col_dtype = df[col].dtype + if col_dtype == "object": + logging.info(f"Trimming whitespace on {col}") + df[col] = df[col].apply(lambda x: str(x).strip()) + + return df + + +def save_to_new_file(df, file_path) -> None: + df.to_csv(file_path, index=False) + + +def append_batch_file( + batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool +) -> None: + data_file = open(batch_file_path, "r") + if truncate_file: + target_file = open(target_file_path, "w+").close() + target_file = open(target_file_path, "a+") + if skip_header: + logging.info( + f"Appending batch file {batch_file_path} to {target_file_path} with skip header" + ) + next(data_file) + else: + logging.info(f"Appending batch file {batch_file_path} to {target_file_path}") + target_file.write(data_file.read()) + data_file.close() + target_file.close() + if os.path.exists(batch_file_path): + os.remove(batch_file_path) + + +def download_file_http( + source_url: str, source_file: pathlib.Path, continue_on_error: bool = False +) -> None: + logging.info(f"Downloading {source_url} to {source_file}") + try: + src_file = requests.get(source_url, stream=True) + with open(source_file, "wb") as f: + for chunk in src_file: + f.write(chunk) + except requests.exceptions.RequestException as e: + if e == requests.exceptions.HTTPError: + err_msg = "A HTTP error occurred." + elif e == requests.exceptions.Timeout: + err_msg = "A HTTP timeout error occurred." + elif e == requests.exceptions.TooManyRedirects: + err_msg = "Too Many Redirects occurred." 
+ if not continue_on_error: + logging.info(f"{err_msg} Unable to obtain {source_url}") + raise SystemExit(e) + else: + logging.info( + f"{err_msg} Unable to obtain {source_url}. Continuing execution." + ) + + +def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None: + df = df.rename(columns=rename_mappings) + + return df + + +def reorder_headers(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Re-ordering Headers") + df = df[ + [ + "report_number", + "reactions", + "outcomes", + "products_brand_name", + "products_industry_code", + "products_role", + "products_industry_name", + "date_created", + "date_started", + "consumer_gender", + "consumer_age", + "consumer_age_unit" + ] + ] + + return df + + +def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> None: + if os.path.exists(infile): + if compression_type == "zip": + logging.info(f"Unpacking {infile} to {dest_path}") + with zip.ZipFile(infile, mode="r") as zipf: + zipf.extractall(dest_path) + zipf.close() + else: + logging.info( + f"{infile} ignored as it is not compressed or is of unknown compression" + ) + else: + logging.info(f"{infile} not unpacked because it does not exist.") + + +def convert_json_to_csv(source_file_json: str, source_file_csv: str, separator: str="|") -> None: + logging.info(f"Converting JSON file {source_file_json} to {source_file_csv}") + f = open( + source_file_json.strip(), + ) + json_data = json.load(f) + df = pd.json_normalize( + json_data["results"], + record_path=['products'], + meta=[ + 'report_number', + 'outcomes', + 'date_created', + 'reactions', + 'date_started', + ["consumer", "age"], + ["consumer", "age_unit"], + ["consumer", "gender"]], + errors='ignore' + ) + for col in df.columns: + df[col] = df[col].fillna('') + df.to_csv(source_file_csv, index=False, sep=separator, quotechar='"', encoding="utf-8") + + +def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: + storage_client = storage.Client() + bucket = 
storage_client.bucket(gcs_bucket) + blob = bucket.blob(gcs_path) + blob.upload_from_filename(file_path) + + +if __name__ == "__main__": + logging.getLogger().setLevel(logging.INFO) + + main( + source_url=os.environ["SOURCE_URL"], + source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), + target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), + chunksize=os.environ["CHUNKSIZE"], + target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], + target_gcs_path=os.environ["TARGET_GCS_PATH"], + data_names=json.loads(os.environ["DATA_NAMES"]), + data_dtypes=json.loads(os.environ["DATA_DTYPES"]), + rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]) + ) diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_events/requirements.txt b/datasets/fda_food/_images/run_csv_transform_kub_food_events/requirements.txt new file mode 100644 index 000000000..f36704793 --- /dev/null +++ b/datasets/fda_food/_images/run_csv_transform_kub_food_events/requirements.txt @@ -0,0 +1,3 @@ +requests +pandas +google-cloud-storage diff --git a/datasets/fda_food/food_events/food_events_dag.py b/datasets/fda_food/food_events/food_events_dag.py new file mode 100644 index 000000000..38397af10 --- /dev/null +++ b/datasets/fda_food/food_events/food_events_dag.py @@ -0,0 +1,160 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators import kubernetes_pod +from airflow.providers.google.cloud.transfers import gcs_to_bigquery + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="fda_food.food_events", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + transform_csv = kubernetes_pod.KubernetesPodOperator( + task_id="transform_csv", + name="food_events", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.fda_food.container_registry.run_csv_transform_kub_food_events }}", + env_vars={ + "SOURCE_URL": "https://download.open.fda.gov/food/event/food-event-0001-of-0001.json.zip", + "SOURCE_FILE": "files/data.csv", + "TARGET_FILE": "files/data_output.csv", + "CHUNKSIZE": "750000", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/fda_food/food_events/files/data_output.csv", + "DATA_NAMES": '[ "role", "name_brand", "industry_code", "industry_name", "report_number",\n "outcomes", "date_created", "reactions", "date_started", "consumer.age",\n "consumer.age_unit", "consumer.gender" ]', + "DATA_DTYPES": '{ "role": "str", "name_brand": "str", "industry_code": "str", "industry_name": "str", "report_number": "str",\n "outcomes": "str", "date_created": "str", "reactions": "str", "date_started": "str", "consumer.age": "float64",\n "consumer.age_unit": "str", "consumer.gender": "str" }', + "RENAME_MAPPINGS": '{ "report_number": "report_number", "reactions": "reactions", "outcomes": "outcomes", "name_brand": 
"products_brand_name", "industry_code": "products_industry_code",\n "role": "products_role", "industry_name": "products_industry_name", "date_created": "date_created", "date_started": "date_started", "consumer.gender": "consumer_gender",\n "consumer.age": "consumer_age", "consumer.age_unit": "consumer_age_unit" }', + }, + resources={"limit_memory": "8G", "limit_cpu": "3"}, + ) + + # Task to load CSV data to a BigQuery table + load_to_bq = gcs_to_bigquery.GCSToBigQueryOperator( + task_id="load_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/fda_food/food_events/files/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="{{ var.json.fda_food.container_registry.food_events_destination_table }}", + skip_leading_rows=1, + allow_quoted_newlines=True, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "report_number", + "type": "STRING", + "description": "The report number", + "mode": "NULLABLE", + }, + { + "name": "reactions", + "type": "STRING", + "description": "Information on the reactions or symptoms experienced by the individual involved", + "mode": "NULLABLE", + }, + { + "name": "outcomes", + "type": "STRING", + "description": "Information on known outcomes or consequences of the adverse event. For more info, refer: https://open.fda.gov/food/event/reference/", + "mode": "NULLABLE", + }, + { + "name": "products_brand_name", + "type": "STRING", + "description": "The reported brand name of the product.", + "mode": "NULLABLE", + }, + { + "name": "products_industry_code", + "type": "STRING", + "description": "The FDA industry code for the product. Results in this endpoint are generally limited to products tagged with industry codes related to human food and nutritional supplements or cosmetics. 
For more info, refer: https://open.fda.gov/food/event/reference/", + "mode": "NULLABLE", + }, + { + "name": "products_role", + "type": "STRING", + "description": "", + "mode": "NULLABLE", + }, + { + "name": "products_industry_name", + "type": "STRING", + "description": "The FDA industry name associated with the product.", + "mode": "NULLABLE", + }, + { + "name": "date_created", + "type": "DATE", + "description": "Date the report was received by FDA.", + "mode": "NULLABLE", + }, + { + "name": "date_started", + "type": "DATE", + "description": "Date of the adverse event (when it was considered to have started).", + "mode": "NULLABLE", + }, + { + "name": "consumer_gender", + "type": "STRING", + "description": "The reported gender of the consumer. Female = Female Male = Male Not Available = Unknown", + "mode": "NULLABLE", + }, + { + "name": "consumer_age", + "type": "FLOAT", + "description": "The reported age of the consumer at the time of the adverse event report, expressed in the unit in the field age_unit", + "mode": "NULLABLE", + }, + { + "name": "consumer_age_unit", + "type": "STRING", + "description": "Encodes the unit in which the age of the consumer is expressed. Day(s) = age is expressed in days Week(s) = age is expressed in weeks Month(s) = age is expressed in months Year(s) = age is expressed in years Decade(s) = age is expressed in decades Not Available = Unknown", + "mode": "NULLABLE", + }, + ], + ) + + transform_csv >> load_to_bq diff --git a/datasets/fda_food/food_events/pipeline.yaml b/datasets/fda_food/food_events/pipeline.yaml new file mode 100644 index 000000000..c1c0ea782 --- /dev/null +++ b/datasets/fda_food/food_events/pipeline.yaml @@ -0,0 +1,142 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + table_id: "food_events" + description: "fda_foodspc" + +dag: + airflow_version: 2 + initialize: + dag_id: food_events + default_args: + owner: "Google" + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + + - operator: "KubernetesPodOperator" + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "transform_csv" + name: "food_events" + namespace: "default" + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + image_pull_policy: "Always" + image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub_food_events }}" + env_vars: + SOURCE_URL: "https://download.open.fda.gov/food/event/food-event-0001-of-0001.json.zip" + SOURCE_FILE: "files/data.csv" + TARGET_FILE: "files/data_output.csv" + CHUNKSIZE: "750000" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/fda_food/food_events/files/data_output.csv" + DATA_NAMES: >- + [ "role", "name_brand", "industry_code", "industry_name", "report_number", + "outcomes", "date_created", "reactions", "date_started", "consumer.age", + "consumer.age_unit", "consumer.gender" ] + DATA_DTYPES: >- + { "role": "str", "name_brand": "str", "industry_code": "str", "industry_name": "str", "report_number": "str", + "outcomes": "str", "date_created": "str", "reactions": 
"str", "date_started": "str", "consumer.age": "float64", + "consumer.age_unit": "str", "consumer.gender": "str" } + RENAME_MAPPINGS: >- + { "report_number": "report_number", "reactions": "reactions", "outcomes": "outcomes", "name_brand": "products_brand_name", "industry_code": "products_industry_code", + "role": "products_role", "industry_name": "products_industry_name", "date_created": "date_created", "date_started": "date_started", "consumer.gender": "consumer_gender", + "consumer.age": "consumer_age", "consumer.age_unit": "consumer_age_unit" } + resources: + limit_memory: "8G" + limit_cpu: "3" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_to_bq" + bucket: "{{ var.value.composer_bucket }}" + source_objects: ["data/fda_food/food_events/files/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "{{ var.json.fda_food.container_registry.food_events_destination_table }}" + skip_leading_rows: 1 + allow_quoted_newlines: True + write_disposition: "WRITE_TRUNCATE" + schema_fields: + - "name": "report_number" + "type": "STRING" + "description": "The report number" + "mode": "NULLABLE" + - "name": "reactions" + "type": "STRING" + "description": "Information on the reactions or symptoms experienced by the individual involved" + "mode": "NULLABLE" + - "name": "outcomes" + "type": "STRING" + "description": "Information on known outcomes or consequences of the adverse event. For more info, refer: https://open.fda.gov/food/event/reference/" + "mode": "NULLABLE" + - "name": "products_brand_name" + "type": "STRING" + "description": "The reported brand name of the product." + "mode": "NULLABLE" + - "name": "products_industry_code" + "type": "STRING" + "description": "The FDA industry code for the product. Results in this endpoint are generally limited to products tagged with industry codes related to human food and nutritional supplements or cosmetics. 
For more info, refer: https://open.fda.gov/food/event/reference/" + "mode": "NULLABLE" + - "name": "products_role" + "type": "STRING" + "description": "" + "mode": "NULLABLE" + - "name": "products_industry_name" + "type": "STRING" + "description": "The FDA industry name associated with the product." + "mode": "NULLABLE" + - "name": "date_created" + "type": "DATE" + "description": "Date the report was received by FDA." + "mode": "NULLABLE" + - "name": "date_started" + "type": "DATE" + "description": "Date of the adverse event (when it was considered to have started)." + "mode": "NULLABLE" + - "name": "consumer_gender" + "type": "STRING" + "description": "The reported gender of the consumer. Female = Female Male = Male Not Available = Unknown" + "mode": "NULLABLE" + - "name": "consumer_age" + "type": "FLOAT" + "description": "The reported age of the consumer at the time of the adverse event report, expressed in the unit in the field age_unit" + "mode": "NULLABLE" + - "name": "consumer_age_unit" + "type": "STRING" + "description": "Encodes the unit in which the age of the consumer is expressed. Day(s) = age is expressed in days Week(s) = age is expressed in weeks Month(s) = age is expressed in months Year(s) = age is expressed in years Decade(s) = age is expressed in decades Not Available = Unknown" + "mode": "NULLABLE" + + graph_paths: + - "transform_csv >> load_to_bq" From 01969e6bf841bbaed351fe4cbc91d20e7d1fc05c Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Wed, 3 Nov 2021 18:17:44 +0000 Subject: [PATCH 4/7] fix: Added reorder columns to ensure integrity. 
--- .../csv_transform.py | 14 ++++++- .../_terraform/food_events_pipeline.tf | 39 +++++++++++++++++++ .../fda_food/food_events/food_events_dag.py | 2 + datasets/fda_food/food_events/pipeline.yaml | 2 + 4 files changed, 56 insertions(+), 1 deletion(-) create mode 100644 datasets/fda_food/_terraform/food_events_pipeline.tf diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py b/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py index 98036e816..b36630c12 100644 --- a/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py +++ b/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py @@ -20,6 +20,7 @@ import typing import zipfile as zip +import numpy as np import pandas as pd import requests from google.cloud import storage @@ -104,9 +105,10 @@ def process_chunk( ) -> None: df = rename_headers(df, rename_mappings) + df = reorder_headers(df) list_data = [ 'reactions', 'outcomes' ] df = format_list_data(df, list_data) - # import pdb;pdb.set_trace() + df = replace_nan_data(df) df = trim_whitespace(df) col_list = [ "date_started", @@ -123,6 +125,12 @@ def process_chunk( save_to_new_file(df, file_path=str(target_file_batch)) append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) +def replace_nan_data(df: pd.DataFrame) -> pd.DataFrame: + logging.info("Replacing NaN data") + df = df.replace(np.nan, '', regex=True) + + return df + def format_list_data(df: pd.DataFrame, list_data: list) -> pd.DataFrame: logging.info("Formatting list data") for col in list_data: @@ -202,6 +210,10 @@ def trim_whitespace(df: pd.DataFrame) -> pd.DataFrame: def save_to_new_file(df, file_path) -> None: + # df = df[ df['reactions'] == 'RASH, DIARRHOEA'] + # df = df[ df['report_number'].isin(['80961']) ] + # df = df[ df['report_number'] != '150055'] + # df = df[ df['report_number'] != '172480'] df.to_csv(file_path, index=False) diff --git 
a/datasets/fda_food/_terraform/food_events_pipeline.tf b/datasets/fda_food/_terraform/food_events_pipeline.tf new file mode 100644 index 000000000..ffdd9062f --- /dev/null +++ b/datasets/fda_food/_terraform/food_events_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "fda_food_food_events" { + project = var.project_id + dataset_id = "fda_food" + table_id = "food_events" + + description = "fda_foodspc" + + + + + depends_on = [ + google_bigquery_dataset.fda_food + ] +} + +output "bigquery_table-fda_food_food_events-table_id" { + value = google_bigquery_table.fda_food_food_events.table_id +} + +output "bigquery_table-fda_food_food_events-id" { + value = google_bigquery_table.fda_food_food_events.id +} diff --git a/datasets/fda_food/food_events/food_events_dag.py b/datasets/fda_food/food_events/food_events_dag.py index 38397af10..77f230667 100644 --- a/datasets/fda_food/food_events/food_events_dag.py +++ b/datasets/fda_food/food_events/food_events_dag.py @@ -80,6 +80,8 @@ destination_project_dataset_table="{{ var.json.fda_food.container_registry.food_events_destination_table }}", skip_leading_rows=1, allow_quoted_newlines=True, + field_delimiter=",", + quote_character='"', write_disposition="WRITE_TRUNCATE", schema_fields=[ { diff --git a/datasets/fda_food/food_events/pipeline.yaml b/datasets/fda_food/food_events/pipeline.yaml index c1c0ea782..45f37e972 
100644 --- a/datasets/fda_food/food_events/pipeline.yaml +++ b/datasets/fda_food/food_events/pipeline.yaml @@ -87,6 +87,8 @@ dag: destination_project_dataset_table: "{{ var.json.fda_food.container_registry.food_events_destination_table }}" skip_leading_rows: 1 allow_quoted_newlines: True + field_delimiter: "," + quote_character: "\"" write_disposition: "WRITE_TRUNCATE" schema_fields: - "name": "report_number" From 983e1302580612dd1b272da5111b056606a2c6de Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Wed, 3 Nov 2021 18:23:21 +0000 Subject: [PATCH 5/7] fix: Resolved black hook issue. --- .../csv_transform.py | 91 +++++++++++-------- 1 file changed, 51 insertions(+), 40 deletions(-) diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py b/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py index b36630c12..8cca9e065 100644 --- a/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py +++ b/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py @@ -35,12 +35,10 @@ def main( target_gcs_path: str, data_names: typing.List[str], data_dtypes: dict, - rename_mappings: dict + rename_mappings: dict, ) -> None: - logging.info( - "Food and Drug Administration (FDA) - Food Events process started" - ) + logging.info("Food and Drug Administration (FDA) - Food Events process started") pathlib.Path("./files").mkdir(parents=True, exist_ok=True) dest_path = os.path.split(source_file)[0] @@ -49,7 +47,7 @@ def main( download_file_http(source_url, source_zip_file, False) unpack_file(source_zip_file, dest_path, "zip") - convert_json_to_csv(source_json_file, source_file, separator='|') + convert_json_to_csv(source_json_file, source_file, separator="|") process_source_file( source_file, @@ -57,18 +55,21 @@ def main( data_names, data_dtypes, int(chunksize), - rename_mappings + rename_mappings, ) upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - logging.info( - "Food 
and Drug Administration (FDA) - Food Events process completed" - ) + logging.info("Food and Drug Administration (FDA) - Food Events process completed") def process_source_file( - source_file: str, target_file: str, names: list, dtypes: dict, chunksize: int, rename_mappings: dict + source_file: str, + target_file: str, + names: list, + dtypes: dict, + chunksize: int, + rename_mappings: dict, ) -> None: logging.info(f"Opening batch file {source_file}") with pd.read_csv( @@ -83,7 +84,7 @@ def process_source_file( skiprows=1, dtype=dtypes, keep_default_na=True, - na_values=[" "] + na_values=[" "], ) as reader: for chunk_number, chunk in enumerate(reader): target_file_batch = str(target_file).replace( @@ -96,17 +97,21 @@ def process_source_file( target_file_batch=target_file_batch, target_file=target_file, rename_mappings=rename_mappings, - skip_header=(not chunk_number == 0) + skip_header=(not chunk_number == 0), ) def process_chunk( - df: pd.DataFrame, target_file_batch: str, target_file: str, rename_mappings: dict, skip_header: bool=False + df: pd.DataFrame, + target_file_batch: str, + target_file: str, + rename_mappings: dict, + skip_header: bool = False, ) -> None: df = rename_headers(df, rename_mappings) df = reorder_headers(df) - list_data = [ 'reactions', 'outcomes' ] + list_data = ["reactions", "outcomes"] df = format_list_data(df, list_data) df = replace_nan_data(df) df = trim_whitespace(df) @@ -114,39 +119,40 @@ def process_chunk( "date_started", "date_created", "products_industry_name", - "products_industry_code" + "products_industry_code", ] df = replace_nulls(df, col_list) - date_col_list = [ - "date_started", - "date_created" - ] + date_col_list = ["date_started", "date_created"] df = resolve_date_format(df, date_col_list, "%Y%m%d", "%Y-%m-%d", True) save_to_new_file(df, file_path=str(target_file_batch)) append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) + def replace_nan_data(df: pd.DataFrame) -> pd.DataFrame: 
logging.info("Replacing NaN data") - df = df.replace(np.nan, '', regex=True) + df = df.replace(np.nan, "", regex=True) return df + def format_list_data(df: pd.DataFrame, list_data: list) -> pd.DataFrame: logging.info("Formatting list data") for col in list_data: - df[col] = df[col].apply(lambda x: str(x).replace("[", "").replace("]", "").replace("'", "")) + df[col] = df[col].apply( + lambda x: str(x).replace("[", "").replace("]", "").replace("'", "") + ) return df + def replace_nulls(df: pd.DataFrame, col_list: list) -> pd.DataFrame: logging.info("Resolving null text in source data") for col in col_list: - df[col] = df[col].apply( - lambda x: "" if str(x).lower() == "null" else x - ) + df[col] = df[col].apply(lambda x: "" if str(x).lower() == "null" else x) return df + def resolve_date_format( df: pd.DataFrame, date_col_list: list, @@ -284,7 +290,7 @@ def reorder_headers(df: pd.DataFrame) -> pd.DataFrame: "date_started", "consumer_gender", "consumer_age", - "consumer_age_unit" + "consumer_age_unit", ] ] @@ -306,7 +312,9 @@ def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> N logging.info(f"{infile} not unpacked because it does not exist.") -def convert_json_to_csv(source_file_json: str, source_file_csv: str, separator: str="|") -> None: +def convert_json_to_csv( + source_file_json: str, source_file_csv: str, separator: str = "|" +) -> None: logging.info(f"Converting JSON file {source_file_json} to {source_file_csv}") f = open( source_file_json.strip(), @@ -314,21 +322,24 @@ def convert_json_to_csv(source_file_json: str, source_file_csv: str, separator: json_data = json.load(f) df = pd.json_normalize( json_data["results"], - record_path=['products'], + record_path=["products"], meta=[ - 'report_number', - 'outcomes', - 'date_created', - 'reactions', - 'date_started', - ["consumer", "age"], - ["consumer", "age_unit"], - ["consumer", "gender"]], - errors='ignore' - ) + "report_number", + "outcomes", + "date_created", + "reactions", + 
"date_started", + ["consumer", "age"], + ["consumer", "age_unit"], + ["consumer", "gender"], + ], + errors="ignore", + ) for col in df.columns: - df[col] = df[col].fillna('') - df.to_csv(source_file_csv, index=False, sep=separator, quotechar='"', encoding="utf-8") + df[col] = df[col].fillna("") + df.to_csv( + source_file_csv, index=False, sep=separator, quotechar='"', encoding="utf-8" + ) def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: @@ -350,5 +361,5 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) target_gcs_path=os.environ["TARGET_GCS_PATH"], data_names=json.loads(os.environ["DATA_NAMES"]), data_dtypes=json.loads(os.environ["DATA_DTYPES"]), - rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]) + rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]), ) From 345fa0ce0f5e1d857dfab5234d6091b893395e93 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Wed, 24 Nov 2021 23:25:06 +0000 Subject: [PATCH 6/7] fix: Merged images into one. 
--- .../Dockerfile | 0 .../csv_transform.py | 111 ++++--- .../requirements.txt | 0 .../csv_transform.py | 308 ------------------ .../Dockerfile | 21 -- .../requirements.txt | 3 - .../food_enforcement/food_enforcement_dag.py | 13 +- .../fda_food/food_enforcement/pipeline.yaml | 42 ++- .../fda_food/food_events/food_events_dag.py | 6 +- datasets/fda_food/food_events/pipeline.yaml | 13 +- 10 files changed, 123 insertions(+), 394 deletions(-) rename datasets/fda_food/_images/{run_csv_transform_kub_food_enforcement => run_csv_transform_kub}/Dockerfile (100%) rename datasets/fda_food/_images/{run_csv_transform_kub_food_events => run_csv_transform_kub}/csv_transform.py (83%) rename datasets/fda_food/_images/{run_csv_transform_kub_food_enforcement => run_csv_transform_kub}/requirements.txt (100%) delete mode 100644 datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py delete mode 100644 datasets/fda_food/_images/run_csv_transform_kub_food_events/Dockerfile delete mode 100644 datasets/fda_food/_images/run_csv_transform_kub_food_events/requirements.txt diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/Dockerfile b/datasets/fda_food/_images/run_csv_transform_kub/Dockerfile similarity index 100% rename from datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/Dockerfile rename to datasets/fda_food/_images/run_csv_transform_kub/Dockerfile diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py b/datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py similarity index 83% rename from datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py rename to datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py index 8cca9e065..06b5da7e7 100644 --- a/datasets/fda_food/_images/run_csv_transform_kub_food_events/csv_transform.py +++ b/datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py @@ -27,6 +27,7 @@ def main( + pipeline: str, 
source_url: str, source_file: pathlib.Path, target_file: pathlib.Path, @@ -36,6 +37,9 @@ def main( data_names: typing.List[str], data_dtypes: dict, rename_mappings: dict, + reorder_headers_list: typing.List[str], + record_path: str, + meta: typing.List[str] ) -> None: logging.info("Food and Drug Administration (FDA) - Food Events process started") @@ -47,15 +51,17 @@ def main( download_file_http(source_url, source_zip_file, False) unpack_file(source_zip_file, dest_path, "zip") - convert_json_to_csv(source_json_file, source_file, separator="|") + convert_json_to_csv(source_json_file, source_file, record_path=record_path, meta=meta, separator="|") process_source_file( + pipeline, source_file, target_file, data_names, data_dtypes, int(chunksize), rename_mappings, + reorder_headers_list ) upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) @@ -64,12 +70,14 @@ def main( def process_source_file( + pipeline: str, source_file: str, target_file: str, names: list, dtypes: dict, chunksize: int, rename_mappings: dict, + reorder_headers_list: list ) -> None: logging.info(f"Opening batch file {source_file}") with pd.read_csv( @@ -97,7 +105,9 @@ def process_source_file( target_file_batch=target_file_batch, target_file=target_file, rename_mappings=rename_mappings, - skip_header=(not chunk_number == 0), + reorder_headers_list=reorder_headers_list, + pipeline=pipeline, + skip_header=(not chunk_number == 0) ) @@ -106,11 +116,27 @@ def process_chunk( target_file_batch: str, target_file: str, rename_mappings: dict, + reorder_headers_list: list, + pipeline: str, skip_header: bool = False, ) -> None: + if pipeline == 'food events': + df = process_food_events(df, rename_mappings, reorder_headers_list) + elif pipeline == 'food enforcement': + df = process_food_enforcement(df, reorder_headers_list) + else: + logging.info('pipeline was not specified') + save_to_new_file(df, file_path=str(target_file_batch)) + append_batch_file(target_file_batch, target_file, skip_header, not 
(skip_header)) + +def process_food_events( + df: pd.DataFrame, + rename_mappings: dict, + reorder_headers_list: list +) -> None: df = rename_headers(df, rename_mappings) - df = reorder_headers(df) + df = reorder_headers(df, reorder_headers_list) list_data = ["reactions", "outcomes"] df = format_list_data(df, list_data) df = replace_nan_data(df) @@ -124,8 +150,25 @@ def process_chunk( df = replace_nulls(df, col_list) date_col_list = ["date_started", "date_created"] df = resolve_date_format(df, date_col_list, "%Y%m%d", "%Y-%m-%d", True) - save_to_new_file(df, file_path=str(target_file_batch)) - append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) + + return df + + +def process_food_enforcement( + df: pd.DataFrame, + reorder_headers_list: list +) -> None: + df = trim_whitespace(df) + date_col_list = [ + "center_classification_date", + "report_date", + "termination_date", + "recall_initiation_date", + ] + df = resolve_date_format(df, date_col_list, "%Y%m%d", "%Y-%m-%d", True) + df = reorder_headers(df, reorder_headers_list) + + return df def replace_nan_data(df: pd.DataFrame) -> pd.DataFrame: @@ -200,7 +243,6 @@ def convert_dt_format( else: dt_str = "" - # return datetime.datetime.strptime(dt_str, from_format).strftime("%Y-%m-%d %H:%M:%S") return rtnval @@ -216,10 +258,6 @@ def trim_whitespace(df: pd.DataFrame) -> pd.DataFrame: def save_to_new_file(df, file_path) -> None: - # df = df[ df['reactions'] == 'RASH, DIARRHOEA'] - # df = df[ df['report_number'].isin(['80961']) ] - # df = df[ df['report_number'] != '150055'] - # df = df[ df['report_number'] != '172480'] df.to_csv(file_path, index=False) @@ -275,24 +313,9 @@ def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None: return df -def reorder_headers(df: pd.DataFrame) -> pd.DataFrame: +def reorder_headers(df: pd.DataFrame, reorder_headers_list: list) -> pd.DataFrame: logging.info("Re-ordering Headers") - df = df[ - [ - "report_number", - "reactions", - "outcomes", - 
"products_brand_name", - "products_industry_code", - "products_role", - "products_industry_name", - "date_created", - "date_started", - "consumer_gender", - "consumer_age", - "consumer_age_unit", - ] - ] + df = df.reindex(columns=reorder_headers_list) return df @@ -313,28 +336,26 @@ def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> N def convert_json_to_csv( - source_file_json: str, source_file_csv: str, separator: str = "|" + source_file_json: str, + source_file_csv: str, + record_path: str, + meta: list, + separator: str = "|" ) -> None: logging.info(f"Converting JSON file {source_file_json} to {source_file_csv}") f = open( source_file_json.strip(), ) json_data = json.load(f) - df = pd.json_normalize( - json_data["results"], - record_path=["products"], - meta=[ - "report_number", - "outcomes", - "date_created", - "reactions", - "date_started", - ["consumer", "age"], - ["consumer", "age_unit"], - ["consumer", "gender"], - ], - errors="ignore", - ) + if record_path: + df = pd.json_normalize( + json_data["results"], + record_path=[record_path], + meta=meta, + errors="ignore", + ) + else: + df = pd.DataFrame(json_data["results"]) for col in df.columns: df[col] = df[col].fillna("") df.to_csv( @@ -353,6 +374,7 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) logging.getLogger().setLevel(logging.INFO) main( + pipeline=os.environ["PIPELINE"], source_url=os.environ["SOURCE_URL"], source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), @@ -362,4 +384,7 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) data_names=json.loads(os.environ["DATA_NAMES"]), data_dtypes=json.loads(os.environ["DATA_DTYPES"]), rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]), + reorder_headers_list=json.loads(os.environ["REORDER_HEADERS"]), + record_path=os.environ["RECORD_PATH"], + meta=json.loads(os.environ["META"]) ) 
diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/requirements.txt b/datasets/fda_food/_images/run_csv_transform_kub/requirements.txt similarity index 100% rename from datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/requirements.txt rename to datasets/fda_food/_images/run_csv_transform_kub/requirements.txt diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py b/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py deleted file mode 100644 index ff097680c..000000000 --- a/datasets/fda_food/_images/run_csv_transform_kub_food_enforcement/csv_transform.py +++ /dev/null @@ -1,308 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import datetime - -# import fnmatch -import json -import logging -import os -import pathlib -import typing -import zipfile as zip - -import pandas as pd -import requests -from google.cloud import storage - - -def main( - source_url: str, - source_file: pathlib.Path, - target_file: pathlib.Path, - chunksize: str, - target_gcs_bucket: str, - target_gcs_path: str, - data_names: typing.List[str], - data_dtypes: dict, -) -> None: - - logging.info( - "Food and Drug Administration (FDA) - Food Enforcement process started" - ) - - pathlib.Path("./files").mkdir(parents=True, exist_ok=True) - dest_path = os.path.split(source_file)[0] - source_zip_file = dest_path + "/" + os.path.split(source_url)[1] - source_json_file = source_zip_file.replace(".zip", "") - - download_file_http(source_url, source_zip_file, False) - unpack_file(source_zip_file, dest_path, "zip") - convert_json_to_csv(source_json_file, source_file) - - process_source_file( - source_file, - target_file, - data_names, - data_dtypes, - int(chunksize), # , key_list=[] - ) - - upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) - - logging.info( - "Food and Drug Administration (FDA) - Food Enforcement process completed" - ) - - -def process_source_file( - source_file: str, target_file: str, names: list, dtypes: dict, chunksize: int -) -> None: - logging.info(f"Opening batch file {source_file}") - with pd.read_csv( - source_file, # path to main source file to load in batches - engine="python", - encoding="utf-8", - quotechar='"', # string separator, typically double-quotes - chunksize=chunksize, # size of batch data, in no. 
of records - sep="|", # data column separator, typically "," - header=None, # use when the data file does not contain a header - names=names, - skiprows=1, - dtype=dtypes, - keep_default_na=True, - na_values=[" "] - # parse_dates=["start_date", "end_date"], - ) as reader: - for chunk_number, chunk in enumerate(reader): - target_file_batch = str(target_file).replace( - ".csv", "-" + str(chunk_number) + ".csv" - ) - df = pd.DataFrame() - df = pd.concat([df, chunk]) - process_chunk( - df, - target_file_batch, - target_file, - (not chunk_number == 0), # , key_list - ) - - -def process_chunk( - df: pd.DataFrame, target_file_batch: str, target_file: str, skip_header: bool -) -> None: - df = trim_whitespace(df) - date_col_list = [ - "center_classification_date", - "report_date", - "termination_date", - "recall_initiation_date", - ] - df = resolve_date_format(df, date_col_list, "%Y%m%d", "%Y-%m-%d", True) - df = reorder_headers(df) - save_to_new_file(df, file_path=str(target_file_batch)) - append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) - - -def resolve_date_format( - df: pd.DataFrame, - date_col_list: list, - from_format: str, - to_format: str = "%Y-%m-%d %H:%M:%S", - is_date: bool = False, -) -> pd.DataFrame: - logging.info("Resolving Date Format") - for col in date_col_list: - logging.info(f"Resolving datetime on {col}") - df[col] = df[col].apply( - lambda x: convert_dt_format(str(x), from_format, to_format, is_date) - ) - - return df - - -def convert_dt_format( - dt_str: str, from_format: str, to_format: str, is_date: bool -) -> str: - rtnval = "" - if not dt_str or str(dt_str).lower() == "nan" or str(dt_str).lower() == "nat": - rtnval = "" - elif len(dt_str.strip()) == 10: - # if there is no time format - rtnval = dt_str + " 00:00:00" - elif ( - is_date - ): # and from_format == "%Y%m%d" and to_format == "%Y-%m-%d") or (len(dt_str.strip()) == 8): - # if there is only a date in YYYYMMDD format then add dashes - rtnval = ( - 
dt_str.strip()[:4] + "-" + dt_str.strip()[4:6] + "-" + dt_str.strip()[6:8] - ) - elif len(dt_str.strip().split(" ")[1]) == 8: - # if format of time portion is 00:00:00 then use 00:00 format - dt_str = dt_str[:-3] - rtnval = datetime.datetime.strptime(dt_str, from_format).strftime(to_format) - elif (len(dt_str.strip().split("-")[0]) == 4) and ( - len(from_format.strip().split("/")[0]) == 2 - ): - # if the format of the date portion of the data is in YYYY-MM-DD format - # and from_format is in MM-DD-YYYY then resolve this by modifying the from_format - # to use the YYYY-MM-DD. This resolves mixed date formats in files - from_format = "%Y-%m-%d " + from_format.strip().split(" ")[1] - else: - dt_str = "" - - # return datetime.datetime.strptime(dt_str, from_format).strftime("%Y-%m-%d %H:%M:%S") - return rtnval - - -def trim_whitespace(df: pd.DataFrame) -> pd.DataFrame: - logging.info("Trimming whitespace") - for col in df.columns: - col_dtype = df[col].dtype - if col_dtype == "object": - logging.info(f"Trimming whitespace on {col}") - df[col] = df[col].apply(lambda x: str(x).strip()) - - return df - - -def reorder_headers(df: pd.DataFrame) -> pd.DataFrame: - logging.info("Re-ordering Headers") - df = df[ - [ - "classification", - "center_classification_date", - "report_date", - "postal_code", - "termination_date", - "recall_initiation_date", - "recall_number", - "city", - "event_id", - "distribution_pattern", - "recalling_firm", - "voluntary_mandated", - "state", - "reason_for_recall", - "initial_firm_notification", - "status", - "product_type", - "country", - "product_description", - "code_info", - "address_1", - "address_2", - "product_quantity", - "more_code_info", - ] - ] - - return df - - -def save_to_new_file(df, file_path) -> None: - df.to_csv(file_path, index=False) - - -def append_batch_file( - batch_file_path: str, target_file_path: str, skip_header: bool, truncate_file: bool -) -> None: - data_file = open(batch_file_path, "r") - if truncate_file: - 
target_file = open(target_file_path, "w+").close() - target_file = open(target_file_path, "a+") - if skip_header: - logging.info( - f"Appending batch file {batch_file_path} to {target_file_path} with skip header" - ) - next(data_file) - else: - logging.info(f"Appending batch file {batch_file_path} to {target_file_path}") - target_file.write(data_file.read()) - data_file.close() - target_file.close() - if os.path.exists(batch_file_path): - os.remove(batch_file_path) - - -def download_file_http( - source_url: str, source_file: pathlib.Path, continue_on_error: bool = False -) -> None: - logging.info(f"Downloading {source_url} to {source_file}") - try: - src_file = requests.get(source_url, stream=True) - with open(source_file, "wb") as f: - for chunk in src_file: - f.write(chunk) - except requests.exceptions.RequestException as e: - if e == requests.exceptions.HTTPError: - err_msg = "A HTTP error occurred." - elif e == requests.exceptions.Timeout: - err_msg = "A HTTP timeout error occurred." - elif e == requests.exceptions.TooManyRedirects: - err_msg = "Too Many Redirects occurred." - if not continue_on_error: - logging.info(f"{err_msg} Unable to obtain {source_url}") - raise SystemExit(e) - else: - logging.info( - f"{err_msg} Unable to obtain {source_url}. Continuing execution." 
- ) - - -def unpack_file(infile: str, dest_path: str, compression_type: str = "zip") -> None: - if os.path.exists(infile): - if compression_type == "zip": - logging.info(f"Unpacking {infile} to {dest_path}") - with zip.ZipFile(infile, mode="r") as zipf: - zipf.extractall(dest_path) - zipf.close() - else: - logging.info( - f"{infile} ignored as it is not compressed or is of unknown compression" - ) - else: - logging.info(f"{infile} not unpacked because it does not exist.") - - -def convert_json_to_csv(source_file_json: str, source_file_csv: str) -> None: - logging.info(f"Converting JSON file {source_file_json} to {source_file_csv}") - f = open( - source_file_json.strip(), - ) - json_data = json.load(f) - df = pd.DataFrame(json_data["results"]) - df.to_csv(source_file_csv, index=False, sep="|", quotechar='"', encoding="utf-8") - - -def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None: - storage_client = storage.Client() - bucket = storage_client.bucket(gcs_bucket) - blob = bucket.blob(gcs_path) - blob.upload_from_filename(file_path) - - -if __name__ == "__main__": - logging.getLogger().setLevel(logging.INFO) - - main( - source_url=os.environ["SOURCE_URL"], - source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(), - target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(), - chunksize=os.environ["CHUNKSIZE"], - target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"], - target_gcs_path=os.environ["TARGET_GCS_PATH"], - data_names=json.loads(os.environ["DATA_NAMES"]), - data_dtypes=json.loads(os.environ["DATA_DTYPES"]), - ) diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_events/Dockerfile b/datasets/fda_food/_images/run_csv_transform_kub_food_events/Dockerfile deleted file mode 100644 index 748bc3bec..000000000 --- a/datasets/fda_food/_images/run_csv_transform_kub_food_events/Dockerfile +++ /dev/null @@ -1,21 +0,0 @@ -# Copyright 2021 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the 
"License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -FROM python:3.8 -ENV PYTHONUNBUFFERED True -COPY requirements.txt ./ -RUN python3 -m pip install --no-cache-dir -r requirements.txt -WORKDIR /custom -COPY ./csv_transform.py . -CMD ["python3", "csv_transform.py"] diff --git a/datasets/fda_food/_images/run_csv_transform_kub_food_events/requirements.txt b/datasets/fda_food/_images/run_csv_transform_kub_food_events/requirements.txt deleted file mode 100644 index f36704793..000000000 --- a/datasets/fda_food/_images/run_csv_transform_kub_food_events/requirements.txt +++ /dev/null @@ -1,3 +0,0 @@ -requests -pandas -google-cloud-storage diff --git a/datasets/fda_food/food_enforcement/food_enforcement_dag.py b/datasets/fda_food/food_enforcement/food_enforcement_dag.py index 635d2c0a1..7f7deb994 100644 --- a/datasets/fda_food/food_enforcement/food_enforcement_dag.py +++ b/datasets/fda_food/food_enforcement/food_enforcement_dag.py @@ -56,18 +56,23 @@ } }, image_pull_policy="Always", - image="{{ var.json.fda_food.container_registry.run_csv_transform_kub_food_enforcement }}", + image="{{ var.json.fda_food.container_registry.run_csv_transform_kub }}", env_vars={ + "PIPELINE": "food enforcement", "SOURCE_URL": "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001.json.zip", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", "CHUNKSIZE": "750000", "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", "TARGET_GCS_PATH": "data/fda_food/food_enforcement/files/data_output.csv", - "DATA_NAMES": '[ 
"country", "city", "address_1", "reason_for_recall", "address_2",\n "product_quantity", "code_info", "center_classification_date", "distribution_pattern", "state",\n "product_description", "report_date", "classification", "openfda", "recalling_firm",\n "recall_number", "initial_firm_notification", "product_type", "event_id", "termination_date",\n "more_code_info", "recall_initiation_date", "postal_code", "voluntary_mandated", "status" ]', - "DATA_DTYPES": '{ "country": "str", "city": "str", "address_1": "str", "reason_for_recall": "str", "address_2": "str",\n "product_quantity": "str", "code_info": "str", "center_classification_date": "str", "distribution_pattern": "str", "state": "str",\n "product_description": "str", "report_date": "str", "classification": "str", "openfda": "str", "recalling_firm": "str",\n "recall_number": "str", "initial_firm_notification": "str", "product_type": "str", "event_id": "str", "termination_date": "str",\n "more_code_info": "str", "recall_initiation_date": "str", "postal_code": "str", "voluntary_mandated": "str", "status": "str" }', + "DATA_NAMES": '[ "status", "city", "state", "country", "classification",\n "openfda", "product_type", "event_id", "recalling_firm", "address_1",\n "address_2", "postal_code", "voluntary_mandated", "initial_firm_notification", "distribution_pattern",\n "recall_number", "product_description", "product_quantity", "reason_for_recall", "recall_initiation_date",\n "center_classification_date", "report_date", "code_info", "more_code_info", "termination_date" ]', + "DATA_DTYPES": '{ "status": "str", "city": "str", "state": "str", "country": "str", "classification": "str",\n "openfda": "str", "product_type": "str", "event_id": "str", "recalling_firm": "str", "address_1": "str",\n "address_2": "str", "postal_code": "str", "voluntary_mandated": "str", "initial_firm_notification": "str", "distribution_pattern": "str",\n "recall_number": "str", "product_description": "str", "product_quantity": "str", 
"reason_for_recall": "str", "recall_initiation_date": "str",\n "center_classification_date": "str", "report_date": "str", "code_info": "str", "more_code_info": "str", "termination_date": "str" }', + "RENAME_MAPPINGS": "{ }", + "REORDER_HEADERS": '[ "classification", "center_classification_date", "report_date", "postal_code", "termination_date",\n "recall_initiation_date", "recall_number", "city", "event_id", "distribution_pattern",\n "recalling_firm", "voluntary_mandated", "state", "reason_for_recall", "initial_firm_notification",\n "status", "product_type", "country", "product_description", "code_info",\n "address_1", "address_2", "product_quantity", "more_code_info" ]', + "RECORD_PATH": "", + "META": '[ "status", "city", "state", "country", "classification",\n "openfda", "product_type", "event_id", "recalling_firm", "address_1",\n "address_2", "postal_code", "voluntary_mandated", "initial_firm_notification", "distribution_pattern",\n "recall_number", "product_description", "product_quantity", "reason_for_recall", "recall_initiation_date",\n "center_classification_date", "report_date", "code_info", "more_code_info", "termination_date" ]', }, - resources={"limit_memory": "8G", "limit_cpu": "3"}, + resources={"limit_memory": "4G", "limit_cpu": "1"}, ) # Task to load CSV data to a BigQuery table diff --git a/datasets/fda_food/food_enforcement/pipeline.yaml b/datasets/fda_food/food_enforcement/pipeline.yaml index 1fd46ef8b..5c3580399 100644 --- a/datasets/fda_food/food_enforcement/pipeline.yaml +++ b/datasets/fda_food/food_enforcement/pipeline.yaml @@ -52,8 +52,9 @@ dag: values: - "pool-e2-standard-4" image_pull_policy: "Always" - image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub_food_enforcement }}" + image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub }}" env_vars: + PIPELINE: 'food enforcement' SOURCE_URL: "https://download.open.fda.gov/food/enforcement/food-enforcement-0001-of-0001.json.zip" SOURCE_FILE: "files/data.csv" 
TARGET_FILE: "files/data_output.csv" @@ -61,20 +62,35 @@ dag: TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" TARGET_GCS_PATH: "data/fda_food/food_enforcement/files/data_output.csv" DATA_NAMES: >- - [ "country", "city", "address_1", "reason_for_recall", "address_2", - "product_quantity", "code_info", "center_classification_date", "distribution_pattern", "state", - "product_description", "report_date", "classification", "openfda", "recalling_firm", - "recall_number", "initial_firm_notification", "product_type", "event_id", "termination_date", - "more_code_info", "recall_initiation_date", "postal_code", "voluntary_mandated", "status" ] + [ "status", "city", "state", "country", "classification", + "openfda", "product_type", "event_id", "recalling_firm", "address_1", + "address_2", "postal_code", "voluntary_mandated", "initial_firm_notification", "distribution_pattern", + "recall_number", "product_description", "product_quantity", "reason_for_recall", "recall_initiation_date", + "center_classification_date", "report_date", "code_info", "more_code_info", "termination_date" ] DATA_DTYPES: >- - { "country": "str", "city": "str", "address_1": "str", "reason_for_recall": "str", "address_2": "str", - "product_quantity": "str", "code_info": "str", "center_classification_date": "str", "distribution_pattern": "str", "state": "str", - "product_description": "str", "report_date": "str", "classification": "str", "openfda": "str", "recalling_firm": "str", - "recall_number": "str", "initial_firm_notification": "str", "product_type": "str", "event_id": "str", "termination_date": "str", - "more_code_info": "str", "recall_initiation_date": "str", "postal_code": "str", "voluntary_mandated": "str", "status": "str" } + { "status": "str", "city": "str", "state": "str", "country": "str", "classification": "str", + "openfda": "str", "product_type": "str", "event_id": "str", "recalling_firm": "str", "address_1": "str", + "address_2": "str", "postal_code": "str", "voluntary_mandated": 
"str", "initial_firm_notification": "str", "distribution_pattern": "str", + "recall_number": "str", "product_description": "str", "product_quantity": "str", "reason_for_recall": "str", "recall_initiation_date": "str", + "center_classification_date": "str", "report_date": "str", "code_info": "str", "more_code_info": "str", "termination_date": "str" } + RENAME_MAPPINGS: >- + { } + REORDER_HEADERS: >- + [ "classification", "center_classification_date", "report_date", "postal_code", "termination_date", + "recall_initiation_date", "recall_number", "city", "event_id", "distribution_pattern", + "recalling_firm", "voluntary_mandated", "state", "reason_for_recall", "initial_firm_notification", + "status", "product_type", "country", "product_description", "code_info", + "address_1", "address_2", "product_quantity", "more_code_info" ] + RECORD_PATH: "" + META: >- + [ "status", "city", "state", "country", "classification", + "openfda", "product_type", "event_id", "recalling_firm", "address_1", + "address_2", "postal_code", "voluntary_mandated", "initial_firm_notification", "distribution_pattern", + "recall_number", "product_description", "product_quantity", "reason_for_recall", "recall_initiation_date", + "center_classification_date", "report_date", "code_info", "more_code_info", "termination_date" ] resources: - limit_memory: "8G" - limit_cpu: "3" + limit_memory: "4G" + limit_cpu: "1" - operator: "GoogleCloudStorageToBigQueryOperator" description: "Task to load CSV data to a BigQuery table" diff --git a/datasets/fda_food/food_events/food_events_dag.py b/datasets/fda_food/food_events/food_events_dag.py index 77f230667..f0058319c 100644 --- a/datasets/fda_food/food_events/food_events_dag.py +++ b/datasets/fda_food/food_events/food_events_dag.py @@ -56,8 +56,9 @@ } }, image_pull_policy="Always", - image="{{ var.json.fda_food.container_registry.run_csv_transform_kub_food_events }}", + image="{{ var.json.fda_food.container_registry.run_csv_transform_kub }}", env_vars={ + 
"PIPELINE": "food events", "SOURCE_URL": "https://download.open.fda.gov/food/event/food-event-0001-of-0001.json.zip", "SOURCE_FILE": "files/data.csv", "TARGET_FILE": "files/data_output.csv", @@ -67,6 +68,9 @@ "DATA_NAMES": '[ "role", "name_brand", "industry_code", "industry_name", "report_number",\n "outcomes", "date_created", "reactions", "date_started", "consumer.age",\n "consumer.age_unit", "consumer.gender" ]', "DATA_DTYPES": '{ "role": "str", "name_brand": "str", "industry_code": "str", "industry_name": "str", "report_number": "str",\n "outcomes": "str", "date_created": "str", "reactions": "str", "date_started": "str", "consumer.age": "float64",\n "consumer.age_unit": "str", "consumer.gender": "str" }', "RENAME_MAPPINGS": '{ "report_number": "report_number", "reactions": "reactions", "outcomes": "outcomes", "name_brand": "products_brand_name", "industry_code": "products_industry_code",\n "role": "products_role", "industry_name": "products_industry_name", "date_created": "date_created", "date_started": "date_started", "consumer.gender": "consumer_gender",\n "consumer.age": "consumer_age", "consumer.age_unit": "consumer_age_unit" }', + "REORDER_HEADERS": '[ "report_number", "reactions", "outcomes", "products_brand_name", "products_industry_code",\n "products_role", "products_industry_name", "date_created", "date_started", "consumer_gender",\n "consumer_age", "consumer_age_unit" ]', + "RECORD_PATH": "products", + "META": '[\n "report_number", "outcomes", "date_created", "reactions", "date_started",\n ["consumer", "age"], ["consumer", "age_unit"], ["consumer", "gender"]\n]', }, resources={"limit_memory": "8G", "limit_cpu": "3"}, ) diff --git a/datasets/fda_food/food_events/pipeline.yaml b/datasets/fda_food/food_events/pipeline.yaml index 45f37e972..57a5dd8d4 100644 --- a/datasets/fda_food/food_events/pipeline.yaml +++ b/datasets/fda_food/food_events/pipeline.yaml @@ -52,8 +52,9 @@ dag: values: - "pool-e2-standard-4" image_pull_policy: "Always" - image: "{{ 
var.json.fda_food.container_registry.run_csv_transform_kub_food_events }}" + image: "{{ var.json.fda_food.container_registry.run_csv_transform_kub }}" env_vars: + PIPELINE: "food events" SOURCE_URL: "https://download.open.fda.gov/food/event/food-event-0001-of-0001.json.zip" SOURCE_FILE: "files/data.csv" TARGET_FILE: "files/data_output.csv" @@ -72,6 +73,16 @@ dag: { "report_number": "report_number", "reactions": "reactions", "outcomes": "outcomes", "name_brand": "products_brand_name", "industry_code": "products_industry_code", "role": "products_role", "industry_name": "products_industry_name", "date_created": "date_created", "date_started": "date_started", "consumer.gender": "consumer_gender", "consumer.age": "consumer_age", "consumer.age_unit": "consumer_age_unit" } + REORDER_HEADERS: >- + [ "report_number", "reactions", "outcomes", "products_brand_name", "products_industry_code", + "products_role", "products_industry_name", "date_created", "date_started", "consumer_gender", + "consumer_age", "consumer_age_unit" ] + RECORD_PATH: "products" + META: >- + [ + "report_number", "outcomes", "date_created", "reactions", "date_started", + ["consumer", "age"], ["consumer", "age_unit"], ["consumer", "gender"] + ] resources: limit_memory: "8G" limit_cpu: "3" From b976180807908f89e1ae2a7669714094bac72347 Mon Sep 17 00:00:00 2001 From: nlarge-google Date: Wed, 24 Nov 2021 23:28:20 +0000 Subject: [PATCH 7/7] fix: Resolved black hook format. 
--- .../run_csv_transform_kub/csv_transform.py | 31 +++++++++---------- 1 file changed, 14 insertions(+), 17 deletions(-) diff --git a/datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py b/datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py index 06b5da7e7..0fee34bb4 100644 --- a/datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py +++ b/datasets/fda_food/_images/run_csv_transform_kub/csv_transform.py @@ -39,7 +39,7 @@ def main( rename_mappings: dict, reorder_headers_list: typing.List[str], record_path: str, - meta: typing.List[str] + meta: typing.List[str], ) -> None: logging.info("Food and Drug Administration (FDA) - Food Events process started") @@ -51,7 +51,9 @@ def main( download_file_http(source_url, source_zip_file, False) unpack_file(source_zip_file, dest_path, "zip") - convert_json_to_csv(source_json_file, source_file, record_path=record_path, meta=meta, separator="|") + convert_json_to_csv( + source_json_file, source_file, record_path=record_path, meta=meta, separator="|" + ) process_source_file( pipeline, @@ -61,7 +63,7 @@ def main( data_dtypes, int(chunksize), rename_mappings, - reorder_headers_list + reorder_headers_list, ) upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path) @@ -77,7 +79,7 @@ def process_source_file( dtypes: dict, chunksize: int, rename_mappings: dict, - reorder_headers_list: list + reorder_headers_list: list, ) -> None: logging.info(f"Opening batch file {source_file}") with pd.read_csv( @@ -107,7 +109,7 @@ def process_source_file( rename_mappings=rename_mappings, reorder_headers_list=reorder_headers_list, pipeline=pipeline, - skip_header=(not chunk_number == 0) + skip_header=(not chunk_number == 0), ) @@ -120,20 +122,18 @@ def process_chunk( pipeline: str, skip_header: bool = False, ) -> None: - if pipeline == 'food events': + if pipeline == "food events": df = process_food_events(df, rename_mappings, reorder_headers_list) - elif pipeline == 'food enforcement': + elif pipeline == 
"food enforcement": df = process_food_enforcement(df, reorder_headers_list) else: - logging.info('pipeline was not specified') + logging.info("pipeline was not specified") save_to_new_file(df, file_path=str(target_file_batch)) append_batch_file(target_file_batch, target_file, skip_header, not (skip_header)) def process_food_events( - df: pd.DataFrame, - rename_mappings: dict, - reorder_headers_list: list + df: pd.DataFrame, rename_mappings: dict, reorder_headers_list: list ) -> None: df = rename_headers(df, rename_mappings) df = reorder_headers(df, reorder_headers_list) @@ -154,10 +154,7 @@ def process_food_events( return df -def process_food_enforcement( - df: pd.DataFrame, - reorder_headers_list: list -) -> None: +def process_food_enforcement(df: pd.DataFrame, reorder_headers_list: list) -> None: df = trim_whitespace(df) date_col_list = [ "center_classification_date", @@ -340,7 +337,7 @@ def convert_json_to_csv( source_file_csv: str, record_path: str, meta: list, - separator: str = "|" + separator: str = "|", ) -> None: logging.info(f"Converting JSON file {source_file_json} to {source_file_csv}") f = open( @@ -386,5 +383,5 @@ def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]), reorder_headers_list=json.loads(os.environ["REORDER_HEADERS"]), record_path=os.environ["RECORD_PATH"], - meta=json.loads(os.environ["META"]) + meta=json.loads(os.environ["META"]), )