From d0425cd95a7c56cd788ff5330a791f3677767862 Mon Sep 17 00:00:00 2001 From: varshika06 <81638548+varshika06@users.noreply.github.com> Date: Wed, 6 Oct 2021 20:22:07 +0530 Subject: [PATCH] feat: Onboard CMS Medicare (#185) --- .../_images/run_csv_transform_kub/Dockerfile | 34 + .../run_csv_transform_kub/csv_transform.py | 153 ++++ .../run_csv_transform_kub/requirements.txt | 3 + .../_terraform/cms_medicare_dataset.tf | 26 + .../hospital_general_info_pipeline.tf | 39 + .../_terraform/inpatient_charges_pipeline.tf | 131 ++++ .../_terraform/outpatient_charges_pipeline.tf | 108 +++ datasets/cms_medicare/_terraform/provider.tf | 28 + datasets/cms_medicare/_terraform/variables.tf | 23 + datasets/cms_medicare/dataset.yaml | 58 ++ .../hospital_general_info_dag.py | 252 +++++++ .../hospital_general_info/pipeline.yaml | 260 +++++++ .../inpatient_charges_dag.py | 654 ++++++++++++++++ .../inpatient_charges/pipeline.yaml | 702 ++++++++++++++++++ .../outpatient_charges_dag.py | 503 +++++++++++++ .../outpatient_charges/pipeline.yaml | 538 ++++++++++++++ 16 files changed, 3512 insertions(+) create mode 100644 datasets/cms_medicare/_images/run_csv_transform_kub/Dockerfile create mode 100644 datasets/cms_medicare/_images/run_csv_transform_kub/csv_transform.py create mode 100644 datasets/cms_medicare/_images/run_csv_transform_kub/requirements.txt create mode 100644 datasets/cms_medicare/_terraform/cms_medicare_dataset.tf create mode 100644 datasets/cms_medicare/_terraform/hospital_general_info_pipeline.tf create mode 100644 datasets/cms_medicare/_terraform/inpatient_charges_pipeline.tf create mode 100644 datasets/cms_medicare/_terraform/outpatient_charges_pipeline.tf create mode 100644 datasets/cms_medicare/_terraform/provider.tf create mode 100644 datasets/cms_medicare/_terraform/variables.tf create mode 100644 datasets/cms_medicare/dataset.yaml create mode 100644 datasets/cms_medicare/hospital_general_info/hospital_general_info_dag.py create mode 100644 
datasets/cms_medicare/hospital_general_info/pipeline.yaml create mode 100644 datasets/cms_medicare/inpatient_charges/inpatient_charges_dag.py create mode 100644 datasets/cms_medicare/inpatient_charges/pipeline.yaml create mode 100644 datasets/cms_medicare/outpatient_charges/outpatient_charges_dag.py create mode 100644 datasets/cms_medicare/outpatient_charges/pipeline.yaml diff --git a/datasets/cms_medicare/_images/run_csv_transform_kub/Dockerfile b/datasets/cms_medicare/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..62b210f95 --- /dev/null +++ b/datasets/cms_medicare/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,34 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . 
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""CSV transform step for the CMS Medicare pipelines.

Downloads one source file, renames its columns, drops rows that lack the key
columns of the inpatient/outpatient tables, keeps the configured columns and
uploads the resulting CSV to Cloud Storage. When run as a script, every
setting is read from environment variables (see the ``__main__`` block).
"""

import datetime
import fnmatch
import json
import logging
import os
import pathlib
import typing
from zipfile import ZipFile

import pandas as pd

# Pipelines whose source is a ZIP archive and whose rows are filtered on
# ``drg_definition`` / ``provider_id``.
PIPELINES_NAME_INPATIENT = [
    "inpatient_charges_2011",
    "inpatient_charges_2012",
    "inpatient_charges_2013",
    "inpatient_charges_2014",
    "inpatient_charges_2015",
]
# Pipelines whose source is a ZIP archive and whose rows are filtered on
# ``apc`` / ``provider_id``.
PIPELINES_NAME_OUTPATIENT = [
    "outpatient_charges_2011",
    "outpatient_charges_2012",
    "outpatient_charges_2013",
    "outpatient_charges_2014",
]


def main(
    source_url: str,
    source_file: pathlib.Path,
    target_file: pathlib.Path,
    target_gcs_bucket: str,
    target_gcs_path: str,
    headers: typing.List[str],
    rename_mappings: dict,
    pipeline_name: str,
) -> None:
    """Run the download -> transform -> upload sequence for one pipeline.

    Args:
        source_url: URL of the source file (a ZIP for the inpatient and
            outpatient pipelines, a plain CSV otherwise).
        source_file: Local path the download is written to.
        target_file: Local path the transformed CSV is written to.
        target_gcs_bucket: Name of the destination bucket.
        target_gcs_path: Object path inside the destination bucket.
        headers: Output column names, in output order.
        rename_mappings: Mapping of source column name -> output column name.
        pipeline_name: Selects the input format and the null-row filter.

    Raises:
        Exception: Any failure while downloading, reading, saving or
            uploading is propagated so the task fails instead of uploading
            a missing or stale file.
    """
    logging.info("Creating 'files' folder")
    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)

    logging.info(f"Downloading file {source_url}")
    download_file(source_url, source_file)

    logging.info(f"Opening file {source_file}")
    df = _read_source(source_file, pipeline_name)

    logging.info(f"Transformation Process Starting.. {source_file}")

    rename_headers(df, rename_mappings)
    filter_null_rows(
        df, PIPELINES_NAME_INPATIENT, PIPELINES_NAME_OUTPATIENT, pipeline_name
    )
    df = df[headers]

    logging.info(f"Transformation Process complete .. {source_file}")

    logging.info(f"Saving to output file.. {target_file}")
    try:
        save_to_new_file(df, file_path=str(target_file))
    except Exception as e:
        # Log for the pod output, then re-raise: continuing would upload a
        # file that was not (re)written.
        logging.error(f"Error saving output file: {e}.")
        raise

    logging.info(
        f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
    )
    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

    finished_at = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    logging.info(f"CMS Medicare process completed at {finished_at}")


def _read_source(source_file: pathlib.Path, pipeline_name: str) -> pd.DataFrame:
    """Load the downloaded file, unzipping it for the charges pipelines.

    Raises:
        ValueError: If a ZIP source does not contain exactly one ``*.csv``
            member, because the member to load would be ambiguous.
    """
    if pipeline_name not in (PIPELINES_NAME_INPATIENT + PIPELINES_NAME_OUTPATIENT):
        return pd.read_csv(str(source_file))

    with ZipFile(source_file) as zipped_files:
        csv_members = fnmatch.filter(zipped_files.namelist(), "*.csv")
        if len(csv_members) != 1:
            raise ValueError(
                f"Expected exactly one CSV file in {source_file}, "
                f"found {len(csv_members)}: {csv_members}"
            )
        with zipped_files.open(csv_members[0]) as data:
            return pd.read_csv(data)


def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None:
    """Rename the columns of ``df`` in place according to ``rename_mappings``."""
    df.rename(columns=rename_mappings, inplace=True)


def filter_null_rows(
    df: pd.DataFrame,
    PIPELINES_NAME_INPATIENT: typing.List[str],
    PIPELINES_NAME_OUTPATIENT: typing.List[str],
    pipeline_name: str,
) -> pd.DataFrame:
    """Drop rows that lack the key columns of the selected pipeline.

    Rows are removed from ``df`` in place and the same frame is returned, so
    the function works both for callers that ignore the return value and for
    callers that use it.

    Args:
        df: Frame whose columns have already been renamed.
        PIPELINES_NAME_INPATIENT: Pipeline names filtered on
            ``drg_definition`` and ``provider_id``.
        PIPELINES_NAME_OUTPATIENT: Pipeline names filtered on ``apc`` and
            ``provider_id``.
        pipeline_name: Name of the pipeline being processed.

    Returns:
        ``df`` itself; it is unchanged when ``pipeline_name`` is in neither
        list.
    """
    if pipeline_name in PIPELINES_NAME_INPATIENT:
        df.dropna(subset=["drg_definition", "provider_id"], inplace=True)
    elif pipeline_name in PIPELINES_NAME_OUTPATIENT:
        df.dropna(subset=["apc", "provider_id"], inplace=True)
    return df


def save_to_new_file(df: pd.DataFrame, file_path: str) -> None:
    """Write ``df`` to ``file_path`` as CSV without the index column."""
    # NOTE(review): "%.0f" writes every float column without decimals. That
    # looks intended for ID-like columns read as floats, but it would also
    # round monetary columns -- confirm against the target table schemas.
    df.to_csv(file_path, float_format="%.0f", index=False)


def download_file(source_url: str, source_file: pathlib.Path) -> None:
    """Stream ``source_url`` into ``source_file``.

    Raises:
        requests.HTTPError: If the server answers with anything other than
            HTTP 200; no file is written in that case.
    """
    # Imported here so the pandas-only helpers above can be imported without
    # the HTTP client installed.
    import requests

    logging.info(f"Downloading {source_url} into {source_file}")
    with requests.get(source_url, stream=True) as r:
        if r.status_code != 200:
            logging.error(f"Couldn't download {source_url}: {r.text}")
            raise requests.HTTPError(
                f"Couldn't download {source_url}: HTTP {r.status_code}",
                response=r,
            )
        with open(source_file, "wb") as f:
            for chunk in r.iter_content(chunk_size=64 * 1024):
                f.write(chunk)


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
    """Upload the local ``file_path`` to ``gs://<gcs_bucket>/<gcs_path>``."""
    # Imported here so the pandas-only helpers above can be imported without
    # the Cloud Storage client installed.
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)
    blob = bucket.blob(gcs_path)
    blob.upload_from_filename(str(file_path))


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    main(
        source_url=os.environ["SOURCE_URL"],
        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
        target_gcs_path=os.environ["TARGET_GCS_PATH"],
        headers=json.loads(os.environ["CSV_HEADERS"]),
        rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]),
        pipeline_name=os.environ["PIPELINE_NAME"],
    )
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_dataset" "cms_medicare" { + dataset_id = "cms_medicare" + project = var.project_id + description = "CMS Medicare" +} + +output "bigquery_dataset-cms_medicare-dataset_id" { + value = google_bigquery_dataset.cms_medicare.dataset_id +} diff --git a/datasets/cms_medicare/_terraform/hospital_general_info_pipeline.tf b/datasets/cms_medicare/_terraform/hospital_general_info_pipeline.tf new file mode 100644 index 000000000..00f72fd26 --- /dev/null +++ b/datasets/cms_medicare/_terraform/hospital_general_info_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "hospital_general_info" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "hospital_general_info" + + description = "CMS Medicare Hospital General Info" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-hospital_general_info-table_id" { + value = google_bigquery_table.hospital_general_info.table_id +} + +output "bigquery_table-hospital_general_info-id" { + value = google_bigquery_table.hospital_general_info.id +} diff --git a/datasets/cms_medicare/_terraform/inpatient_charges_pipeline.tf b/datasets/cms_medicare/_terraform/inpatient_charges_pipeline.tf new file mode 100644 index 000000000..3388141f2 --- /dev/null +++ b/datasets/cms_medicare/_terraform/inpatient_charges_pipeline.tf @@ -0,0 +1,131 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "inpatient_charges_2011" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "inpatient_charges_2011" + + description = "CMS Medicare Inpatient Charges 2011" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-inpatient_charges_2011-table_id" { + value = google_bigquery_table.inpatient_charges_2011.table_id +} + +output "bigquery_table-inpatient_charges_2011-id" { + value = google_bigquery_table.inpatient_charges_2011.id +} + +resource "google_bigquery_table" "inpatient_charges_2012" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "inpatient_charges_2012" + + description = "CMS Medicare Inpatient Charges 2012" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-inpatient_charges_2012-table_id" { + value = google_bigquery_table.inpatient_charges_2012.table_id +} + +output "bigquery_table-inpatient_charges_2012-id" { + value = google_bigquery_table.inpatient_charges_2012.id +} + +resource "google_bigquery_table" "inpatient_charges_2013" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "inpatient_charges_2013" + + description = "CMS Medicare Inpatient Charges 2013" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-inpatient_charges_2013-table_id" { + value = google_bigquery_table.inpatient_charges_2013.table_id +} + +output "bigquery_table-inpatient_charges_2013-id" { + value = google_bigquery_table.inpatient_charges_2013.id +} + +resource "google_bigquery_table" "inpatient_charges_2014" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "inpatient_charges_2014" + + description = "CMS Medicare Inpatient Charges 2014" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-inpatient_charges_2014-table_id" { + value = 
google_bigquery_table.inpatient_charges_2014.table_id +} + +output "bigquery_table-inpatient_charges_2014-id" { + value = google_bigquery_table.inpatient_charges_2014.id +} + +resource "google_bigquery_table" "inpatient_charges_2015" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "inpatient_charges_2015" + + description = "CMS Medicare Inpatient Charges 2015" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-inpatient_charges_2015-table_id" { + value = google_bigquery_table.inpatient_charges_2015.table_id +} + +output "bigquery_table-inpatient_charges_2015-id" { + value = google_bigquery_table.inpatient_charges_2015.id +} diff --git a/datasets/cms_medicare/_terraform/outpatient_charges_pipeline.tf b/datasets/cms_medicare/_terraform/outpatient_charges_pipeline.tf new file mode 100644 index 000000000..bf39255c0 --- /dev/null +++ b/datasets/cms_medicare/_terraform/outpatient_charges_pipeline.tf @@ -0,0 +1,108 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "outpatient_charges_2011" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "outpatient_charges_2011" + + description = "CMS Medicare Outpatient Charges 2011" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-outpatient_charges_2011-table_id" { + value = google_bigquery_table.outpatient_charges_2011.table_id +} + +output "bigquery_table-outpatient_charges_2011-id" { + value = google_bigquery_table.outpatient_charges_2011.id +} + +resource "google_bigquery_table" "outpatient_charges_2012" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "outpatient_charges_2012" + + description = "CMS Medicare Outpatient Charges 2012" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-outpatient_charges_2012-table_id" { + value = google_bigquery_table.outpatient_charges_2012.table_id +} + +output "bigquery_table-outpatient_charges_2012-id" { + value = google_bigquery_table.outpatient_charges_2012.id +} + +resource "google_bigquery_table" "outpatient_charges_2013" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "outpatient_charges_2013" + + description = "CMS Medicare Outpatient Charges 2013" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-outpatient_charges_2013-table_id" { + value = google_bigquery_table.outpatient_charges_2013.table_id +} + +output "bigquery_table-outpatient_charges_2013-id" { + value = google_bigquery_table.outpatient_charges_2013.id +} + +resource "google_bigquery_table" "outpatient_charges_2014" { + project = var.project_id + dataset_id = "cms_medicare" + table_id = "outpatient_charges_2014" + + description = "CMS Medicare Outpatient Charges 2014" + + + + + depends_on = [ + google_bigquery_dataset.cms_medicare + ] +} + +output "bigquery_table-outpatient_charges_2014-table_id" { + value = 
google_bigquery_table.outpatient_charges_2014.table_id +} + +output "bigquery_table-outpatient_charges_2014-id" { + value = google_bigquery_table.outpatient_charges_2014.id +} diff --git a/datasets/cms_medicare/_terraform/provider.tf b/datasets/cms_medicare/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/cms_medicare/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/cms_medicare/_terraform/variables.tf b/datasets/cms_medicare/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/cms_medicare/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/cms_medicare/dataset.yaml b/datasets/cms_medicare/dataset.yaml new file mode 100644 index 000000000..8b7f7d34b --- /dev/null +++ b/datasets/cms_medicare/dataset.yaml @@ -0,0 +1,58 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + # The `dataset` block includes properties for your dataset that will be shown + # to users of your data on the Google Cloud website. + + # Must be exactly the same name as the folder name your dataset.yaml is in. + name: cms_medicare + + # A friendly, human-readable name of the dataset + friendly_name: cms_medicare + + # A short, descriptive summary of the dataset. + description: CMS Medicare + + # A list of sources the dataset is derived from, using the YAML list syntax. + dataset_sources: ~ + + # A list of terms and conditions that users of the dataset should agree on, + # using the YAML list syntax. 
+ terms_of_use: ~ + + +resources: + # A list of Google Cloud resources needed by your dataset. In principle, all + # pipelines under a dataset should be able to share these resources. + # + # The currently supported resources are shown below. Use only the resources + # you need, and delete the rest as needed by your pipeline. + # + # We will keep adding to the list below to support more Google Cloud resources + # over time. If a resource you need isn't supported, please file an issue on + # the repository. + + - type: bigquery_dataset + # Google BigQuery dataset to namespace all tables managed by this folder + # + # Required Properties: + # dataset_id + # + # Optional Properties: + # friendly_name (A user-friendly name of the dataset) + # description (A user-friendly description of the dataset) + # location (The geographic location where the dataset should reside) + dataset_id: cms_medicare + description: "CMS Medicare" diff --git a/datasets/cms_medicare/hospital_general_info/hospital_general_info_dag.py b/datasets/cms_medicare/hospital_general_info/hospital_general_info_dag.py new file mode 100644 index 000000000..d7aa8273f --- /dev/null +++ b/datasets/cms_medicare/hospital_general_info/hospital_general_info_dag.py @@ -0,0 +1,252 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="cms_medicare.hospital_general_info", + default_args=default_args, + max_active_runs=1, + schedule_interval="@once", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + hospital_info_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="hospital_info_transform_csv", + startup_timeout_seconds=600, + name="cms_medicare_hospital_general_info", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://data.cms.gov/provider-data/sites/default/files/resources/092256becd267d9eeccf73bf7d16c46b_1623902717/Hospital_General_Information.csv", + "SOURCE_FILE": "files/data.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/cms_medicare/hospital_general_info/data_output.csv", + "CSV_HEADERS": 
'["provider_id","hospital_name","address","city","state","zip_code","county_name","phone_number","hospital_type","hospital_ownership","emergency_services","meets_criteria_for_promoting_interoperability_of_ehrs","hospital_overall_rating","hospital_overall_rating_footnote","mortality_group_measure_count","facility_mortaility_measures_count","mortality_measures_better_count","mortality_measures_no_different_count","mortality_measures_worse_count","mortaility_group_footnote","safety_measures_count","facility_care_safety_measures_count","safety_measures_better_count","safety_measures_no_different_count","safety_measures_worse_count","safety_group_footnote","readmission_measures_count","facility_readmission_measures_count","readmission_measures_better_count","readmission_measures_no_different_count","readmission_measures_worse_count","readmission_measures_footnote","patient_experience_measures_count","facility_patient_experience_measures_count","patient_experience_measures_footnote","timely_and_effective_care_measures_count","facility_timely_and_effective_care_measures_count","timely_and_effective_care_measures_footnote"]', + "RENAME_MAPPINGS": '{"Facility ID": "provider_id","Facility Name": "hospital_name","Address": "address","City": "city","State": "state","ZIP Code": "zip_code","County Name": "county_name","Phone Number": "phone_number","Hospital Type": "hospital_type","Hospital Ownership": "hospital_ownership","Emergency Services": "emergency_services","Meets criteria for promoting interoperability of EHRs": "meets_criteria_for_promoting_interoperability_of_ehrs","Hospital overall rating": "hospital_overall_rating","Hospital overall rating footnote": "hospital_overall_rating_footnote","MORT Group Measure Count": "mortality_group_measure_count","Count of Facility MORT Measures": "facility_mortaility_measures_count","Count of MORT Measures Better": "mortality_measures_better_count","Count of MORT Measures No Different": "mortality_measures_no_different_count","Count 
of MORT Measures Worse": "mortality_measures_worse_count","MORT Group Footnote": "mortaility_group_footnote","Safety Group Measure Count": "safety_measures_count","Count of Facility Safety Measures": "facility_care_safety_measures_count","Count of Safety Measures Better": "safety_measures_better_count","Count of Safety Measures No Different": "safety_measures_no_different_count","Count of Safety Measures Worse": "safety_measures_worse_count","Safety Group Footnote": "safety_group_footnote","READM Group Measure Count": "readmission_measures_count","Count of Facility READM Measures": "facility_readmission_measures_count","Count of READM Measures Better": "readmission_measures_better_count","Count of READM Measures No Different": "readmission_measures_no_different_count","Count of READM Measures Worse": "readmission_measures_worse_count","READM Group Footnote": "readmission_measures_footnote","Pt Exp Group Measure Count": "patient_experience_measures_count","Count of Facility Pt Exp Measures": "facility_patient_experience_measures_count","Pt Exp Group Footnote": "patient_experience_measures_footnote","TE Group Measure Count": "timely_and_effective_care_measures_count","Count of Facility TE Measures": "facility_timely_and_effective_care_measures_count","TE Group Footnote": "timely_and_effective_care_measures_footnote"}', + "PIPELINE_NAME": "hospital_general_info", + }, + resources={"limit_memory": "2G", "limit_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_hospital_info_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_hospital_info_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/cms_medicare/hospital_general_info/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="cms_medicare.hospital_general_info", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + {"name": "provider_id", "type": "STRING", "mode": "NULLABLE"}, + {"name": "hospital_name", 
"type": "STRING", "mode": "NULLABLE"}, + {"name": "address", "type": "STRING", "mode": "NULLABLE"}, + {"name": "city", "type": "STRING", "mode": "NULLABLE"}, + {"name": "state", "type": "STRING", "mode": "NULLABLE"}, + {"name": "zip_code", "type": "STRING", "mode": "NULLABLE"}, + {"name": "county_name", "type": "STRING", "mode": "NULLABLE"}, + {"name": "phone_number", "type": "STRING", "mode": "NULLABLE"}, + {"name": "hospital_type", "type": "STRING", "mode": "NULLABLE"}, + {"name": "hospital_ownership", "type": "STRING", "mode": "NULLABLE"}, + {"name": "emergency_services", "type": "BOOLEAN", "mode": "NULLABLE"}, + { + "name": "meets_criteria_for_promoting_interoperability_of_ehrs", + "type": "BOOLEAN", + "mode": "NULLABLE", + }, + {"name": "hospital_overall_rating", "type": "STRING", "mode": "NULLABLE"}, + { + "name": "hospital_overall_rating_footnote", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Count of measures included in the Mortality measure group", + "name": "mortality_group_measure_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Mortality measures used in the hospital’s overall star rating", + "name": "facility_mortaility_measures_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Mortality measures that are better than the national value", + "name": "mortality_measures_better_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Mortality measures that are no different than the national value", + "name": "mortality_measures_no_different_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Mortality measures that are worse than the national value", + "name": "mortality_measures_worse_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Footnote about Mortality measures", + "name": "mortaility_group_footnote", + "type": "STRING", + "mode": "NULLABLE", + }, + { + 
"description": "Count of measures included in the Safety of Care measure group", + "name": "safety_measures_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Safety of care measures used in the hospital’s overall star rating", + "name": "facility_care_safety_measures_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Safety of care measures that are better than the national value", + "name": "safety_measures_better_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Safety of care measures that are no different than the national value", + "name": "safety_measures_no_different_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Safety of care measures that are worse than the national value", + "name": "safety_measures_worse_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Footnote about Safety of care measures", + "name": "safety_group_footnote", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Count of measures included in the Readmission measure group", + "name": "readmission_measures_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Readmission measures used in the hospital’s overall star rating", + "name": "facility_readmission_measures_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Readmission measures that are better than the national value", + "name": "readmission_measures_better_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Readmission measures that are no different than the national value", + "name": "readmission_measures_no_different_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Readmission measures that are worse than the national value", + "name": "readmission_measures_worse_count", + "type": "STRING", + 
"mode": "NULLABLE", + }, + { + "description": "Footnote about Readmission measures", + "name": "readmission_measures_footnote", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Count of measures included in the Patient experience measure group", + "name": "patient_experience_measures_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Patient experience measures used in the hospital’s overall star rating", + "name": "facility_patient_experience_measures_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Footnote about Patient experience measures", + "name": "patient_experience_measures_footnote", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Count of measures included in the Timely and effective care measure group", + "name": "timely_and_effective_care_measures_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Number of Timely and effective care measures used in the hospital’s overall star rating", + "name": "facility_timely_and_effective_care_measures_count", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "Footnote about Timely and effective care measures", + "name": "timely_and_effective_care_measures_footnote", + "type": "STRING", + "mode": "NULLABLE", + }, + ], + ) + + hospital_info_transform_csv >> load_hospital_info_to_bq diff --git a/datasets/cms_medicare/hospital_general_info/pipeline.yaml b/datasets/cms_medicare/hospital_general_info/pipeline.yaml new file mode 100644 index 000000000..6ee813395 --- /dev/null +++ b/datasets/cms_medicare/hospital_general_info/pipeline.yaml @@ -0,0 +1,260 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: hospital_general_info + + # Description of the table + description: "CMS Medicare Hospital General Info" + +dag: + + airflow_version: 1 + + initialize: + dag_id: hospital_general_info + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@once" + catchup: False + default_view: graph + + tasks: + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "hospital_info_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_hospital_general_info" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. 
To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://data.cms.gov/provider-data/sites/default/files/resources/092256becd267d9eeccf73bf7d16c46b_1623902717/Hospital_General_Information.csv" + SOURCE_FILE: "files/data.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/hospital_general_info/data_output.csv" + CSV_HEADERS: >- + ["provider_id","hospital_name","address","city","state","zip_code","county_name","phone_number","hospital_type","hospital_ownership","emergency_services","meets_criteria_for_promoting_interoperability_of_ehrs","hospital_overall_rating","hospital_overall_rating_footnote","mortality_group_measure_count","facility_mortaility_measures_count","mortality_measures_better_count","mortality_measures_no_different_count","mortality_measures_worse_count","mortaility_group_footnote","safety_measures_count","facility_care_safety_measures_count","safety_measures_better_count","safety_measures_no_different_count","safety_measures_worse_count","safety_group_footnote","readmission_measures_count","facility_readmission_measures_count","readmission_measures_better_count","readmission_measures_no_different_count","readmission_measures_worse_count","readmission_measures_footnote","patient_experience_measures_count","facility_patient_experience_measures_count","patient_experience_measures_footnote","timely_and_effective_care_measures_count","facility_timely_and_effective_care_measures_count","timely_and_effective_care_measures_footnote"] + RENAME_MAPPINGS: >- + {"Facility ID": "provider_id","Facility Name": "hospital_name","Address": "address","City": "city","State": "state","ZIP Code": 
"zip_code","County Name": "county_name","Phone Number": "phone_number","Hospital Type": "hospital_type","Hospital Ownership": "hospital_ownership","Emergency Services": "emergency_services","Meets criteria for promoting interoperability of EHRs": "meets_criteria_for_promoting_interoperability_of_ehrs","Hospital overall rating": "hospital_overall_rating","Hospital overall rating footnote": "hospital_overall_rating_footnote","MORT Group Measure Count": "mortality_group_measure_count","Count of Facility MORT Measures": "facility_mortaility_measures_count","Count of MORT Measures Better": "mortality_measures_better_count","Count of MORT Measures No Different": "mortality_measures_no_different_count","Count of MORT Measures Worse": "mortality_measures_worse_count","MORT Group Footnote": "mortaility_group_footnote","Safety Group Measure Count": "safety_measures_count","Count of Facility Safety Measures": "facility_care_safety_measures_count","Count of Safety Measures Better": "safety_measures_better_count","Count of Safety Measures No Different": "safety_measures_no_different_count","Count of Safety Measures Worse": "safety_measures_worse_count","Safety Group Footnote": "safety_group_footnote","READM Group Measure Count": "readmission_measures_count","Count of Facility READM Measures": "facility_readmission_measures_count","Count of READM Measures Better": "readmission_measures_better_count","Count of READM Measures No Different": "readmission_measures_no_different_count","Count of READM Measures Worse": "readmission_measures_worse_count","READM Group Footnote": "readmission_measures_footnote","Pt Exp Group Measure Count": "patient_experience_measures_count","Count of Facility Pt Exp Measures": "facility_patient_experience_measures_count","Pt Exp Group Footnote": "patient_experience_measures_footnote","TE Group Measure Count": "timely_and_effective_care_measures_count","Count of Facility TE Measures": "facility_timely_and_effective_care_measures_count","TE Group 
Footnote": "timely_and_effective_care_measures_footnote"} + PIPELINE_NAME: "hospital_general_info" + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + limit_memory: "2G" + limit_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_hospital_info_to_bq" + + # The GCS bucket where the CSV file is located in. + bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/hospital_general_info/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.hospital_general_info" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. 
+ # types: "INTEGER", "TIMESTAMP", "STRING" + schema_fields: + - name: "provider_id" + type: "STRING" + mode: "NULLABLE" + - name: "hospital_name" + type: "STRING" + mode: "NULLABLE" + - name: "address" + type: "STRING" + mode: "NULLABLE" + - name: "city" + type: "STRING" + mode: "NULLABLE" + - name: "state" + type: "STRING" + mode: "NULLABLE" + - name: "zip_code" + type: "STRING" + mode: "NULLABLE" + - name: "county_name" + type: "STRING" + mode: "NULLABLE" + - name: "phone_number" + type: "STRING" + mode: "NULLABLE" + - name: "hospital_type" + type: "STRING" + mode: "NULLABLE" + - name: "hospital_ownership" + type: "STRING" + mode: "NULLABLE" + - name: "emergency_services" + type: "BOOLEAN" + mode: "NULLABLE" + - name: "meets_criteria_for_promoting_interoperability_of_ehrs" + type: "BOOLEAN" + mode: "NULLABLE" + - name: "hospital_overall_rating" + type: "STRING" + mode: "NULLABLE" + - name: "hospital_overall_rating_footnote" + type: "STRING" + mode: "NULLABLE" + - description: "Count of measures included in the Mortality measure group" + name: "mortality_group_measure_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Mortality measures used in the hospital’s overall star rating" + name: "facility_mortaility_measures_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Mortality measures that are better than the national value" + name: "mortality_measures_better_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Mortality measures that are no different than the national value" + name: "mortality_measures_no_different_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Mortality measures that are worse than the national value" + name: "mortality_measures_worse_count" + type: "STRING" + mode: "NULLABLE" + - description: "Footnote about Mortality measures" + name: "mortaility_group_footnote" + type: "STRING" + mode: "NULLABLE" + - description: "Count of measures included in the Safety of
Care measure group" + name: "safety_measures_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Safety of care measures used in the hospital’s overall star rating" + name: "facility_care_safety_measures_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Safety of care measures that are better than the national value" + name: "safety_measures_better_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Safety of care measures that are no different than the national value" + name: "safety_measures_no_different_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Safety of care measures that are worse than the national value" + name: "safety_measures_worse_count" + type: "STRING" + mode: "NULLABLE" + - description: "Footnote about Safety of care measures" + name: "safety_group_footnote" + type: "STRING" + mode: "NULLABLE" + - description: "Count of measures included in the Readmission measure group" + name: "readmission_measures_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Readmission measures used in the hospital’s overall star rating" + name: "facility_readmission_measures_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Readmission measures that are better than the national value" + name: "readmission_measures_better_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Readmission measures that are no different than the national value" + name: "readmission_measures_no_different_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Readmission measures that are worse than the national value" + name: "readmission_measures_worse_count" + type: "STRING" + mode: "NULLABLE" + - description: "Footnote about Readmission measures" + name: "readmission_measures_footnote" + type: "STRING" + mode: "NULLABLE" + - description: "Count of measures included in the Patient experience measure group" + name:
"patient_experience_measures_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Patient experience measures used in the hospital’s overall star rating" + name: "facility_patient_experience_measures_count" + type: "STRING" + mode: "NULLABLE" + - description: "Footnote about Patient experience measures" + name: "patient_experience_measures_footnote" + type: "STRING" + mode: "NULLABLE" + - description: "Count of measures included in the Timely and effective care measure group" + name: "timely_and_effective_care_measures_count" + type: "STRING" + mode: "NULLABLE" + - description: "Number of Timely and effective care measures used in the hospital’s overall star rating" + name: "facility_timely_and_effective_care_measures_count" + type: "STRING" + mode: "NULLABLE" + - description: "Footnote about Timely and effective care measures" + name: "timely_and_effective_care_measures_footnote" + type: "STRING" + mode: "NULLABLE" + + graph_paths: + - "hospital_info_transform_csv >> load_hospital_info_to_bq" diff --git a/datasets/cms_medicare/inpatient_charges/inpatient_charges_dag.py b/datasets/cms_medicare/inpatient_charges/inpatient_charges_dag.py new file mode 100644 index 000000000..28540f2ba --- /dev/null +++ b/datasets/cms_medicare/inpatient_charges/inpatient_charges_dag.py @@ -0,0 +1,654 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="cms_medicare.inpatient_charges", + default_args=default_args, + max_active_runs=1, + schedule_interval="@once", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + inpatient_2011_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="inpatient_2011_transform_csv", + startup_timeout_seconds=600, + name="cms_medicare_inpatient_charges_2011", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2011_CSV.zip", + "SOURCE_FILE": "files/data.zip", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/cms_medicare/inpatient_charges_2011/data_output.csv", + "CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"]', + "RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","DRG Definition": 
"drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": "total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"}', + "PIPELINE_NAME": "inpatient_charges_2011", + }, + resources={"limit_memory": "2G", "limit_cpu": "1"}, + ) + + # Run CSV transform within kubernetes pod + inpatient_2012_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="inpatient_2012_transform_csv", + startup_timeout_seconds=600, + name="cms_medicare_inpatient_charges_2012", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2012_CSV.zip", + "SOURCE_FILE": "files/data.zip", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/cms_medicare/inpatient_charges_2012/data_output.csv", + "CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"]', + "RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","DRG 
Definition": "drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": "total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"}', + "PIPELINE_NAME": "inpatient_charges_2012", + }, + resources={"limit_memory": "2G", "limit_cpu": "1"}, + ) + + # Run CSV transform within kubernetes pod + inpatient_2013_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="inpatient_2013_transform_csv", + startup_timeout_seconds=600, + name="cms_medicare_inpatient_charges_2013", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2013_CSV.zip", + "SOURCE_FILE": "files/data.zip", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/cms_medicare/inpatient_charges_2013/data_output.csv", + "CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"]', + "RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": 
"provider_zipcode","DRG Definition": "drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": "total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"}', + "PIPELINE_NAME": "inpatient_charges_2013", + }, + resources={"limit_memory": "2G", "limit_cpu": "1"}, + ) + + # Run CSV transform within kubernetes pod + inpatient_2014_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="inpatient_2014_transform_csv", + startup_timeout_seconds=600, + name="cms_medicare_inpatient_charges_2014", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2014_CSV.zip", + "SOURCE_FILE": "files/data.zip", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/cms_medicare/inpatient_charges_2014/data_output.csv", + "CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"]', + "RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": 
"provider_state","Provider Zip Code": "provider_zipcode","DRG Definition": "drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": "total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"}', + "PIPELINE_NAME": "inpatient_charges_2014", + }, + resources={"limit_memory": "2G", "limit_cpu": "1"}, + ) + + # Run CSV transform within kubernetes pod + inpatient_2015_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="inpatient_2015_transform_csv", + startup_timeout_seconds=600, + name="cms_medicare_inpatient_charges_2015", + namespace="default", + affinity={ + "nodeAffinity": { + "requiredDuringSchedulingIgnoredDuringExecution": { + "nodeSelectorTerms": [ + { + "matchExpressions": [ + { + "key": "cloud.google.com/gke-nodepool", + "operator": "In", + "values": ["pool-e2-standard-4"], + } + ] + } + ] + } + } + }, + image_pull_policy="Always", + image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2015_CSV.zip", + "SOURCE_FILE": "files/data.zip", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}", + "TARGET_GCS_PATH": "data/cms_medicare/inpatient_charges_2015/data_output.csv", + "CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"]', + "RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": 
"provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","DRG Definition": "drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": "total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"}', + "PIPELINE_NAME": "inpatient_charges_2015", + }, + resources={"limit_memory": "2G", "limit_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_inpatient_2011_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_inpatient_2011_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/cms_medicare/inpatient_charges_2011/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="cms_medicare.inpatient_charges_2011", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services", + "name": "provider_id", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The name of the provider", + "name": "provider_name", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The street address in which the provider is physically located", + "name": "provider_street_address", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The city in which the provider is physically located", + "name": "provider_city", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The state in which the provider is physically located", + "name": "provider_state", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The zip code in which the provider is physically located", + "name": "provider_zipcode", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The code and description identifying the 
MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay", + "name": "drg_definition", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The Hospital Referral Region (HRR) in which the provider is physically located", + "name": "hospital_referral_region_description", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The number of discharges billed by the provider for inpatient hospital services", + "name": "total_discharges", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures", + "name": "average_covered_charges", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Also included 5 in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits", + "name": "average_total_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. 
Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits", + "name": "average_medicare_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + ], + ) + + # Task to load CSV data to a BigQuery table + load_inpatient_2012_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_inpatient_2012_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/cms_medicare/inpatient_charges_2012/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="cms_medicare.inpatient_charges_2012", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "description": "The CMS Certification Number (CCN) of the provider billing for inpatient hospital services", + "name": "provider_id", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The name of the provider", + "name": "provider_name", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The street address in which the provider is physically located", + "name": "provider_street_address", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The city in which the provider is physically located", + "name": "provider_city", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The state in which the provider is physically located", + "name": "provider_state", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The zip code in which the provider is physically located", + "name": "provider_zipcode", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The code and description identifying the MS-DRG.
MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay", + "name": "drg_definition", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The Hospital Referral Region (HRR) in which the provider is physically located", + "name": "hospital_referral_region_description", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The number of discharges billed by the provider for inpatient hospital services", + "name": "total_discharges", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures", + "name": "average_covered_charges", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average total payments to all providers for the MS-DRG including the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Also included in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits", + "name": "average_total_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases.
Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits", + "name": "average_medicare_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + ], + ) + + # Task to load CSV data to a BigQuery table + load_inpatient_2013_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_inpatient_2013_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/cms_medicare/inpatient_charges_2013/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="cms_medicare.inpatient_charges_2013", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services", + "name": "provider_id", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The name of the provider", + "name": "provider_name", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The street address in which the provider is physically located", + "name": "provider_street_address", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The city in which the provider is physically located", + "name": "provider_city", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The state in which the provider is physically located", + "name": "provider_state", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The zip code in which the provider is physically located", + "name": "provider_zipcode", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The code and description identifying the MS-DRG. 
MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay", + "name": "drg_definition", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The Hospital Referral Region (HRR) in which the provider is physically located", + "name": "hospital_referral_region_description", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The number of discharges billed by the provider for inpatient hospital services", + "name": "total_discharges", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures", + "name": "average_covered_charges", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Also included in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits", + "name": "average_total_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases.
Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits", + "name": "average_medicare_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + ], + ) + + # Task to load CSV data to a BigQuery table + load_inpatient_2014_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_inpatient_2014_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/cms_medicare/inpatient_charges_2014/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="cms_medicare.inpatient_charges_2014", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "description": "The CMS Certification Number (CCN) of the provider billing for inpatient hospital services", + "name": "provider_id", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The name of the provider", + "name": "provider_name", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The street address in which the provider is physically located", + "name": "provider_street_address", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The city in which the provider is physically located", + "name": "provider_city", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The state in which the provider is physically located", + "name": "provider_state", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The zip code in which the provider is physically located", + "name": "provider_zipcode", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The code and description identifying the MS-DRG.
MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay", + "name": "drg_definition", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The Hospital Referral Region (HRR) in which the provider is physically located", + "name": "hospital_referral_region_description", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The number of discharges billed by the provider for inpatient hospital services", + "name": "total_discharges", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures", + "name": "average_covered_charges", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Also included in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits", + "name": "average_total_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases.
Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits", + "name": "average_medicare_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + ], + ) + + # Task to load CSV data to a BigQuery table + load_inpatient_2015_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_inpatient_2015_to_bq", + bucket="{{ var.value.composer_bucket }}", + source_objects=["data/cms_medicare/inpatient_charges_2015/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="cms_medicare.inpatient_charges_2015", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "description": "The CMS Certification Number (CCN) of the provider billing for inpatient hospital services", + "name": "provider_id", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The name of the provider", + "name": "provider_name", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The street address in which the provider is physically located", + "name": "provider_street_address", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The city in which the provider is physically located", + "name": "provider_city", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The state in which the provider is physically located", + "name": "provider_state", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The zip code in which the provider is physically located", + "name": "provider_zipcode", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The code and description identifying the MS-DRG.
MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay", + "name": "drg_definition", + "type": "STRING", + "mode": "REQUIRED", + }, + { + "description": "The Hospital Referral Region (HRR) in which the provider is physically located", + "name": "hospital_referral_region_description", + "type": "STRING", + "mode": "NULLABLE", + }, + { + "description": "The number of discharges billed by the provider for inpatient hospital services", + "name": "total_discharges", + "type": "INTEGER", + "mode": "NULLABLE", + }, + { + "description": "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures", + "name": "average_covered_charges", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Also included in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits", + "name": "average_total_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + { + "description": "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases.
Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits", + "name": "average_medicare_payments", + "type": "FLOAT", + "mode": "NULLABLE", + }, + ], + ) + + inpatient_2011_transform_csv >> load_inpatient_2011_to_bq + inpatient_2012_transform_csv >> load_inpatient_2012_to_bq + inpatient_2013_transform_csv >> load_inpatient_2013_to_bq + inpatient_2014_transform_csv >> load_inpatient_2014_to_bq + inpatient_2015_transform_csv >> load_inpatient_2015_to_bq diff --git a/datasets/cms_medicare/inpatient_charges/pipeline.yaml b/datasets/cms_medicare/inpatient_charges/pipeline.yaml new file mode 100644 index 000000000..70a0b9306 --- /dev/null +++ b/datasets/cms_medicare/inpatient_charges/pipeline.yaml @@ -0,0 +1,702 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: inpatient_charges_2011 + + # Description of the table + description: "CMS Medicare Inpatient Charges 2011" + + - type: bigquery_table + # Required Properties: + table_id: inpatient_charges_2012 + + # Description of the table + description: "CMS Medicare Inpatient Charges 2012" + + - type: bigquery_table + # Required Properties: + table_id: inpatient_charges_2013 + + # Description of the table + description: "CMS Medicare Inpatient Charges 2013" + + - type: bigquery_table + # Required Properties: + table_id: inpatient_charges_2014 + + # Description of the table + description: "CMS Medicare Inpatient Charges 2014" + + - type: bigquery_table + # Required Properties: + table_id: inpatient_charges_2015 + + # Description of the table + description: "CMS Medicare Inpatient Charges 2015" + +dag: + + airflow_version: 1 + + initialize: + dag_id: inpatient_charges + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@once" + catchup: False + default_view: graph + + tasks: + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "inpatient_2011_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_inpatient_charges_2011" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. 
+ namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2011_CSV.zip" + SOURCE_FILE: "files/data.zip" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/inpatient_charges_2011/data_output.csv" + CSV_HEADERS: >- + ["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"] + RENAME_MAPPINGS: >- + {"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","DRG Definition": "drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": "total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"} + PIPELINE_NAME: "inpatient_charges_2011" + + # Set resource limits 
for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + limit_memory: "2G" + limit_cpu: "1" + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "inpatient_2012_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_inpatient_charges_2012" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. 
+ env_vars: + SOURCE_URL: "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2012_CSV.zip" + SOURCE_FILE: "files/data.zip" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/inpatient_charges_2012/data_output.csv" + CSV_HEADERS: >- + ["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"] + RENAME_MAPPINGS: >- + {"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","DRG Definition": "drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": "total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"} + PIPELINE_NAME: "inpatient_charges_2012" + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + limit_memory: "2G" + limit_cpu: "1" + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "inpatient_2013_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_inpatient_charges_2013" + + # The namespace to run within Kubernetes. 
Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2013_CSV.zip" + SOURCE_FILE: "files/data.zip" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/inpatient_charges_2013/data_output.csv" + CSV_HEADERS: >- + ["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"] + RENAME_MAPPINGS: >- + {"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","DRG Definition": "drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": 
"total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"} + PIPELINE_NAME: "inpatient_charges_2013" + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + limit_memory: "2G" + limit_cpu: "1" + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "inpatient_2014_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_inpatient_charges_2014" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. 
+ env_vars: + SOURCE_URL: "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2014_CSV.zip" + SOURCE_FILE: "files/data.zip" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/inpatient_charges_2014/data_output.csv" + CSV_HEADERS: >- + ["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"] + RENAME_MAPPINGS: >- + {"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","DRG Definition": "drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": "total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"} + PIPELINE_NAME: "inpatient_charges_2014" + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + limit_memory: "2G" + limit_cpu: "1" + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "inpatient_2015_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_inpatient_charges_2015" + + # The namespace to run within Kubernetes. 
Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Inpatient_Data_2015_CSV.zip" + SOURCE_FILE: "files/data.zip" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/inpatient_charges_2015/data_output.csv" + CSV_HEADERS: >- + ["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","drg_definition","hospital_referral_region_description","total_discharges","average_covered_charges","average_total_payments","average_medicare_payments"] + RENAME_MAPPINGS: >- + {"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","DRG Definition": "drg_definition","Hospital Referral Region (HRR) Description": "hospital_referral_region_description","Total Discharges": 
"total_discharges","Average Covered Charges": "average_covered_charges","Average Total Payments": "average_total_payments","Average Medicare Payments": "average_medicare_payments"} + PIPELINE_NAME: "inpatient_charges_2015" + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + limit_memory: "2G" + limit_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_inpatient_2011_to_bq" + + # The GCS bucket where the CSV file is located in. + bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/inpatient_charges_2011/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.inpatient_charges_2011" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. 
+ # types: "INTEGER", "TIMESTAMP", "STRING" + schema_fields: + - description: "The CMS Certification Number (CCN) of the provider billing for inpatient hospital services" + name: "provider_id" + type: "STRING" + mode: "REQUIRED" + - description: "The name of the provider" + name: "provider_name" + type: "STRING" + mode: "NULLABLE" + - description: "The street address in which the provider is physically located" + name: "provider_street_address" + type: "STRING" + mode: "NULLABLE" + - description: "The city in which the provider is physically located" + name: "provider_city" + type: "STRING" + mode: "NULLABLE" + - description: "The state in which the provider is physically located" + name: "provider_state" + type: "STRING" + mode: "NULLABLE" + - description: "The zip code in which the provider is physically located" + name: "provider_zipcode" + type: "INTEGER" + mode: "NULLABLE" + - description: "The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay" + name: "drg_definition" + type: "STRING" + mode: "REQUIRED" + - description: "The Hospital Referral Region (HRR) in which the provider is physically located" + name: "hospital_referral_region_description" + type: "STRING" + mode: "NULLABLE" + - description: "The number of discharges billed by the provider for inpatient hospital services" + name: "total_discharges" + type: "INTEGER" + mode: "NULLABLE" + - description: "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures" + name: "average_covered_charges" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases.
Also included in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits" + name: "average_total_payments" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits" + name: "average_medicare_payments" + type: "FLOAT" + mode: "NULLABLE" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_inpatient_2012_to_bq" + + # The GCS bucket where the CSV file is located in. + bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/inpatient_charges_2012/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.inpatient_charges_2012" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns.
+ # types: "INTEGER", "TIMESTAMP", "STRING" + schema_fields: + - description: "The CMS Certification Number (CCN) of the provider billing for inpatient hospital services" + name: "provider_id" + type: "STRING" + mode: "REQUIRED" + - description: "The name of the provider" + name: "provider_name" + type: "STRING" + mode: "NULLABLE" + - description: "The street address in which the provider is physically located" + name: "provider_street_address" + type: "STRING" + mode: "NULLABLE" + - description: "The city in which the provider is physically located" + name: "provider_city" + type: "STRING" + mode: "NULLABLE" + - description: "The state in which the provider is physically located" + name: "provider_state" + type: "STRING" + mode: "NULLABLE" + - description: "The zip code in which the provider is physically located" + name: "provider_zipcode" + type: "INTEGER" + mode: "NULLABLE" + - description: "The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay" + name: "drg_definition" + type: "STRING" + mode: "REQUIRED" + - description: "The Hospital Referral Region (HRR) in which the provider is physically located" + name: "hospital_referral_region_description" + type: "STRING" + mode: "NULLABLE" + - description: "The number of discharges billed by the provider for inpatient hospital services" + name: "total_discharges" + type: "INTEGER" + mode: "NULLABLE" + - description: "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures" + name: "average_covered_charges" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases.
Also included in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits" + name: "average_total_payments" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits" + name: "average_medicare_payments" + type: "FLOAT" + mode: "NULLABLE" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_inpatient_2013_to_bq" + + # The GCS bucket where the CSV file is located in. + bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/inpatient_charges_2013/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.inpatient_charges_2013" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns.
+ # types: "INTEGER", "TIMESTAMP", "STRING" + schema_fields: + - description: "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services" + name: "provider_id" + type: "STRING" + mode: "REQUIRED" + - description: "The name of the provider" + name: "provider_name" + type: "STRING" + mode: "NULLABLE" + - description: "The street address in which the provider is physically located" + name: "provider_street_address" + type: "STRING" + mode: "NULLABLE" + - description: "The city in which the provider is physically located" + name: "provider_city" + type: "STRING" + mode: "NULLABLE" + - description: "The state in which the provider is physically located" + name: "provider_state" + type: "STRING" + mode: "NULLABLE" + - description: "The zip code in which the provider is physically located" + name: "provider_zipcode" + type: "INTEGER" + mode: "NULLABLE" + - description: "The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay" + name: "drg_definition" + type: "STRING" + mode: "REQUIRED" + - description: "The Hospital Referral Region (HRR) in which the provider is physically located" + name: "hospital_referral_region_description" + type: "STRING" + mode: "NULLABLE" + - description: "The number of discharges billed by the provider for inpatient hospital services" + name: "total_discharges" + type: "INTEGER" + mode: "NULLABLE" + - description: "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures" + name: "average_covered_charges" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. 
Also included 5 in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits" + name: "average_total_payments" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits" + name: "average_medicare_payments" + type: "FLOAT" + mode: "NULLABLE" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_inpatient_2014_to_bq" + + # The GCS bucket where the CSV file is located in. + bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/inpatient_charges_2014/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.inpatient_charges_2014" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. 
+ # types: "INTEGER", "TIMESTAMP", "STRING" + schema_fields: + - description: "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services" + name: "provider_id" + type: "STRING" + mode: "REQUIRED" + - description: "The name of the provider" + name: "provider_name" + type: "STRING" + mode: "NULLABLE" + - description: "The street address in which the provider is physically located" + name: "provider_street_address" + type: "STRING" + mode: "NULLABLE" + - description: "The city in which the provider is physically located" + name: "provider_city" + type: "STRING" + mode: "NULLABLE" + - description: "The state in which the provider is physically located" + name: "provider_state" + type: "STRING" + mode: "NULLABLE" + - description: "The zip code in which the provider is physically located" + name: "provider_zipcode" + type: "INTEGER" + mode: "NULLABLE" + - description: "The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay" + name: "drg_definition" + type: "STRING" + mode: "REQUIRED" + - description: "The Hospital Referral Region (HRR) in which the provider is physically located" + name: "hospital_referral_region_description" + type: "STRING" + mode: "NULLABLE" + - description: "The number of discharges billed by the provider for inpatient hospital services" + name: "total_discharges" + type: "INTEGER" + mode: "NULLABLE" + - description: "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures" + name: "average_covered_charges" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. 
Also included 5 in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits" + name: "average_total_payments" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits" + name: "average_medicare_payments" + type: "FLOAT" + mode: "NULLABLE" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_inpatient_2015_to_bq" + + # The GCS bucket where the CSV file is located in. + bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/inpatient_charges_2015/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.inpatient_charges_2015" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. 
+ # types: "INTEGER", "TIMESTAMP", "STRING" + schema_fields: + - description: "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services" + name: "provider_id" + type: "STRING" + mode: "REQUIRED" + - description: "The name of the provider" + name: "provider_name" + type: "STRING" + mode: "NULLABLE" + - description: "The street address in which the provider is physically located" + name: "provider_street_address" + type: "STRING" + mode: "NULLABLE" + - description: "The city in which the provider is physically located" + name: "provider_city" + type: "STRING" + mode: "NULLABLE" + - description: "The state in which the provider is physically located" + name: "provider_state" + type: "STRING" + mode: "NULLABLE" + - description: "The zip code in which the provider is physically located" + name: "provider_zipcode" + type: "INTEGER" + mode: "NULLABLE" + - description: "The code and description identifying the MS-DRG. MS-DRGs are a classification system that groups similar clinical conditions (diagnoses) and the procedures furnished by the hospital during the stay" + name: "drg_definition" + type: "STRING" + mode: "REQUIRED" + - description: "The Hospital Referral Region (HRR) in which the provider is physically located" + name: "hospital_referral_region_description" + type: "STRING" + mode: "NULLABLE" + - description: "The number of discharges billed by the provider for inpatient hospital services" + name: "total_discharges" + type: "INTEGER" + mode: "NULLABLE" + - description: "The provider's average charge for services covered by Medicare for all discharges in the MS-DRG. These will vary from hospital to hospital because of differences in hospital charge structures" + name: "average_covered_charges" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average total payments to all providers for the MS-DRG including the MSDRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. 
Also included 5 in average total payments are co-payment and deductible amounts that the patient is responsible for and any additional payments by third parties for coordination of benefits" + name: "average_total_payments" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average amount that Medicare pays to the provider for Medicare's share of the MS-DRG. Average Medicare payment amounts include the MS-DRG amount, teaching, disproportionate share, capital, and outlier payments for all cases. Medicare payments DO NOT include beneficiary co-payments and deductible amounts nor any additional payments from third parties for coordination of benefits" + name: "average_medicare_payments" + type: "FLOAT" + mode: "NULLABLE" + + graph_paths: + - "inpatient_2011_transform_csv >> load_inpatient_2011_to_bq" + - "inpatient_2012_transform_csv >> load_inpatient_2012_to_bq" + - "inpatient_2013_transform_csv >> load_inpatient_2013_to_bq" + - "inpatient_2014_transform_csv >> load_inpatient_2014_to_bq" + - "inpatient_2015_transform_csv >> load_inpatient_2015_to_bq" diff --git a/datasets/cms_medicare/outpatient_charges/outpatient_charges_dag.py b/datasets/cms_medicare/outpatient_charges/outpatient_charges_dag.py new file mode 100644 index 000000000..29ad3d73c --- /dev/null +++ b/datasets/cms_medicare/outpatient_charges/outpatient_charges_dag.py @@ -0,0 +1,503 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+# Airflow DAG for the CMS Medicare outpatient charges tables (2011-2014): each year's CSV is transformed in a Kubernetes pod and then loaded into BigQuery. NOTE(review): this file appears to be generated from pipeline.yaml by scripts/generate_dag.py - mirror any schema description change there (confirm).
+from airflow import DAG
+from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator
+
+default_args = {
+    "owner": "Google",
+    "depends_on_past": False,
+    "start_date": "2021-03-01",
+}
+
+
+with DAG(
+    dag_id="cms_medicare.outpatient_charges",
+    default_args=default_args,
+    max_active_runs=1,
+    schedule_interval="@once",
+    catchup=False,
+    default_view="graph",
+) as dag:
+
+    # Run CSV transform within kubernetes pod for the 2011 outpatient file; env_vars are the inputs handed to the container image
+    outpatient_2011_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="outpatient_2011_transform_csv",
+        startup_timeout_seconds=600,
+        name="cms_medicare_outpatient_charges_2011",
+        namespace="default",
+        affinity={
+            "nodeAffinity": {
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    "values": ["pool-e2-standard-4"],
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        },
+        image_pull_policy="Always",
+        image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2011_CSV.zip",
+            "SOURCE_FILE": "files/data.zip",
+            "TARGET_FILE": "files/data_output.csv",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/cms_medicare/outpatient_charges_2011/data_output.csv",
+            "CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"]',
+            "RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","APC": "apc","Hospital Referral Region (HRR) Description": "hospital_referral_region","Outpatient Services": "outpatient_services","Average Estimated Submitted Charges": "average_estimated_submitted_charges","Average Total Payments": "average_total_payments"}',
+            "PIPELINE_NAME": "outpatient_charges_2011",
+        },
+    )
+
+    # Run CSV transform within kubernetes pod for the 2012 outpatient file; env_vars are the inputs handed to the container image
+    outpatient_2012_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="outpatient_2012_transform_csv",
+        startup_timeout_seconds=600,
+        name="cms_medicare_outpatient_charges_2012",
+        namespace="default",
+        affinity={
+            "nodeAffinity": {
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    "values": ["pool-e2-standard-4"],
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        },
+        image_pull_policy="Always",
+        image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2012_CSV.zip",
+            "SOURCE_FILE": "files/data.zip",
+            "TARGET_FILE": "files/data_output.csv",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/cms_medicare/outpatient_charges_2012/data_output.csv",
+            "CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"]',
+            "RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","APC": "apc","Hospital Referral Region (HRR) Description": "hospital_referral_region","Outpatient Services": "outpatient_services","Average Estimated Submitted Charges": "average_estimated_submitted_charges","Average Total Payments": "average_total_payments"}',
+            "PIPELINE_NAME": "outpatient_charges_2012",
+        },
+    )
+
+    # Run CSV transform within kubernetes pod for the 2013 outpatient file; env_vars are the inputs handed to the container image
+    outpatient_2013_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="outpatient_2013_transform_csv",
+        startup_timeout_seconds=600,
+        name="cms_medicare_outpatient_charges_2013",
+        namespace="default",
+        affinity={
+            "nodeAffinity": {
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    "values": ["pool-e2-standard-4"],
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        },
+        image_pull_policy="Always",
+        image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2013_CSV_v2.zip",
+            "SOURCE_FILE": "files/data.zip",
+            "TARGET_FILE": "files/data_output.csv",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/cms_medicare/outpatient_charges_2013/data_output.csv",
+            "CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"]',
+            "RENAME_MAPPINGS": '{"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","APC": "apc","Hospital Referral Region (HRR) Description": "hospital_referral_region","Outpatient Services": "outpatient_services","Average Estimated Submitted Charges": "average_estimated_submitted_charges","Average Total Payments": "average_total_payments"}',
+            "PIPELINE_NAME": "outpatient_charges_2013",
+        },
+    )
+
+    # Run CSV transform within kubernetes pod for the 2014 outpatient file; its RENAME_MAPPINGS keys differ from 2011-2013 and it alone sets resource limits
+    outpatient_2014_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
+        task_id="outpatient_2014_transform_csv",
+        startup_timeout_seconds=600,
+        name="cms_medicare_outpatient_charges_2014",
+        namespace="default",
+        affinity={
+            "nodeAffinity": {
+                "requiredDuringSchedulingIgnoredDuringExecution": {
+                    "nodeSelectorTerms": [
+                        {
+                            "matchExpressions": [
+                                {
+                                    "key": "cloud.google.com/gke-nodepool",
+                                    "operator": "In",
+                                    "values": ["pool-e2-standard-4"],
+                                }
+                            ]
+                        }
+                    ]
+                }
+            }
+        },
+        image_pull_policy="Always",
+        image="{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}",
+        env_vars={
+            "SOURCE_URL": "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2014_CSV.zip",
+            "SOURCE_FILE": "files/data.zip",
+            "TARGET_FILE": "files/data_output.csv",
+            "TARGET_GCS_BUCKET": "{{ var.value.composer_bucket }}",
+            "TARGET_GCS_PATH": "data/cms_medicare/outpatient_charges_2014/data_output.csv",
+            "CSV_HEADERS": '["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"]',
+            "RENAME_MAPPINGS": '{"provider_id": "provider_id","provider_name": "provider_name","Provider_Street_Address": "provider_street_address","Provider_City": "provider_city","Provider_State": "provider_state","Provider_Zip_Code": "provider_zipcode","apc": "apc","Hospital_Referral_Region": "hospital_referral_region","Outpatient_Services": "outpatient_services","Average_Estimated_Submitted_Charges": "average_estimated_submitted_charges","Average_Total_Payments": "average_total_payments"}',
+            "PIPELINE_NAME": "outpatient_charges_2014",
+        },
+        resources={"limit_memory": "4G", "limit_cpu": "1"},
+    )
+
+    # Task to load CSV data to a BigQuery table: reloads cms_medicare.outpatient_charges_2011 (WRITE_TRUNCATE), skipping the CSV header row
+    load_outpatient_2011_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+        task_id="load_outpatient_2011_to_bq",
+        bucket="{{ var.value.composer_bucket }}",
+        source_objects=["data/cms_medicare/outpatient_charges_2011/data_output.csv"],
+        source_format="CSV",
+        destination_project_dataset_table="cms_medicare.outpatient_charges_2011",
+        skip_leading_rows=1,
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[
+            {
+                "description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services",
+                "name": "provider_id",
+                "type": "STRING",
+                "mode": "REQUIRED",
+            },
+            {
+                "description": "The name of the provider",
+                "name": "provider_name",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The street address in which the provider is physically located",
+                "name": "provider_street_address",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The city in which the provider is physically located",
+                "name": "provider_city",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The state in which the provider is physically located",
+                "name": "provider_state",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The zip code in which the provider is physically located",
+                "name": "provider_zipcode",
+                "type": "INTEGER",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs",
+                "name": "apc",
+                "type": "STRING",
+                "mode": "REQUIRED",
+            },
+            {
+                "description": "The Hospital Referral Region (HRR) in which the provider is physically located",
+                "name": "hospital_referral_region",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The number of services billed by the provider for outpatient hospital services",
+                "name": "outpatient_services",
+                "type": "INTEGER",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures",
+                "name": "average_estimated_submitted_charges",
+                "type": "FLOAT",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The average of total payments to the provider for the services covered by Medicare for the APC",
+                "name": "average_total_payments",
+                "type": "FLOAT",
+                "mode": "NULLABLE",
+            },
+        ],
+    )
+
+    # Task to load CSV data to a BigQuery table: reloads cms_medicare.outpatient_charges_2012 (WRITE_TRUNCATE), skipping the CSV header row
+    load_outpatient_2012_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+        task_id="load_outpatient_2012_to_bq",
+        bucket="{{ var.value.composer_bucket }}",
+        source_objects=["data/cms_medicare/outpatient_charges_2012/data_output.csv"],
+        source_format="CSV",
+        destination_project_dataset_table="cms_medicare.outpatient_charges_2012",
+        skip_leading_rows=1,
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[
+            {
+                "description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services",
+                "name": "provider_id",
+                "type": "STRING",
+                "mode": "REQUIRED",
+            },
+            {
+                "description": "The name of the provider",
+                "name": "provider_name",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The street address in which the provider is physically located",
+                "name": "provider_street_address",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The city in which the provider is physically located",
+                "name": "provider_city",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The state in which the provider is physically located",
+                "name": "provider_state",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The zip code in which the provider is physically located",
+                "name": "provider_zipcode",
+                "type": "INTEGER",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs",
+                "name": "apc",
+                "type": "STRING",
+                "mode": "REQUIRED",
+            },
+            {
+                "description": "The Hospital Referral Region (HRR) in which the provider is physically located",
+                "name": "hospital_referral_region",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The number of services billed by the provider for outpatient hospital services",
+                "name": "outpatient_services",
+                "type": "INTEGER",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures",
+                "name": "average_estimated_submitted_charges",
+                "type": "FLOAT",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The average of total payments to the provider for the services covered by Medicare for the APC",
+                "name": "average_total_payments",
+                "type": "FLOAT",
+                "mode": "NULLABLE",
+            },
+        ],
+    )
+
+    # Task to load CSV data to a BigQuery table: reloads cms_medicare.outpatient_charges_2013 (WRITE_TRUNCATE), skipping the CSV header row
+    load_outpatient_2013_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+        task_id="load_outpatient_2013_to_bq",
+        bucket="{{ var.value.composer_bucket }}",
+        source_objects=["data/cms_medicare/outpatient_charges_2013/data_output.csv"],
+        source_format="CSV",
+        destination_project_dataset_table="cms_medicare.outpatient_charges_2013",
+        skip_leading_rows=1,
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[
+            {
+                "description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services",
+                "name": "provider_id",
+                "type": "STRING",
+                "mode": "REQUIRED",
+            },
+            {
+                "description": "The name of the provider",
+                "name": "provider_name",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The street address in which the provider is physically located",
+                "name": "provider_street_address",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The city in which the provider is physically located",
+                "name": "provider_city",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The state in which the provider is physically located",
+                "name": "provider_state",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The zip code in which the provider is physically located",
+                "name": "provider_zipcode",
+                "type": "INTEGER",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs",
+                "name": "apc",
+                "type": "STRING",
+                "mode": "REQUIRED",
+            },
+            {
+                "description": "The Hospital Referral Region (HRR) in which the provider is physically located",
+                "name": "hospital_referral_region",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The number of services billed by the provider for outpatient hospital services",
+                "name": "outpatient_services",
+                "type": "INTEGER",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures",
+                "name": "average_estimated_submitted_charges",
+                "type": "FLOAT",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The average of total payments to the provider for the services covered by Medicare for the APC",
+                "name": "average_total_payments",
+                "type": "FLOAT",
+                "mode": "NULLABLE",
+            },
+        ],
+    )
+
+    # Task to load CSV data to a BigQuery table: reloads cms_medicare.outpatient_charges_2014 (WRITE_TRUNCATE), skipping the CSV header row
+    load_outpatient_2014_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
+        task_id="load_outpatient_2014_to_bq",
+        bucket="{{ var.value.composer_bucket }}",
+        source_objects=["data/cms_medicare/outpatient_charges_2014/data_output.csv"],
+        source_format="CSV",
+        destination_project_dataset_table="cms_medicare.outpatient_charges_2014",
+        skip_leading_rows=1,
+        write_disposition="WRITE_TRUNCATE",
+        schema_fields=[
+            {
+                "description": "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services",
+                "name": "provider_id",
+                "type": "STRING",
+                "mode": "REQUIRED",
+            },
+            {
+                "description": "The name of the provider",
+                "name": "provider_name",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The street address in which the provider is physically located",
+                "name": "provider_street_address",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The city in which the provider is physically located",
+                "name": "provider_city",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The state in which the provider is physically located",
+                "name": "provider_state",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The zip code in which the provider is physically located",
+                "name": "provider_zipcode",
+                "type": "INTEGER",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs",
+                "name": "apc",
+                "type": "STRING",
+                "mode": "REQUIRED",
+            },
+            {
+                "description": "The Hospital Referral Region (HRR) in which the provider is physically located",
+                "name": "hospital_referral_region",
+                "type": "STRING",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The number of services billed by the provider for outpatient hospital services",
+                "name": "outpatient_services",
+                "type": "INTEGER",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures",
+                "name": "average_estimated_submitted_charges",
+                "type": "FLOAT",
+                "mode": "NULLABLE",
+            },
+            {
+                "description": "The average of total payments to the provider for the services covered by Medicare for the APC",
+                "name": "average_total_payments",
+                "type": "FLOAT",
+                "mode": "NULLABLE",
+            },
+        ],
+    )
+
+    outpatient_2011_transform_csv >> load_outpatient_2011_to_bq
+    outpatient_2012_transform_csv >> load_outpatient_2012_to_bq
+    outpatient_2013_transform_csv >> load_outpatient_2013_to_bq
+    outpatient_2014_transform_csv >> load_outpatient_2014_to_bq
diff --git a/datasets/cms_medicare/outpatient_charges/pipeline.yaml b/datasets/cms_medicare/outpatient_charges/pipeline.yaml
new file mode 100644
index 000000000..97dffdf37
--- /dev/null
+++ b/datasets/cms_medicare/outpatient_charges/pipeline.yaml
@@ -0,0 +1,538 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: outpatient_charges_2011 + + # Description of the table + description: "CMS Medicare Outpatient Charges 2011" + + - type: bigquery_table + # Required Properties: + table_id: outpatient_charges_2012 + + # Description of the table + description: "CMS Medicare Outpatient Charges 2012" + + - type: bigquery_table + # Required Properties: + table_id: outpatient_charges_2013 + + # Description of the table + description: "CMS Medicare Outpatient Charges 2013" + + - type: bigquery_table + # Required Properties: + table_id: outpatient_charges_2014 + + # Description of the table + description: "CMS Medicare Outpatient Charges 2014" + +dag: + + airflow_version: 1 + + initialize: + dag_id: outpatient_charges + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@once" + catchup: False + default_view: graph + + tasks: + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "outpatient_2011_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_outpatient_charges_2011" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines.
+ namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2011_CSV.zip" + SOURCE_FILE: "files/data.zip" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/outpatient_charges_2011/data_output.csv" + CSV_HEADERS: >- + ["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"] + RENAME_MAPPINGS: >- + {"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","APC": "apc","Hospital Referral Region (HRR) Description": "hospital_referral_region","Outpatient Services": "outpatient_services","Average Estimated Submitted Charges": "average_estimated_submitted_charges","Average Total Payments": "average_total_payments"} + PIPELINE_NAME: "outpatient_charges_2011" + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod"
+ + args: + + task_id: "outpatient_2012_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_outpatient_charges_2012" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform.
+ env_vars: + SOURCE_URL: "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2012_CSV.zip" + SOURCE_FILE: "files/data.zip" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/outpatient_charges_2012/data_output.csv" + CSV_HEADERS: >- + ["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"] + RENAME_MAPPINGS: >- + {"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","APC": "apc","Hospital Referral Region (HRR) Description": "hospital_referral_region","Outpatient Services": "outpatient_services","Average Estimated Submitted Charges": "average_estimated_submitted_charges","Average Total Payments": "average_total_payments"} + PIPELINE_NAME: "outpatient_charges_2012" + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "outpatient_2013_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_outpatient_charges_2013" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines.
+ namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2013_CSV_v2.zip" + SOURCE_FILE: "files/data.zip" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/outpatient_charges_2013/data_output.csv" + CSV_HEADERS: >- + ["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"] + RENAME_MAPPINGS: >- + {"Provider Id": "provider_id","Provider Name": "provider_name","Provider Street Address": "provider_street_address","Provider City": "provider_city","Provider State": "provider_state","Provider Zip Code": "provider_zipcode","APC": "apc","Hospital Referral Region (HRR) Description": "hospital_referral_region","Outpatient Services": "outpatient_services","Average Estimated Submitted Charges": "average_estimated_submitted_charges","Average Total Payments": "average_total_payments"} + PIPELINE_NAME: "outpatient_charges_2013" + + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes
pod" + + args: + + task_id: "outpatient_2014_transform_csv" + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "cms_medicare_outpatient_charges_2014" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + affinity: + nodeAffinity: + requiredDuringSchedulingIgnoredDuringExecution: + nodeSelectorTerms: + - matchExpressions: + - key: cloud.google.com/gke-nodepool + operator: In + values: + - "pool-e2-standard-4" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.cms_medicare.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. NOTE(review): the RENAME_MAPPINGS keys below (e.g. "Provider_Street_Address") differ from those of the 2011-2013 tasks (e.g. "Provider Street Address"); presumably the 2014 source CSV uses different headers - confirm against the source file.
+ env_vars: + SOURCE_URL: "https://www.cms.gov/Research-Statistics-Data-and-Systems/Statistics-Trends-and-Reports/Medicare-Provider-Charge-Data/Downloads/Outpatient_Data_2014_CSV.zip" + SOURCE_FILE: "files/data.zip" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.value.composer_bucket }}" + TARGET_GCS_PATH: "data/cms_medicare/outpatient_charges_2014/data_output.csv" + CSV_HEADERS: >- + ["provider_id","provider_name","provider_street_address","provider_city","provider_state","provider_zipcode","apc","hospital_referral_region","outpatient_services","average_estimated_submitted_charges","average_total_payments"] + RENAME_MAPPINGS: >- + {"provider_id": "provider_id","provider_name": "provider_name","Provider_Street_Address": "provider_street_address","Provider_City": "provider_city","Provider_State": "provider_state","Provider_Zip_Code": "provider_zipcode","apc": "apc","Hospital_Referral_Region": "hospital_referral_region","Outpatient_Services": "outpatient_services","Average_Estimated_Submitted_Charges": "average_estimated_submitted_charges","Average_Total_Payments": "average_total_payments"} + PIPELINE_NAME: "outpatient_charges_2014" + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes NOTE(review): only this 2014 transform task sets resource limits; the 2011-2013 transform tasks set none - confirm whether that is intentional. + resources: + limit_memory: "4G" + limit_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_outpatient_2011_to_bq" + + # The GCS bucket where the CSV file is located.
+ bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/outpatient_charges_2011/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.outpatient_charges_2011" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + # types: "INTEGER", "FLOAT", "TIMESTAMP", "STRING" + schema_fields: + - description: "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services" + name: "provider_id" + type: "STRING" + mode: "REQUIRED" + - description: "The name of the provider" + name: "provider_name" + type: "STRING" + mode: "NULLABLE" + - description: "The street address in which the provider is physically located" + name: "provider_street_address" + type: "STRING" + mode: "NULLABLE" + - description: "The city in which the provider is physically located" + name: "provider_city" + type: "STRING" + mode: "NULLABLE" + - description: "The state in which the provider is physically located" + name: "provider_state" + type: "STRING" + mode: "NULLABLE" + - description: "The zip code in which the provider is physically located" + name: "provider_zipcode" + type: "INTEGER" + mode: "NULLABLE" + - description: "Code and description identifying the APC.
APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs" + name: "apc" + type: "STRING" + mode: "REQUIRED" + - description: "The Hospital Referral Region (HRR) in which the provider is physically located" + name: "hospital_referral_region" + type: "STRING" + mode: "NULLABLE" + - description: "The number of services billed by the provider for outpatient hospital services" + name: "outpatient_services" + type: "INTEGER" + mode: "NULLABLE" + - description: "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures" + name: "average_estimated_submitted_charges" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average of total payments to the provider for the APC" + name: "average_total_payments" + type: "FLOAT" + mode: "NULLABLE" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_outpatient_2012_to_bq" + + # The GCS bucket where the CSV file is located. + bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/outpatient_charges_2012/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.outpatient_charges_2012" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file.
For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + # types: "INTEGER", "FLOAT", "TIMESTAMP", "STRING" + schema_fields: + - description: "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services" + name: "provider_id" + type: "STRING" + mode: "REQUIRED" + - description: "The name of the provider" + name: "provider_name" + type: "STRING" + mode: "NULLABLE" + - description: "The street address in which the provider is physically located" + name: "provider_street_address" + type: "STRING" + mode: "NULLABLE" + - description: "The city in which the provider is physically located" + name: "provider_city" + type: "STRING" + mode: "NULLABLE" + - description: "The state in which the provider is physically located" + name: "provider_state" + type: "STRING" + mode: "NULLABLE" + - description: "The zip code in which the provider is physically located" + name: "provider_zipcode" + type: "INTEGER" + mode: "NULLABLE" + - description: "Code and description identifying the APC.
APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs" + name: "apc" + type: "STRING" + mode: "REQUIRED" + - description: "The Hospital Referral Region (HRR) in which the provider is physically located" + name: "hospital_referral_region" + type: "STRING" + mode: "NULLABLE" + - description: "The number of services billed by the provider for outpatient hospital services" + name: "outpatient_services" + type: "INTEGER" + mode: "NULLABLE" + - description: "The provider's average estimated submitted charge for services covered by Medicare for the APC. These will vary from hospital to hospital because of differences in hospital charge structures" + name: "average_estimated_submitted_charges" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average of total payments to the provider for the APC" + name: "average_total_payments" + type: "FLOAT" + mode: "NULLABLE" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_outpatient_2013_to_bq" + + # The GCS bucket where the CSV file is located. + bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/outpatient_charges_2013/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.outpatient_charges_2013" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns.
+ # types: "INTEGER", "FLOAT", "TIMESTAMP", "STRING" + schema_fields: + - description: "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services" + name: "provider_id" + type: "STRING" + mode: "REQUIRED" + - description: "The name of the provider" + name: "provider_name" + type: "STRING" + mode: "NULLABLE" + - description: "The street address in which the provider is physically located" + name: "provider_street_address" + type: "STRING" + mode: "NULLABLE" + - description: "The city in which the provider is physically located" + name: "provider_city" + type: "STRING" + mode: "NULLABLE" + - description: "The state in which the provider is physically located" + name: "provider_state" + type: "STRING" + mode: "NULLABLE" + - description: "The zip code in which the provider is physically located" + name: "provider_zipcode" + type: "INTEGER" + mode: "NULLABLE" + - description: "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs" + name: "apc" + type: "STRING" + mode: "REQUIRED" + - description: "The Hospital Referral Region (HRR) in which the provider is physically located" + name: "hospital_referral_region" + type: "STRING" + mode: "NULLABLE" + - description: "The number of services billed by the provider for outpatient hospital services" + name: "outpatient_services" + type: "INTEGER" + mode: "NULLABLE" + - description: "The provider's average estimated submitted charge for services covered by Medicare for the APC.
These will vary from hospital to hospital because of differences in hospital charge structures" + name: "average_estimated_submitted_charges" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average of total payments to the provider for the APC" + name: "average_total_payments" + type: "FLOAT" + mode: "NULLABLE" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_outpatient_2014_to_bq" + + # The GCS bucket where the CSV file is located. + bucket: "{{ var.value.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/cms_medicare/outpatient_charges_2014/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "cms_medicare.outpatient_charges_2014" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns.
+ # types: "INTEGER", "FLOAT", "TIMESTAMP", "STRING" + schema_fields: + - description: "The CMS Certification Number (CCN) of the provider billing for outpatient hospital services" + name: "provider_id" + type: "STRING" + mode: "REQUIRED" + - description: "The name of the provider" + name: "provider_name" + type: "STRING" + mode: "NULLABLE" + - description: "The street address in which the provider is physically located" + name: "provider_street_address" + type: "STRING" + mode: "NULLABLE" + - description: "The city in which the provider is physically located" + name: "provider_city" + type: "STRING" + mode: "NULLABLE" + - description: "The state in which the provider is physically located" + name: "provider_state" + type: "STRING" + mode: "NULLABLE" + - description: "The zip code in which the provider is physically located" + name: "provider_zipcode" + type: "INTEGER" + mode: "NULLABLE" + - description: "Code and description identifying the APC. APCs are a classification system where individual services (Healthcare Common Procedure Coding System [HCPCS] codes) are assigned based on similar clinical characteristics and similar costs" + name: "apc" + type: "STRING" + mode: "REQUIRED" + - description: "The Hospital Referral Region (HRR) in which the provider is physically located" + name: "hospital_referral_region" + type: "STRING" + mode: "NULLABLE" + - description: "The number of services billed by the provider for outpatient hospital services" + name: "outpatient_services" + type: "INTEGER" + mode: "NULLABLE" + - description: "The provider's average estimated submitted charge for services covered by Medicare for the APC.
These will vary from hospital to hospital because of differences in hospital charge structures" + name: "average_estimated_submitted_charges" + type: "FLOAT" + mode: "NULLABLE" + - description: "The average of total payments to the provider for the APC" + name: "average_total_payments" + type: "FLOAT" + mode: "NULLABLE" + + graph_paths: + - "outpatient_2011_transform_csv >> load_outpatient_2011_to_bq" + - "outpatient_2012_transform_csv >> load_outpatient_2012_to_bq" + - "outpatient_2013_transform_csv >> load_outpatient_2013_to_bq" + - "outpatient_2014_transform_csv >> load_outpatient_2014_to_bq"