diff --git a/datasets/google_political_ads/_images/run_csv_transform_kub/Dockerfile b/datasets/google_political_ads/_images/run_csv_transform_kub/Dockerfile new file mode 100644 index 000000000..85af90570 --- /dev/null +++ b/datasets/google_political_ads/_images/run_csv_transform_kub/Dockerfile @@ -0,0 +1,38 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# The base image for this build +# FROM gcr.io/google.com/cloudsdktool/cloud-sdk:slim +FROM python:3.8 + +# Allow statements and log messages to appear in Cloud logs +ENV PYTHONUNBUFFERED True + +# Copy the requirements file into the image +COPY requirements.txt ./ + +# Install the packages specified in the requirements file +RUN python3 -m pip install --no-cache-dir -r requirements.txt + +# The WORKDIR instruction sets the working directory for any RUN, CMD, +# ENTRYPOINT, COPY and ADD instructions that follow it in the Dockerfile. +# If the WORKDIR doesn’t exist, it will be created even if it’s not used in +# any subsequent Dockerfile instruction +WORKDIR /custom + +# Copy the specific data processing script/s in the image under /custom/* +COPY ./csv_transform.py . 
+ +# Command to run the data processing script when the container is run +CMD ["python3", "csv_transform.py"] diff --git a/datasets/google_political_ads/_images/run_csv_transform_kub/Pipfile b/datasets/google_political_ads/_images/run_csv_transform_kub/Pipfile new file mode 100644 index 000000000..37f9797d3 --- /dev/null +++ b/datasets/google_political_ads/_images/run_csv_transform_kub/Pipfile @@ -0,0 +1,13 @@ +[[source]] +url = "https://pypi.org/simple" +verify_ssl = true +name = "pypi" + +[packages] +requests = "*" +vaex = "*" + +[dev-packages] + +[requires] +python_version = "3.9" diff --git a/datasets/google_political_ads/_images/run_csv_transform_kub/csv_transform.py b/datasets/google_political_ads/_images/run_csv_transform_kub/csv_transform.py new file mode 100644 index 000000000..d66a13317 --- /dev/null +++ b/datasets/google_political_ads/_images/run_csv_transform_kub/csv_transform.py @@ -0,0 +1,157 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
"""Download a zipped CSV bundle, transform one CSV family and upload it to GCS.

The script is driven entirely by environment variables (see the
``__main__`` guard at the bottom of the file).
"""

import datetime
import fnmatch
import json
import logging
import math
import os
import pathlib
import typing
from zipfile import ZipFile

import pandas as pd

# NOTE: ``requests`` and ``google.cloud.storage`` are imported inside the two
# functions that perform network I/O, so the pure transform helpers can be
# imported and unit-tested without those packages installed.

# Columns of the "creative_stats" pipeline that are rewritten as integer
# strings (blank when missing) before the CSV is saved.
SPEND_RANGE_MAX_COLUMNS = (
    "spend_range_max_usd",
    "spend_range_max_eur",
    "spend_range_max_inr",
    "spend_range_max_bgn",
    "spend_range_max_hrk",
    "spend_range_max_czk",
    "spend_range_max_dkk",
    "spend_range_max_huf",
    "spend_range_max_pln",
    "spend_range_max_ron",
    "spend_range_max_gbp",
    "spend_range_max_sek",
    "spend_range_max_nzd",
)

# Number of bytes written per chunk while streaming the download to disk.
DOWNLOAD_CHUNK_SIZE = 1024 * 1024

TIMESTAMP_FORMAT = "%Y-%m-%d %H:%M:%S"


def main(
    source_url: str,
    source_file: pathlib.Path,
    source_csv_name: str,
    target_file: pathlib.Path,
    target_gcs_bucket: str,
    target_gcs_path: str,
    headers: typing.List[str],
    rename_mappings: dict,
    pipeline_name: str,
) -> None:
    """Run the download -> transform -> save -> upload pipeline.

    Args:
        source_url: URL of the zip archive to download.
        source_file: Local path the archive is written to.
        source_csv_name: ``fnmatch`` pattern selecting archive members to load.
        target_file: Local path of the transformed CSV.
        target_gcs_bucket: Destination bucket name.
        target_gcs_path: Destination object name inside the bucket.
        headers: Output column names, in output order.
        rename_mappings: Mapping of source column name -> output column name.
        pipeline_name: Name used in log messages; ``"creative_stats"`` also
            enables the integer conversion of the spend-range columns.

    Raises:
        Any exception raised by a step propagates, so that a failed download
        or save can never be followed by an upload of stale/missing data.
    """
    logging.info(
        f"google political ads {pipeline_name} process started at "
        + datetime.datetime.now().strftime(TIMESTAMP_FORMAT)
    )

    logging.info("creating 'files' folder")
    pathlib.Path("./files").mkdir(parents=True, exist_ok=True)
    # The configured paths are not required to live under ./files, so make
    # sure their parent directories exist as well.
    for local_path in (source_file, target_file):
        pathlib.Path(local_path).parent.mkdir(parents=True, exist_ok=True)

    logging.info(f"Downloading file {source_url}")
    download_file(source_url, source_file)

    logging.info(f"Opening file {source_file}")
    df = read_csv_file(source_file, source_csv_name)

    logging.info(f"Transforming.. {source_file}")

    logging.info(f"Transform: Rename columns for {pipeline_name}..")
    rename_headers(df, rename_mappings)

    if pipeline_name == "creative_stats":
        logging.info(f"Transform: converting to integer for {pipeline_name}..")
        for column in SPEND_RANGE_MAX_COLUMNS:
            df[column] = df[column].apply(convert_to_int)

    logging.info(f"Transform: Reordering headers for {pipeline_name}.. ")
    df = df[headers]

    logging.info(f"Saving to output file.. {target_file}")
    try:
        save_to_new_file(df, file_path=str(target_file))
    except Exception as e:
        # Log for context, then re-raise: uploading after a failed save would
        # publish a missing or stale file.
        logging.error(f"Error saving output file: {e}.")
        raise

    logging.info(
        f"Uploading output file to.. gs://{target_gcs_bucket}/{target_gcs_path}"
    )
    upload_file_to_gcs(target_file, target_gcs_bucket, target_gcs_path)

    logging.info(
        f"Google Political Ads {pipeline_name} process completed at "
        + datetime.datetime.now().strftime(TIMESTAMP_FORMAT)
    )


def save_to_new_file(df: pd.DataFrame, file_path: str) -> None:
    """Write *df* to *file_path* as CSV without the index column."""
    df.to_csv(file_path, index=False)


def upload_file_to_gcs(file_path: pathlib.Path, gcs_bucket: str, gcs_path: str) -> None:
    """Upload the local file *file_path* to ``gs://<gcs_bucket>/<gcs_path>``."""
    from google.cloud import storage

    storage_client = storage.Client()
    bucket = storage_client.bucket(gcs_bucket)
    blob = bucket.blob(gcs_path)
    # Pass a plain string: the parameter is documented as a filename.
    blob.upload_from_filename(str(file_path))


def download_file(source_url: str, source_file: pathlib.Path) -> None:
    """Stream *source_url* to *source_file*.

    Raises:
        requests.HTTPError: if the server does not answer with HTTP 200.
    """
    import requests

    logging.info(f"Downloading {source_url} into {source_file}")
    with requests.get(source_url, stream=True) as response:
        if response.status_code != 200:
            logging.error(f"Couldn't download {source_url}: {response.text}")
            # Raises for 4xx/5xx; the explicit raise below covers any other
            # non-200 status, which the pipeline cannot use either.
            response.raise_for_status()
            raise requests.HTTPError(
                f"Unexpected status {response.status_code} for {source_url}",
                response=response,
            )
        with open(source_file, "wb") as f:
            for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_SIZE):
                f.write(chunk)


def read_csv_file(source_file: pathlib.Path, source_csv_name: str) -> pd.DataFrame:
    """Load every archive member matching *source_csv_name* into one frame.

    Args:
        source_file: Path of the zip archive.
        source_csv_name: ``fnmatch`` pattern matched against member names.

    Returns:
        The matching CSVs concatenated in archive order, with a fresh
        0..n-1 index.

    Raises:
        ValueError: if no member matches the pattern.
    """
    with ZipFile(source_file) as archive:
        csv_files = fnmatch.filter(archive.namelist(), source_csv_name)
        if not csv_files:
            raise ValueError(
                f"No file matching {source_csv_name!r} found in {source_file}"
            )
        frames = []
        for file_name in csv_files:
            # Close each member handle as soon as it has been parsed.
            with archive.open(file_name) as member:
                frames.append(pd.read_csv(member))
    return pd.concat(frames, ignore_index=True)


def rename_headers(df: pd.DataFrame, rename_mappings: dict) -> None:
    """Rename the columns of *df* in place according to *rename_mappings*."""
    df.rename(columns=rename_mappings, inplace=True)


def convert_to_int(input: typing.Any) -> str:
    """Return *input* rounded to an integer and rendered as a string.

    Missing values (``None``, ``pd.NA``, NaN, empty/blank strings) become
    ``""``. Numeric strings are accepted as well as numbers.

    The parameter keeps its original name for backward compatibility even
    though it shadows the ``input`` builtin.

    Raises:
        ValueError: if *input* is a non-blank string that is not a number.
    """
    if input is None or input is pd.NA:
        return ""
    if isinstance(input, str):
        text = input.strip()
        if not text:
            return ""
        input = float(text)
    if math.isnan(input):
        return ""
    return str(int(round(input, 0)))


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)

    main(
        source_url=os.environ["SOURCE_URL"],
        source_file=pathlib.Path(os.environ["SOURCE_FILE"]).expanduser(),
        source_csv_name=os.environ["FILE_NAME"],
        target_file=pathlib.Path(os.environ["TARGET_FILE"]).expanduser(),
        target_gcs_bucket=os.environ["TARGET_GCS_BUCKET"],
        target_gcs_path=os.environ["TARGET_GCS_PATH"],
        headers=json.loads(os.environ["CSV_HEADERS"]),
        rename_mappings=json.loads(os.environ["RENAME_MAPPINGS"]),
        pipeline_name=os.environ["PIPELINE_NAME"],
    )
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +resource "google_bigquery_table" "advertiser_declared_stats" { + project = var.project_id + dataset_id = "google_political_ads" + table_id = "advertiser_declared_stats" + + description = "advertiser_declared_stats dataset" + + + + + depends_on = [ + google_bigquery_dataset.google_political_ads + ] +} + +output "bigquery_table-advertiser_declared_stats-table_id" { + value = google_bigquery_table.advertiser_declared_stats.table_id +} + +output "bigquery_table-advertiser_declared_stats-id" { + value = google_bigquery_table.advertiser_declared_stats.id +} diff --git a/datasets/google_political_ads/_terraform/advertiser_stats_pipeline.tf b/datasets/google_political_ads/_terraform/advertiser_stats_pipeline.tf new file mode 100644 index 000000000..c008e8444 --- /dev/null +++ b/datasets/google_political_ads/_terraform/advertiser_stats_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "advertiser_stats" { + project = var.project_id + dataset_id = "google_political_ads" + table_id = "advertiser_stats" + + description = "advertiser_stats dataset" + + + + + depends_on = [ + google_bigquery_dataset.google_political_ads + ] +} + +output "bigquery_table-advertiser_stats-table_id" { + value = google_bigquery_table.advertiser_stats.table_id +} + +output "bigquery_table-advertiser_stats-id" { + value = google_bigquery_table.advertiser_stats.id +} diff --git a/datasets/google_political_ads/_terraform/advertiser_weekly_spend_pipeline.tf b/datasets/google_political_ads/_terraform/advertiser_weekly_spend_pipeline.tf new file mode 100644 index 000000000..75a79d7e4 --- /dev/null +++ b/datasets/google_political_ads/_terraform/advertiser_weekly_spend_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "advertiser_weekly_spend" { + project = var.project_id + dataset_id = "google_political_ads" + table_id = "advertiser_weekly_spend" + + description = "advertiser_weekly_spend dataset" + + + + + depends_on = [ + google_bigquery_dataset.google_political_ads + ] +} + +output "bigquery_table-advertiser_weekly_spend-table_id" { + value = google_bigquery_table.advertiser_weekly_spend.table_id +} + +output "bigquery_table-advertiser_weekly_spend-id" { + value = google_bigquery_table.advertiser_weekly_spend.id +} diff --git a/datasets/google_political_ads/_terraform/campaign_targeting_pipeline.tf b/datasets/google_political_ads/_terraform/campaign_targeting_pipeline.tf new file mode 100644 index 000000000..737460099 --- /dev/null +++ b/datasets/google_political_ads/_terraform/campaign_targeting_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "campaign_targeting" { + project = var.project_id + dataset_id = "google_political_ads" + table_id = "campaign_targeting" + + description = "campaign_targeting dataset" + + + + + depends_on = [ + google_bigquery_dataset.google_political_ads + ] +} + +output "bigquery_table-campaign_targeting-table_id" { + value = google_bigquery_table.campaign_targeting.table_id +} + +output "bigquery_table-campaign_targeting-id" { + value = google_bigquery_table.campaign_targeting.id +} diff --git a/datasets/google_political_ads/_terraform/creative_stats_pipeline.tf b/datasets/google_political_ads/_terraform/creative_stats_pipeline.tf new file mode 100644 index 000000000..9b44ecf8a --- /dev/null +++ b/datasets/google_political_ads/_terraform/creative_stats_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "creative_stats" { + project = var.project_id + dataset_id = "google_political_ads" + table_id = "creative_stats" + + description = "creative_stats dataset" + + + + + depends_on = [ + google_bigquery_dataset.google_political_ads + ] +} + +output "bigquery_table-creative_stats-table_id" { + value = google_bigquery_table.creative_stats.table_id +} + +output "bigquery_table-creative_stats-id" { + value = google_bigquery_table.creative_stats.id +} diff --git a/datasets/google_political_ads/_terraform/geo_spend_pipeline.tf b/datasets/google_political_ads/_terraform/geo_spend_pipeline.tf new file mode 100644 index 000000000..05e113c77 --- /dev/null +++ b/datasets/google_political_ads/_terraform/geo_spend_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "geo_spend" { + project = var.project_id + dataset_id = "google_political_ads" + table_id = "geo_spend" + + description = "geo_spend dataset" + + + + + depends_on = [ + google_bigquery_dataset.google_political_ads + ] +} + +output "bigquery_table-geo_spend-table_id" { + value = google_bigquery_table.geo_spend.table_id +} + +output "bigquery_table-geo_spend-id" { + value = google_bigquery_table.geo_spend.id +} diff --git a/datasets/google_political_ads/_terraform/google_political_ads_dataset.tf b/datasets/google_political_ads/_terraform/google_political_ads_dataset.tf new file mode 100644 index 000000000..edac812af --- /dev/null +++ b/datasets/google_political_ads/_terraform/google_political_ads_dataset.tf @@ -0,0 +1,26 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_dataset" "google_political_ads" { + dataset_id = "google_political_ads" + project = var.project_id + description = "google_political_ads" +} + +output "bigquery_dataset-google_political_ads-dataset_id" { + value = google_bigquery_dataset.google_political_ads.dataset_id +} diff --git a/datasets/google_political_ads/_terraform/last_updated_pipeline.tf b/datasets/google_political_ads/_terraform/last_updated_pipeline.tf new file mode 100644 index 000000000..14c6eb3e4 --- /dev/null +++ b/datasets/google_political_ads/_terraform/last_updated_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "last_updated" { + project = var.project_id + dataset_id = "google_political_ads" + table_id = "last_updated" + + description = "last_updated dataset" + + + + + depends_on = [ + google_bigquery_dataset.google_political_ads + ] +} + +output "bigquery_table-last_updated-table_id" { + value = google_bigquery_table.last_updated.table_id +} + +output "bigquery_table-last_updated-id" { + value = google_bigquery_table.last_updated.id +} diff --git a/datasets/google_political_ads/_terraform/provider.tf b/datasets/google_political_ads/_terraform/provider.tf new file mode 100644 index 000000000..23ab87dcd --- /dev/null +++ b/datasets/google_political_ads/_terraform/provider.tf @@ -0,0 +1,28 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +provider "google" { + project = var.project_id + impersonate_service_account = var.impersonating_acct + region = var.region +} + +data "google_client_openid_userinfo" "me" {} + +output "impersonating-account" { + value = data.google_client_openid_userinfo.me.email +} diff --git a/datasets/google_political_ads/_terraform/top_keywords_history_pipeline.tf b/datasets/google_political_ads/_terraform/top_keywords_history_pipeline.tf new file mode 100644 index 000000000..c6c24192f --- /dev/null +++ b/datasets/google_political_ads/_terraform/top_keywords_history_pipeline.tf @@ -0,0 +1,39 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +resource "google_bigquery_table" "top_keywords_history" { + project = var.project_id + dataset_id = "google_political_ads" + table_id = "top_keywords_history" + + description = "top_keywords_history dataset" + + + + + depends_on = [ + google_bigquery_dataset.google_political_ads + ] +} + +output "bigquery_table-top_keywords_history-table_id" { + value = google_bigquery_table.top_keywords_history.table_id +} + +output "bigquery_table-top_keywords_history-id" { + value = google_bigquery_table.top_keywords_history.id +} diff --git a/datasets/google_political_ads/_terraform/variables.tf b/datasets/google_political_ads/_terraform/variables.tf new file mode 100644 index 000000000..c3ec7c506 --- /dev/null +++ b/datasets/google_political_ads/_terraform/variables.tf @@ -0,0 +1,23 @@ +/** + * Copyright 2021 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +variable "project_id" {} +variable "bucket_name_prefix" {} +variable "impersonating_acct" {} +variable "region" {} +variable "env" {} + diff --git a/datasets/google_political_ads/advertiser_declared_stats/advertiser_declared_stats_dag.py b/datasets/google_political_ads/advertiser_declared_stats/advertiser_declared_stats_dag.py new file mode 100644 index 000000000..ba00eaefe --- /dev/null +++ b/datasets/google_political_ads/advertiser_declared_stats/advertiser_declared_stats_dag.py @@ -0,0 +1,108 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from airflow import DAG
from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator

# Templated name of the bucket shared by both tasks.
COMPOSER_BUCKET = "{{ var.json.shared.composer_bucket }}"

# Object written by the transform task and read by the BigQuery load task.
OUTPUT_CSV_OBJECT = "data/google_political_ads/advertiser_declared_stats/data_output.csv"

# Environment handed to the CSV transform container.
TRANSFORM_ENV_VARS = {
    "SOURCE_URL": "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip",
    "SOURCE_FILE": "files/data.zip",
    "FILE_NAME": "google-political-ads-transparency-bundle/*advertiser-declared-stats*",
    "TARGET_FILE": "files/data_output.csv",
    "TARGET_GCS_BUCKET": COMPOSER_BUCKET,
    "TARGET_GCS_PATH": OUTPUT_CSV_OBJECT,
    "PIPELINE_NAME": "advertiser_declared_stats",
    "CSV_HEADERS": '["advertiser_id","advertiser_declared_name","advertiser_declared_regulatory_id","advertiser_declared_scope","advertiser_declared_promoter_name","advertiser_declared_promoter_address"]',
    "RENAME_MAPPINGS": '{"Advertiser_ID" : "advertiser_id","Advertiser_Declared_Name" : "advertiser_declared_name","Advertiser_Declared_Regulatory_ID" : "advertiser_declared_regulatory_id","Advertiser_Declared_Scope" : "advertiser_declared_scope","Advertiser_Declared_Promoter_Name" : "advertiser_declared_promoter_name","Advertiser_Declared_Promoter_Address" : "advertiser_declared_promoter_address"}',
}


def _nullable_string_field(name, description):
    """Build one schema entry for a nullable string column."""
    return {
        "name": name,
        "type": "string",
        "description": description,
        "mode": "nullable",
    }


# Destination table schema, in CSV column order.
TABLE_SCHEMA = [
    _nullable_string_field(
        "advertiser_id",
        "ID of the advertiser who purchased the ad.",
    ),
    _nullable_string_field(
        "advertiser_declared_name",
        "The advertiser’s committee declared name.",
    ),
    _nullable_string_field(
        "advertiser_declared_regulatory_id",
        "Committee declared identification number.",
    ),
    _nullable_string_field(
        "advertiser_declared_scope",
        "Committee-provided information about the candidate and office or ballot proposition and jurisdiction to which the advertisement refers which is separate from our verification process.",
    ),
    _nullable_string_field(
        "advertiser_declared_promoter_name",
        "The New Zealand advertiser’s declared Promoter Statement name.",
    ),
    _nullable_string_field(
        "advertiser_declared_promoter_address",
        "The New Zealand advertiser’s declared Promoter Statement address.",
    ),
]

default_args = dict(
    owner="Google",
    depends_on_past=False,
    start_date="2021-03-01",
)


with DAG(
    dag_id="google_political_ads.advertiser_declared_stats",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Step 1: run the CSV transform inside a Kubernetes pod.
    advertiser_declared_stats_transform_csv = (
        kubernetes_pod_operator.KubernetesPodOperator(
            task_id="advertiser_declared_stats_transform_csv",
            startup_timeout_seconds=600,
            name="advertiser_declared_stats",
            namespace="default",
            image_pull_policy="Always",
            image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}",
            env_vars=TRANSFORM_ENV_VARS,
            resources={"request_memory": "2G", "request_cpu": "1"},
        )
    )

    # Step 2: load the transformed CSV into the BigQuery table, replacing
    # its previous contents.
    load_advertiser_declared_stats_to_bq = (
        gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
            task_id="load_advertiser_declared_stats_to_bq",
            bucket=COMPOSER_BUCKET,
            source_objects=[OUTPUT_CSV_OBJECT],
            source_format="CSV",
            destination_project_dataset_table="google_political_ads.advertiser_declared_stats",
            skip_leading_rows=1,
            write_disposition="WRITE_TRUNCATE",
            schema_fields=TABLE_SCHEMA,
        )
    )

    # The load may only start once the transform has finished.
    advertiser_declared_stats_transform_csv >> load_advertiser_declared_stats_to_bq
@@ -0,0 +1,137 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: advertiser_declared_stats + + # Description of the table + description: "Certain California and New Zealand advertisers are required to submit additional data about themselves. The advertiser is responsible for the accuracy of this information, which Google has not confirmed. For California, this information is provided from our express notification process required for certain California advertisers, which is separate from our verification process. For New Zealand, this information is provided during our verification process." + +dag: + airflow_version: 1 + initialize: + dag_id: advertiser_declared_stats + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: "2021-03-01" + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "advertiser_declared_stats_transform_csv" + + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. 
This will be used (plus a random suffix) to generate a pod id + name: "advertiser_declared_stats" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip" + SOURCE_FILE: "files/data.zip" + FILE_NAME: "google-political-ads-transparency-bundle/*advertiser-declared-stats*" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/google_political_ads/advertiser_declared_stats/data_output.csv" + PIPELINE_NAME: "advertiser_declared_stats" + CSV_HEADERS: >- + ["advertiser_id","advertiser_declared_name","advertiser_declared_regulatory_id","advertiser_declared_scope","advertiser_declared_promoter_name","advertiser_declared_promoter_address"] + RENAME_MAPPINGS: >- + {"Advertiser_ID" : "advertiser_id","Advertiser_Declared_Name" : "advertiser_declared_name","Advertiser_Declared_Regulatory_ID" : "advertiser_declared_regulatory_id","Advertiser_Declared_Scope" : "advertiser_declared_scope","Advertiser_Declared_Promoter_Name" : "advertiser_declared_promoter_name","Advertiser_Declared_Promoter_Address" : "advertiser_declared_promoter_address"} + + + # Set resource limits for the pod here.
For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_advertiser_declared_stats_to_bq" + + # The GCS bucket where the CSV file is located. + bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/google_political_ads/advertiser_declared_stats/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "google_political_ads.advertiser_declared_stats" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + + schema_fields: + - name: "advertiser_id" + type: "string" + description: "ID of the advertiser who purchased the ad." + mode: "nullable" + - name: "advertiser_declared_name" + type: "string" + description: "The advertiser’s committee declared name." + mode: "nullable" + - name: "advertiser_declared_regulatory_id" + type: "string" + description: "Committee declared identification number." + mode: "nullable" + - name: "advertiser_declared_scope" + type: "string" + description: "Committee-provided information about the candidate and office or ballot proposition and jurisdiction to which the advertisement refers which is separate from our verification process."
+ mode: "nullable" + - name: "advertiser_declared_promoter_name" + type: "string" + description: "The New Zealand advertiser’s declared Promoter Statement name." + mode: "nullable" + - name: "advertiser_declared_promoter_address" + type: "string" + description: "The New Zealand advertiser’s declared Promoter Statement address." + mode: "nullable" + + + graph_paths: + - "advertiser_declared_stats_transform_csv >> load_advertiser_declared_stats_to_bq" diff --git a/datasets/google_political_ads/advertiser_stats/advertiser_stats_dag.py b/datasets/google_political_ads/advertiser_stats/advertiser_stats_dag.py new file mode 100644 index 000000000..ef7919e52 --- /dev/null +++ b/datasets/google_political_ads/advertiser_stats/advertiser_stats_dag.py @@ -0,0 +1,184 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="google_political_ads.advertiser_stats", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + advertiser_stats_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="advertiser_stats_transform_csv", + startup_timeout_seconds=600, + name="advertiser_stats", + namespace="default", + image_pull_policy="Always", + image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip", + "SOURCE_FILE": "files/data.zip", + "FILE_NAME": "google-political-ads-transparency-bundle/google-political-ads-advertiser-stats.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/google_political_ads/advertiser_stats/data_output.csv", + "PIPELINE_NAME": "advertiser_stats", + "CSV_HEADERS": '["advertiser_id","advertiser_name","public_ids_list","regions","elections","total_creatives","spend_usd","spend_eur","spend_inr","spend_bgn","spend_hrk","spend_czk","spend_dkk","spend_huf","spend_pln","spend_ron","spend_sek","spend_gbp","spend_nzd"]', + "RENAME_MAPPINGS": '{"Advertiser_ID": "advertiser_id","Advertiser_Name": "advertiser_name","Public_IDs_List": "public_ids_list","Regions": "regions","Elections": "elections","Total_Creatives": "total_creatives","Spend_USD": "spend_usd","Spend_EUR": "spend_eur","Spend_INR": "spend_inr","Spend_BGN": "spend_bgn","Spend_HRK": "spend_hrk","Spend_CZK": "spend_czk","Spend_DKK": "spend_dkk","Spend_HUF": "spend_huf","Spend_PLN": "spend_pln","Spend_RON": 
"spend_ron","Spend_SEK": "spend_sek","Spend_GBP": "spend_gbp","Spend_NZD": "spend_nzd"}', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_advertiser_stats_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_advertiser_stats_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=["data/google_political_ads/advertiser_stats/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="google_political_ads.advertiser_stats", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "advertiser_id", + "type": "string", + "description": "Unique ID for an advertiser verified to run election ads on Google Ads Services.", + "mode": "nullable", + }, + { + "name": "advertiser_name", + "type": "string", + "description": "Name of advertiser.", + "mode": "nullable", + }, + { + "name": "public_ids_list", + "type": "string", + "description": "List of public IDs used to identify the advertiser if available.", + "mode": "nullable", + }, + { + "name": "regions", + "type": "string", + "description": "The list of regions where the ads of this advertiser were served", + "mode": "nullable", + }, + { + "name": "elections", + "type": "string", + "description": "The list of elections that this advertiser participated in based on the regions.", + "mode": "nullable", + }, + { + "name": "total_creatives", + "type": "integer", + "description": "Total number of election ads the advertiser ran with at least one impression.", + "mode": "nullable", + }, + { + "name": "spend_usd", + "type": "integer", + "description": "Total amount in USD spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_eur", + "type": "integer", + "description": "Total amount in EUR spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_inr", + "type": "integer", + "description": "Total 
amount in INR spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_bgn", + "type": "integer", + "description": "Total amount in BGN spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_hrk", + "type": "integer", + "description": "Total amount in HRK spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_czk", + "type": "integer", + "description": "Total amount in CZK spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_dkk", + "type": "integer", + "description": "Total amount in DKK spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_huf", + "type": "integer", + "description": "Total amount in HUF spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_pln", + "type": "integer", + "description": "Total amount in PLN spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_ron", + "type": "integer", + "description": "Total amount in RON spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_sek", + "type": "integer", + "description": "Total amount in SEK spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_gbp", + "type": "integer", + "description": "Total amount in GBP spent on election ads by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_nzd", + "type": "integer", + "description": "Total amount in NZD spent on election ads by the advertiser.", + "mode": "nullable", + }, + ], + ) + + advertiser_stats_transform_csv >> load_advertiser_stats_to_bq diff --git a/datasets/google_political_ads/advertiser_stats/pipeline.yaml b/datasets/google_political_ads/advertiser_stats/pipeline.yaml new file mode 100644 index 000000000..1fa32ebeb --- /dev/null +++ b/datasets/google_political_ads/advertiser_stats/pipeline.yaml @@ -0,0 +1,189 
@@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: advertiser_stats + + # Description of the table + description: "This table contains the information about advertisers who have run an election ad on Google Ads Services with at least one impression. The table's primary key is advertiser_id. This table relates to the others in this dataset, with the following connections between columns: advertiser_id is referenced from: advertiser_weekly_spend.advertiser_id campaign_targeting.advertiser_id creative_stats.advertiser_id advertiser_name is referenced from: advertiser_weekly_spend.advertiser_name campaign_targeting.advertiser_name creative_stats.advertiser_name" + +dag: + airflow_version: 1 + initialize: + dag_id: advertiser_stats + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "advertiser_stats_transform_csv" + + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run.
This will be used (plus a random suffix) to generate a pod id + name: "advertiser_stats" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip" + SOURCE_FILE: "files/data.zip" + FILE_NAME: "google-political-ads-transparency-bundle/google-political-ads-advertiser-stats.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/google_political_ads/advertiser_stats/data_output.csv" + PIPELINE_NAME: "advertiser_stats" + CSV_HEADERS: >- + ["advertiser_id","advertiser_name","public_ids_list","regions","elections","total_creatives","spend_usd","spend_eur","spend_inr","spend_bgn","spend_hrk","spend_czk","spend_dkk","spend_huf","spend_pln","spend_ron","spend_sek","spend_gbp","spend_nzd"] + RENAME_MAPPINGS: >- + {"Advertiser_ID": "advertiser_id","Advertiser_Name": "advertiser_name","Public_IDs_List": "public_ids_list","Regions": "regions","Elections": "elections","Total_Creatives": "total_creatives","Spend_USD": "spend_usd","Spend_EUR": "spend_eur","Spend_INR": "spend_inr","Spend_BGN": "spend_bgn","Spend_HRK": "spend_hrk","Spend_CZK": "spend_czk","Spend_DKK": "spend_dkk","Spend_HUF":
"spend_huf","Spend_PLN": "spend_pln","Spend_RON": "spend_ron","Spend_SEK": "spend_sek","Spend_GBP": "spend_gbp","Spend_NZD": "spend_nzd"} + + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_advertiser_stats_to_bq" + + # The GCS bucket where the CSV file is located. + bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/google_political_ads/advertiser_stats/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "google_political_ads.advertiser_stats" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + + schema_fields: + - name: "advertiser_id" + type: "string" + description: "Unique ID for an advertiser verified to run election ads on Google Ads Services." + mode: "nullable" + - name: "advertiser_name" + type: "string" + description: "Name of advertiser." + mode: "nullable" + - name: "public_ids_list" + type: "string" + description: "List of public IDs used to identify the advertiser if available."
+ mode: "nullable" + - name: "regions" + type: "string" + description: "The list of regions where the ads of this advertiser were served" + mode: "nullable" + - name: "elections" + type: "string" + description: "The list of elections that this advertiser participated in based on the regions." + mode: "nullable" + - name: "total_creatives" + type: "integer" + description: "Total number of election ads the advertiser ran with at least one impression." + mode: "nullable" + - name: "spend_usd" + type: "integer" + description: "Total amount in USD spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_eur" + type: "integer" + description: "Total amount in EUR spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_inr" + type: "integer" + description: "Total amount in INR spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_bgn" + type: "integer" + description: "Total amount in BGN spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_hrk" + type: "integer" + description: "Total amount in HRK spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_czk" + type: "integer" + description: "Total amount in CZK spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_dkk" + type: "integer" + description: "Total amount in DKK spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_huf" + type: "integer" + description: "Total amount in HUF spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_pln" + type: "integer" + description: "Total amount in PLN spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_ron" + type: "integer" + description: "Total amount in RON spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_sek" + type: "integer" + description: "Total amount in SEK spent on election ads by the advertiser." 
+ mode: "nullable" + - name: "spend_gbp" + type: "integer" + description: "Total amount in GBP spent on election ads by the advertiser." + mode: "nullable" + - name: "spend_nzd" + type: "integer" + description: "Total amount in NZD spent on election ads by the advertiser." + mode: "nullable" + + + graph_paths: + - "advertiser_stats_transform_csv >> load_advertiser_stats_to_bq" diff --git a/datasets/google_political_ads/advertiser_weekly_spend/advertiser_weekly_spend_dag.py b/datasets/google_political_ads/advertiser_weekly_spend/advertiser_weekly_spend_dag.py new file mode 100644 index 000000000..a2fce3b0c --- /dev/null +++ b/datasets/google_political_ads/advertiser_weekly_spend/advertiser_weekly_spend_dag.py @@ -0,0 +1,174 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="google_political_ads.advertiser_weekly_spend", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + advertiser_weekly_spend_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="advertiser_weekly_spend_transform_csv", + startup_timeout_seconds=600, + name="advertiser_weekly_spend", + namespace="default", + image_pull_policy="Always", + image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip", + "SOURCE_FILE": "files/data.zip", + "FILE_NAME": "google-political-ads-transparency-bundle/google-political-ads-advertiser-weekly-spend.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/google_political_ads/advertiser_weekly_spend/data_output.csv", + "PIPELINE_NAME": "advertiser_weekly_spend", + "CSV_HEADERS": '["advertiser_id","advertiser_name","election_cycle","week_start_date","spend_usd","spend_eur","spend_inr","spend_bgn","spend_hrk","spend_czk","spend_dkk","spend_huf","spend_pln","spend_ron","spend_sek","spend_gbp","spend_nzd"]', + "RENAME_MAPPINGS": '{"Advertiser_ID": "advertiser_id","Advertiser_Name": "advertiser_name","Election_Cycle": "election_cycle","Week_Start_Date": "week_start_date","Spend_USD": "spend_usd","Spend_EUR": "spend_eur","Spend_INR": "spend_inr","Spend_BGN": "spend_bgn","Spend_HRK": "spend_hrk","Spend_CZK": "spend_czk","Spend_DKK": "spend_dkk","Spend_HUF": "spend_huf","Spend_PLN": "spend_pln","Spend_RON": "spend_ron","Spend_SEK": 
"spend_sek","Spend_GBP": "spend_gbp","Spend_NZD": "spend_nzd"}', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_advertiser_weekly_spend_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_advertiser_weekly_spend_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=[ + "data/google_political_ads/advertiser_weekly_spend/data_output.csv" + ], + source_format="CSV", + destination_project_dataset_table="google_political_ads.advertiser_weekly_spend", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "advertiser_id", + "type": "string", + "description": "Unique ID for an advertiser verified to run election ads on Google Ads Services.", + "mode": "nullable", + }, + { + "name": "advertiser_name", + "type": "string", + "description": "Name of advertiser.", + "mode": "nullable", + }, + { + "name": "election_cycle", + "type": "string", + "description": "[DEPRECATED] This field is deprecated in favor of the Elections column in advertiser_stats table. 
It will be deleted some time after July 2019.", + "mode": "nullable", + }, + { + "name": "week_start_date", + "type": "date", + "description": "The start date for the week where spending occurred.", + "mode": "nullable", + }, + { + "name": "spend_usd", + "type": "integer", + "description": "The amount in USD spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_eur", + "type": "integer", + "description": "The amount in EUR spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_inr", + "type": "integer", + "description": "The amount in INR spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_bgn", + "type": "integer", + "description": "The amount in BGN spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_hrk", + "type": "integer", + "description": "The amount in HRK spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_czk", + "type": "integer", + "description": "The amount in CZK spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_dkk", + "type": "integer", + "description": "The amount in DKK spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_huf", + "type": "integer", + "description": "The amount in HUF spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_pln", + "type": "integer", + "description": "The amount in PLN spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_ron", + "type": "integer", + "description": "The amount in RON spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": 
"spend_sek", + "type": "integer", + "description": "The amount in SEK spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_gbp", + "type": "integer", + "description": "The amount in GBP spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + { + "name": "spend_nzd", + "type": "integer", + "description": "The amount in NZD spent on election ads during the given week by the advertiser.", + "mode": "nullable", + }, + ], + ) + + advertiser_weekly_spend_transform_csv >> load_advertiser_weekly_spend_to_bq diff --git a/datasets/google_political_ads/advertiser_weekly_spend/pipeline.yaml b/datasets/google_political_ads/advertiser_weekly_spend/pipeline.yaml new file mode 100644 index 000000000..6298293d6 --- /dev/null +++ b/datasets/google_political_ads/advertiser_weekly_spend/pipeline.yaml @@ -0,0 +1,180 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: advertiser_weekly_spend + + # Description of the table + description: "This table contains the information for how much an advertiser spent on political ads during a given week. 
The table's primary key is advertiser_id, election_cycle, week_start_date" + +dag: + airflow_version: 1 + initialize: + dag_id: advertiser_weekly_spend + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "advertiser_weekly_spend_transform_csv" + + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "advertiser_weekly_spend" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. 
+ env_vars: + SOURCE_URL: "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip" + SOURCE_FILE: "files/data.zip" + FILE_NAME: "google-political-ads-transparency-bundle/google-political-ads-advertiser-weekly-spend.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/google_political_ads/advertiser_weekly_spend/data_output.csv" + PIPELINE_NAME: "advertiser_weekly_spend" + CSV_HEADERS: >- + ["advertiser_id","advertiser_name","election_cycle","week_start_date","spend_usd","spend_eur","spend_inr","spend_bgn","spend_hrk","spend_czk","spend_dkk","spend_huf","spend_pln","spend_ron","spend_sek","spend_gbp","spend_nzd"] + RENAME_MAPPINGS: >- + {"Advertiser_ID": "advertiser_id","Advertiser_Name": "advertiser_name","Election_Cycle": "election_cycle","Week_Start_Date": "week_start_date","Spend_USD": "spend_usd","Spend_EUR": "spend_eur","Spend_INR": "spend_inr","Spend_BGN": "spend_bgn","Spend_HRK": "spend_hrk","Spend_CZK": "spend_czk","Spend_DKK": "spend_dkk","Spend_HUF": "spend_huf","Spend_PLN": "spend_pln","Spend_RON": "spend_ron","Spend_SEK": "spend_sek","Spend_GBP": "spend_gbp","Spend_NZD": "spend_nzd"} + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_advertiser_weekly_spend_to_bq" + + # The GCS bucket where the CSV file is located.
+ bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/google_political_ads/advertiser_weekly_spend/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "google_political_ads.advertiser_weekly_spend" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + + schema_fields: + - name: "advertiser_id" + type: "string" + description: "Unique ID for an advertiser verified to run election ads on Google Ads Services." + mode: "nullable" + - name: "advertiser_name" + type: "string" + description: "Name of advertiser." + mode: "nullable" + - name: "election_cycle" + type: "string" + description: "[DEPRECATED] This field is deprecated in favor of the Elections column in advertiser_stats table. It will be deleted some time after July 2019." + mode: "nullable" + - name: "week_start_date" + type: "date" + description: "The start date for the week where spending occurred." + mode: "nullable" + - name: "spend_usd" + type: "integer" + description: "The amount in USD spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_eur" + type: "integer" + description: "The amount in EUR spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_inr" + type: "integer" + description: "The amount in INR spent on election ads during the given week by the advertiser." 
+ mode: "nullable" + - name: "spend_bgn" + type: "integer" + description: "The amount in BGN spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_hrk" + type: "integer" + description: "The amount in HRK spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_czk" + type: "integer" + description: "The amount in CZK spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_dkk" + type: "integer" + description: "The amount in DKK spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_huf" + type: "integer" + description: "The amount in HUF spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_pln" + type: "integer" + description: "The amount in PLN spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_ron" + type: "integer" + description: "The amount in RON spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_sek" + type: "integer" + description: "The amount in SEK spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_gbp" + type: "integer" + description: "The amount in GBP spent on election ads during the given week by the advertiser." + mode: "nullable" + - name: "spend_nzd" + type: "integer" + description: "The amount in NZD spent on election ads during the given week by the advertiser." 
+ mode: "nullable" + + + graph_paths: + - "advertiser_weekly_spend_transform_csv >> load_advertiser_weekly_spend_to_bq" diff --git a/datasets/google_political_ads/campaign_targeting/campaign_targeting_dag.py b/datasets/google_political_ads/campaign_targeting/campaign_targeting_dag.py new file mode 100644 index 000000000..d4547f238 --- /dev/null +++ b/datasets/google_political_ads/campaign_targeting/campaign_targeting_dag.py @@ -0,0 +1,130 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="google_political_ads.campaign_targeting", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + campaign_targeting_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="campaign_targeting_transform_csv", + startup_timeout_seconds=600, + name="campaign_targeting", + namespace="default", + image_pull_policy="Always", + image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip", + "SOURCE_FILE": "files/data.zip", + "FILE_NAME": "google-political-ads-transparency-bundle/google-political-ads-campaign-targeting.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/google_political_ads/campaign_targeting/data_output.csv", + "PIPELINE_NAME": "campaign_targeting", + "CSV_HEADERS": '["campaign_id","age_targeting","gender_targeting","geo_targeting_included","geo_targeting_excluded","start_date","end_date","ads_list","advertiser_id","advertiser_name"]', + "RENAME_MAPPINGS": '{"Campaign_ID": "campaign_id","Age_Targeting": "age_targeting","Gender_Targeting": "gender_targeting","Geo_Targeting_Included": "geo_targeting_included","Geo_Targeting_Excluded": "geo_targeting_excluded","Start_Date": "start_date","End_Date": "end_date","Ads_List": "ads_list","Advertiser_ID": "advertiser_id","Advertiser_Name": "advertiser_name"}', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_campaign_targeting_to_bq = 
gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_campaign_targeting_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=["data/google_political_ads/campaign_targeting/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="google_political_ads.campaign_targeting", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "campaign_id", + "type": "string", + "description": "[DEPRECATED] Unique ID for a political ad campaign.", + "mode": "nullable", + }, + { + "name": "age_targeting", + "type": "string", + "description": "[DEPRECATED] Age ranges included in the campaign's targeting.", + "mode": "nullable", + }, + { + "name": "gender_targeting", + "type": "string", + "description": "[DEPRECATED] Genders included in the campaign's targeting", + "mode": "nullable", + }, + { + "name": "geo_targeting_included", + "type": "string", + "description": "[DEPRECATED] Geographic locations included in the campaign's targeting.", + "mode": "nullable", + }, + { + "name": "geo_targeting_excluded", + "type": "string", + "description": "[DEPRECATED] Geographic locations excluded from the campaign's targeting.", + "mode": "nullable", + }, + { + "name": "start_date", + "type": "date", + "description": "[DEPRECATED] Start date for the campaign.", + "mode": "nullable", + }, + { + "name": "end_date", + "type": "date", + "description": "[DEPRECATED] End date for the campaign.", + "mode": "nullable", + }, + { + "name": "ads_list", + "type": "string", + "description": "[DEPRECATED] List of Ad_IDs for the campaign.", + "mode": "nullable", + }, + { + "name": "advertiser_id", + "type": "string", + "description": "[DEPRECATED] ID of the advertiser who purchased the ad.", + "mode": "nullable", + }, + { + "name": "advertiser_name", + "type": "string", + "description": "[DEPRECATED] Name of advertiser.", + "mode": "nullable", + }, + ], + ) + + campaign_targeting_transform_csv >> 
load_campaign_targeting_to_bq diff --git a/datasets/google_political_ads/campaign_targeting/pipeline.yaml b/datasets/google_political_ads/campaign_targeting/pipeline.yaml new file mode 100644 index 000000000..ec7c2c0d1 --- /dev/null +++ b/datasets/google_political_ads/campaign_targeting/pipeline.yaml @@ -0,0 +1,152 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: campaign_targeting + + # Description of the table + description: "This table was deprecated and ad-level targeting information was made available in the `google_political_ads.creative_stats` BigQuery table, effective April 2020. This table contains the information related to ad campaigns run by advertisers." + +dag: + airflow_version: 1 + initialize: + dag_id: campaign_targeting + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "campaign_targeting_transform_csv" + + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. 
This will be used (plus a random suffix) to generate a pod id + name: "campaign_targeting" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to run. + env_vars: + SOURCE_URL: "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip" + SOURCE_FILE: "files/data.zip" + FILE_NAME: "google-political-ads-transparency-bundle/google-political-ads-campaign-targeting.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/google_political_ads/campaign_targeting/data_output.csv" + PIPELINE_NAME: "campaign_targeting" + CSV_HEADERS: >- + ["campaign_id","age_targeting","gender_targeting","geo_targeting_included","geo_targeting_excluded","start_date","end_date","ads_list","advertiser_id","advertiser_name"] + RENAME_MAPPINGS: >- + {"Campaign_ID": "campaign_id","Age_Targeting": "age_targeting","Gender_Targeting": "gender_targeting","Geo_Targeting_Included": "geo_targeting_included","Geo_Targeting_Excluded": "geo_targeting_excluded","Start_Date": "start_date","End_Date": "end_date","Ads_List": "ads_list","Advertiser_ID": "advertiser_id","Advertiser_Name": "advertiser_name"} + + # Set resource requests for the pod here.
For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_campaign_targeting_to_bq" + + # The GCS bucket where the CSV file is located. + bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/google_political_ads/campaign_targeting/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "google_political_ads.campaign_targeting" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + + schema_fields: + - name: "campaign_id" + type: "string" + description: "[DEPRECATED] Unique ID for a political ad campaign." + mode: "nullable" + - name: "age_targeting" + type: "string" + description: "[DEPRECATED] Age ranges included in the campaign's targeting." + mode: "nullable" + - name: "gender_targeting" + type: "string" + description: "[DEPRECATED] Genders included in the campaign's targeting" + mode: "nullable" + - name: "geo_targeting_included" + type: "string" + description: "[DEPRECATED] Geographic locations included in the campaign's targeting." + mode: "nullable" + - name: "geo_targeting_excluded" + type: "string" + description: "[DEPRECATED] Geographic locations excluded from the campaign's targeting."
+ mode: "nullable" + - name: "start_date" + type: "date" + description: "[DEPRECATED] Start date for the campaign." + mode: "nullable" + - name: "end_date" + type: "date" + description: "[DEPRECATED] End date for the campaign." + mode: "nullable" + - name: "ads_list" + type: "string" + description: "[DEPRECATED] List of Ad_IDs for the campaign." + mode: "nullable" + - name: "advertiser_id" + type: "string" + description: "[DEPRECATED] ID of the advertiser who purchased the ad." + mode: "nullable" + - name: "advertiser_name" + type: "string" + description: "[DEPRECATED] Name of advertiser." + mode: "nullable" + + + graph_paths: + - "campaign_targeting_transform_csv >> load_campaign_targeting_to_bq" diff --git a/datasets/google_political_ads/creative_stats/creative_stats_dag.py b/datasets/google_political_ads/creative_stats/creative_stats_dag.py new file mode 100644 index 000000000..ac7d9d9dd --- /dev/null +++ b/datasets/google_political_ads/creative_stats/creative_stats_dag.py @@ -0,0 +1,334 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="google_political_ads.creative_stats", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + creative_stats_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="creative_stats_transform_csv", + startup_timeout_seconds=600, + name="creative_stats", + namespace="default", + image_pull_policy="Always", + image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip", + "SOURCE_FILE": "files/data.zip", + "FILE_NAME": "google-political-ads-transparency-bundle/google-political-ads-creative-stats.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/google_political_ads/creative_stats/data_output.csv", + "PIPELINE_NAME": "creative_stats", + "CSV_HEADERS": 
'["ad_id","ad_url","ad_type","regions","advertiser_id","advertiser_name","ad_campaigns_list","date_range_start","date_range_end","num_of_days","impressions","spend_usd","first_served_timestamp","last_served_timestamp","age_targeting","gender_targeting","geo_targeting_included","geo_targeting_excluded","spend_range_min_usd","spend_range_max_usd","spend_range_min_eur","spend_range_max_eur","spend_range_min_inr","spend_range_max_inr","spend_range_min_bgn","spend_range_max_bgn","spend_range_min_hrk","spend_range_max_hrk","spend_range_min_czk","spend_range_max_czk","spend_range_min_dkk","spend_range_max_dkk","spend_range_min_huf","spend_range_max_huf","spend_range_min_pln","spend_range_max_pln","spend_range_min_ron","spend_range_max_ron","spend_range_min_sek","spend_range_max_sek","spend_range_min_gbp","spend_range_max_gbp","spend_range_min_nzd","spend_range_max_nzd"]', + "RENAME_MAPPINGS": '{"Ad_ID": "ad_id","Ad_URL": "ad_url","Ad_Type": "ad_type","Regions": "regions","Advertiser_ID": "advertiser_id","Advertiser_Name": "advertiser_name","Ad_Campaigns_List": "ad_campaigns_list","Date_Range_Start": "date_range_start","Date_Range_End": "date_range_end","Num_of_Days": "num_of_days","Impressions": "impressions","Spend_USD": "spend_usd","Spend_Range_Min_USD": "spend_range_min_usd","Spend_Range_Max_USD": "spend_range_max_usd","Spend_Range_Min_EUR": "spend_range_min_eur","Spend_Range_Max_EUR": "spend_range_max_eur","Spend_Range_Min_INR": "spend_range_min_inr","Spend_Range_Max_INR": "spend_range_max_inr","Spend_Range_Min_BGN": "spend_range_min_bgn","Spend_Range_Max_BGN": "spend_range_max_bgn","Spend_Range_Min_HRK": "spend_range_min_hrk","Spend_Range_Max_HRK": "spend_range_max_hrk","Spend_Range_Min_CZK": "spend_range_min_czk","Spend_Range_Max_CZK": "spend_range_max_czk","Spend_Range_Min_DKK": "spend_range_min_dkk","Spend_Range_Max_DKK": "spend_range_max_dkk","Spend_Range_Min_HUF": "spend_range_min_huf","Spend_Range_Max_HUF": "spend_range_max_huf","Spend_Range_Min_PLN": 
"spend_range_min_pln","Spend_Range_Max_PLN": "spend_range_max_pln","Spend_Range_Min_RON": "spend_range_min_ron","Spend_Range_Max_RON": "spend_range_max_ron","Spend_Range_Min_SEK": "spend_range_min_sek","Spend_Range_Max_SEK": "spend_range_max_sek","Spend_Range_Min_GBP": "spend_range_min_gbp","Spend_Range_Max_GBP": "spend_range_max_gbp","Spend_Range_Min_NZD": "spend_range_min_nzd","Spend_Range_Max_NZD": "spend_range_max_nzd","Age_Targeting": "age_targeting","Gender_Targeting": "gender_targeting","Geo_Targeting_Included": "geo_targeting_included","Geo_Targeting_Excluded": "geo_targeting_excluded","First_Served_Timestamp": "first_served_timestamp","Last_Served_Timestamp": "last_served_timestamp"}', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_creative_stats_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_creative_stats_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=["data/google_political_ads/creative_stats/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="google_political_ads.creative_stats", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "ad_id", + "type": "string", + "description": "Unique id for a specific election ad.", + "mode": "nullable", + }, + { + "name": "ad_url", + "type": "string", + "description": "URL to view the election ad in the election Advertising on Google report.", + "mode": "nullable", + }, + { + "name": "ad_type", + "type": "string", + "description": "The type of the ad. 
Can be TEXT VIDEO or IMAGE.", + "mode": "nullable", + }, + { + "name": "regions", + "type": "string", + "description": "The regions that this ad is verified for or were served in.", + "mode": "nullable", + }, + { + "name": "advertiser_id", + "type": "string", + "description": "ID of the advertiser who purchased the ad.", + "mode": "nullable", + }, + { + "name": "advertiser_name", + "type": "string", + "description": "Name of advertiser.", + "mode": "nullable", + }, + { + "name": "ad_campaigns_list", + "type": "string", + "description": "IDs of all election ad campaigns that included the ad.", + "mode": "nullable", + }, + { + "name": "date_range_start", + "type": "date", + "description": "First day a election ad ran and had an impression.", + "mode": "nullable", + }, + { + "name": "date_range_end", + "type": "date", + "description": "Most recent day a election ad ran and had an impression.", + "mode": "nullable", + }, + { + "name": "num_of_days", + "type": "integer", + "description": "Total number of days a election ad ran and had an impression.", + "mode": "nullable", + }, + { + "name": "impressions", + "type": "string", + "description": "Number of impressions for the election ad. 
Impressions are grouped into several buckets ≤ 10k 10k–100k 100k–1M 1M–10M > 10M.", + "mode": "nullable", + }, + { + "name": "spend_usd", + "type": "string", + "description": "[DEPRECATED] This field is deprecated in favor of specifying the lower and higher spend bucket bounds in separate Spend_Range_Min and Spend_Range_Max columns.", + "mode": "nullable", + }, + { + "name": "first_served_timestamp", + "type": "timestamp", + "description": "The timestamp of the earliest impression for this ad.", + "mode": "nullable", + }, + { + "name": "last_served_timestamp", + "type": "timestamp", + "description": "The timestamp of the most recent impression for this ad.", + "mode": "nullable", + }, + { + "name": "age_targeting", + "type": "string", + "description": "Age ranges included in the ad's targeting", + "mode": "nullable", + }, + { + "name": "gender_targeting", + "type": "string", + "description": "Genders included in the ad's targeting.", + "mode": "nullable", + }, + { + "name": "geo_targeting_included", + "type": "string", + "description": "Geographic locations included in the ad's targeting.", + "mode": "nullable", + }, + { + "name": "geo_targeting_excluded", + "type": "string", + "description": "Geographic locations excluded in the ad's targeting.", + "mode": "nullable", + }, + { + "name": "spend_range_min_usd", + "type": "integer", + "description": "Lower bound of the amount in USD spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_usd", + "type": "integer", + "description": "Upper bound of the amount in USD spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_eur", + "type": "integer", + "description": "Lower bound of the amount in EUR spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_eur", + "type": "integer", + "description": "Upper bound of the amount in EUR spent by the advertiser on the election ad.", + 
"mode": "nullable", + }, + { + "name": "spend_range_min_inr", + "type": "integer", + "description": "Lower bound of the amount in INR spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_inr", + "type": "integer", + "description": "Upper bound of the amount in INR spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_bgn", + "type": "integer", + "description": "Lower bound of the amount in BGN spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_bgn", + "type": "integer", + "description": "Upper bound of the amount in BGN spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_hrk", + "type": "integer", + "description": "Lower bound of the amount in HRK spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_hrk", + "type": "integer", + "description": "Upper bound of the amount in HRK spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_czk", + "type": "integer", + "description": "Lower bound of the amount in CZK spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_czk", + "type": "integer", + "description": "Upper bound of the amount in CZK spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_dkk", + "type": "integer", + "description": "Lower bound of the amount in DKK spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_dkk", + "type": "integer", + "description": "Upper bound of the amount in DKK spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_huf", + "type": "integer", + "description": "Lower bound of the amount in HUF spent by the advertiser on the election ad.", 
+ "mode": "nullable", + }, + { + "name": "spend_range_max_huf", + "type": "integer", + "description": "Upper bound of the amount in HUF spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_pln", + "type": "integer", + "description": "Lower bound of the amount in PLN spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_pln", + "type": "integer", + "description": "Upper bound of the amount in PLN spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_ron", + "type": "integer", + "description": "Lower bound of the amount in RON spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_ron", + "type": "integer", + "description": "Upper bound of the amount in RON spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_sek", + "type": "integer", + "description": "Lower bound of the amount in SEK spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_sek", + "type": "integer", + "description": "Upper bound of the amount in SEK spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_gbp", + "type": "integer", + "description": "Lower bound of the amount in GBP spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_gbp", + "type": "integer", + "description": "Upper bound of the amount in GBP spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_min_nzd", + "type": "integer", + "description": "Lower bound of the amount in NZD spent by the advertiser on the election ad.", + "mode": "nullable", + }, + { + "name": "spend_range_max_nzd", + "type": "integer", + "description": "Upper bound of the amount in NZD spent by the advertiser on the election 
ad.", + "mode": "nullable", + }, + ], + ) + + creative_stats_transform_csv >> load_creative_stats_to_bq diff --git a/datasets/google_political_ads/creative_stats/pipeline.yaml b/datasets/google_political_ads/creative_stats/pipeline.yaml new file mode 100644 index 000000000..58ae4cacc --- /dev/null +++ b/datasets/google_political_ads/creative_stats/pipeline.yaml @@ -0,0 +1,287 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: creative_stats + + # Description of the table + description: "This table contains the information for election ads that have appeared on Google Ads Services. Ad-level targeting data was added to this file in April 2020. ad_id is referenced from: campaign_targeting.ads_list Data that was previously available in the `google_political_ads.campaign_targeting` table has been deprecated and removed in favor of this table." 
+ +dag: + airflow_version: 1 + initialize: + dag_id: creative_stats + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "creative_stats_transform_csv" + + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "creative_stats" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. 
+ env_vars: + SOURCE_URL: "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip" + SOURCE_FILE: "files/data.zip" + FILE_NAME: "google-political-ads-transparency-bundle/google-political-ads-creative-stats.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/google_political_ads/creative_stats/data_output.csv" + PIPELINE_NAME: "creative_stats" + CSV_HEADERS: >- + ["ad_id","ad_url","ad_type","regions","advertiser_id","advertiser_name","ad_campaigns_list","date_range_start","date_range_end","num_of_days","impressions","spend_usd","first_served_timestamp","last_served_timestamp","age_targeting","gender_targeting","geo_targeting_included","geo_targeting_excluded","spend_range_min_usd","spend_range_max_usd","spend_range_min_eur","spend_range_max_eur","spend_range_min_inr","spend_range_max_inr","spend_range_min_bgn","spend_range_max_bgn","spend_range_min_hrk","spend_range_max_hrk","spend_range_min_czk","spend_range_max_czk","spend_range_min_dkk","spend_range_max_dkk","spend_range_min_huf","spend_range_max_huf","spend_range_min_pln","spend_range_max_pln","spend_range_min_ron","spend_range_max_ron","spend_range_min_sek","spend_range_max_sek","spend_range_min_gbp","spend_range_max_gbp","spend_range_min_nzd","spend_range_max_nzd"] + RENAME_MAPPINGS: >- + {"Ad_ID": "ad_id","Ad_URL": "ad_url","Ad_Type": "ad_type","Regions": "regions","Advertiser_ID": "advertiser_id","Advertiser_Name": "advertiser_name","Ad_Campaigns_List": "ad_campaigns_list","Date_Range_Start": "date_range_start","Date_Range_End": "date_range_end","Num_of_Days": "num_of_days","Impressions": "impressions","Spend_USD": "spend_usd","Spend_Range_Min_USD": "spend_range_min_usd","Spend_Range_Max_USD": "spend_range_max_usd","Spend_Range_Min_EUR": "spend_range_min_eur","Spend_Range_Max_EUR": "spend_range_max_eur","Spend_Range_Min_INR": "spend_range_min_inr","Spend_Range_Max_INR": 
"spend_range_max_inr","Spend_Range_Min_BGN": "spend_range_min_bgn","Spend_Range_Max_BGN": "spend_range_max_bgn","Spend_Range_Min_HRK": "spend_range_min_hrk","Spend_Range_Max_HRK": "spend_range_max_hrk","Spend_Range_Min_CZK": "spend_range_min_czk","Spend_Range_Max_CZK": "spend_range_max_czk","Spend_Range_Min_DKK": "spend_range_min_dkk","Spend_Range_Max_DKK": "spend_range_max_dkk","Spend_Range_Min_HUF": "spend_range_min_huf","Spend_Range_Max_HUF": "spend_range_max_huf","Spend_Range_Min_PLN": "spend_range_min_pln","Spend_Range_Max_PLN": "spend_range_max_pln","Spend_Range_Min_RON": "spend_range_min_ron","Spend_Range_Max_RON": "spend_range_max_ron","Spend_Range_Min_SEK": "spend_range_min_sek","Spend_Range_Max_SEK": "spend_range_max_sek","Spend_Range_Min_GBP": "spend_range_min_gbp","Spend_Range_Max_GBP": "spend_range_max_gbp","Spend_Range_Min_NZD": "spend_range_min_nzd","Spend_Range_Max_NZD": "spend_range_max_nzd","Age_Targeting": "age_targeting","Gender_Targeting": "gender_targeting","Geo_Targeting_Included": "geo_targeting_included","Geo_Targeting_Excluded": "geo_targeting_excluded","First_Served_Timestamp": "first_served_timestamp","Last_Served_Timestamp": "last_served_timestamp"} + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_creative_stats_to_bq" + + # The GCS bucket where the CSV file is located in. 
+ bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/google_political_ads/creative_stats/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "google_political_ads.creative_stats" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + + schema_fields: + - name: "ad_id" + type: "string" + description: "Unique id for a specific election ad." + mode: "nullable" + - name: "ad_url" + type: "string" + description: "URL to view the election ad in the election Advertising on Google report." + mode: "nullable" + - name: "ad_type" + type: "string" + description: "The type of the ad. Can be TEXT VIDEO or IMAGE." + mode: "nullable" + - name: "regions" + type: "string" + description: "The regions that this ad is verified for or were served in." + mode: "nullable" + - name: "advertiser_id" + type: "string" + description: "ID of the advertiser who purchased the ad." + mode: "nullable" + - name: "advertiser_name" + type: "string" + description: "Name of advertiser." + mode: "nullable" + - name: "ad_campaigns_list" + type: "string" + description: "IDs of all election ad campaigns that included the ad." + mode: "nullable" + - name: "date_range_start" + type: "date" + description: "First day an election ad ran and had an impression." + mode: "nullable" + - name: "date_range_end" + type: "date" + description: "Most recent day an election ad ran and had an impression."
+ mode: "nullable" + - name: "num_of_days" + type: "integer" + description: "Total number of days an election ad ran and had an impression." + mode: "nullable" + - name: "impressions" + type: "string" + description: "Number of impressions for the election ad. Impressions are grouped into several buckets ≤ 10k 10k–100k 100k–1M 1M–10M > 10M." + mode: "nullable" + - name: "spend_usd" + type: "string" + description: "[DEPRECATED] This field is deprecated in favor of specifying the lower and higher spend bucket bounds in separate Spend_Range_Min and Spend_Range_Max columns." + mode: "nullable" + - name: "first_served_timestamp" + type: "timestamp" + description: "The timestamp of the earliest impression for this ad." + mode: "nullable" + - name: "last_served_timestamp" + type: "timestamp" + description: "The timestamp of the most recent impression for this ad." + mode: "nullable" + - name: "age_targeting" + type: "string" + description: "Age ranges included in the ad's targeting." + mode: "nullable" + - name: "gender_targeting" + type: "string" + description: "Genders included in the ad's targeting." + mode: "nullable" + - name: "geo_targeting_included" + type: "string" + description: "Geographic locations included in the ad's targeting." + mode: "nullable" + - name: "geo_targeting_excluded" + type: "string" + description: "Geographic locations excluded in the ad's targeting." + mode: "nullable" + - name: "spend_range_min_usd" + type: "integer" + description: "Lower bound of the amount in USD spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_usd" + type: "integer" + description: "Upper bound of the amount in USD spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_eur" + type: "integer" + description: "Lower bound of the amount in EUR spent by the advertiser on the election ad."
+ mode: "nullable" + - name: "spend_range_max_eur" + type: "integer" + description: "Upper bound of the amount in EUR spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_inr" + type: "integer" + description: "Lower bound of the amount in INR spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_inr" + type: "integer" + description: "Upper bound of the amount in INR spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_bgn" + type: "integer" + description: "Lower bound of the amount in BGN spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_bgn" + type: "integer" + description: "Upper bound of the amount in BGN spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_hrk" + type: "integer" + description: "Lower bound of the amount in HRK spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_hrk" + type: "integer" + description: "Upper bound of the amount in HRK spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_czk" + type: "integer" + description: "Lower bound of the amount in CZK spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_czk" + type: "integer" + description: "Upper bound of the amount in CZK spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_dkk" + type: "integer" + description: "Lower bound of the amount in DKK spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_dkk" + type: "integer" + description: "Upper bound of the amount in DKK spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_huf" + type: "integer" + description: "Lower bound of the amount in HUF spent by the advertiser on the election ad." 
+ mode: "nullable" + - name: "spend_range_max_huf" + type: "integer" + description: "Upper bound of the amount in HUF spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_pln" + type: "integer" + description: "Lower bound of the amount in PLN spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_pln" + type: "integer" + description: "Upper bound of the amount in PLN spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_ron" + type: "integer" + description: "Lower bound of the amount in RON spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_ron" + type: "integer" + description: "Upper bound of the amount in RON spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_sek" + type: "integer" + description: "Lower bound of the amount in SEK spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_sek" + type: "integer" + description: "Upper bound of the amount in SEK spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_gbp" + type: "integer" + description: "Lower bound of the amount in GBP spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_gbp" + type: "integer" + description: "Upper bound of the amount in GBP spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_min_nzd" + type: "integer" + description: "Lower bound of the amount in NZD spent by the advertiser on the election ad." + mode: "nullable" + - name: "spend_range_max_nzd" + type: "integer" + description: "Upper bound of the amount in NZD spent by the advertiser on the election ad." 
+ mode: "nullable" + + + graph_paths: + - "creative_stats_transform_csv >> load_creative_stats_to_bq" diff --git a/datasets/google_political_ads/dataset.yaml b/datasets/google_political_ads/dataset.yaml new file mode 100644 index 000000000..5695f22ab --- /dev/null +++ b/datasets/google_political_ads/dataset.yaml @@ -0,0 +1,90 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +dataset: + # The `dataset` block includes properties for your dataset that will be shown + # to users of your data on the Google Cloud website. + + # Must be exactly the same name as the folder name your dataset.yaml is in. + name: google_political_ads + + # A friendly, human-readable name of the dataset + friendly_name: google_political_ads + + # A short, descriptive summary of the dataset. + description: |- + Overview: This dataset contains information on how much money is spent by verified advertisers on political advertising across Google Ad Services. In addition, insights on demographic targeting used in political ad campaigns by these advertisers are also provided. Finally, links to the actual political ad in the Google Transparency Report (https://transparencyreport.google.com/) are provided. Data for an election expires 7 years after the election. After this point, the data are removed from the dataset and are no longer available. 
+ + Update frequency: Weekly + + Dataset source: Transparency Report: Political Advertising on Google + + Terms of use: + + See the GCP Marketplace listing for more details and sample queries: https://console.cloud.google.com/marketplace/details/transparency-report/google-political-ads + + For more information see: + The Political Advertising on Google Transparency Report at + https://transparencyreport.google.com/political-ads/home + + The supporting Frequently Asked Questions at + https://support.google.com/transparencyreport/answer/9575640?hl=en&ref_topic=7295796 + + # A list of sources the dataset is derived from, using the YAML list syntax. + dataset_sources: ~ + + # A list of terms and conditions that users of the dataset should agree on, + # using the YAML list syntax. + terms_of_use: ~ + + +resources: + # A list of Google Cloud resources needed by your dataset. In principle, all + # pipelines under a dataset should be able to share these resources. + # + # The currently supported resources are shown below. Use only the resources + # you need, and delete the rest as needed by your pipeline. + # + # We will keep adding to the list below to support more Google Cloud resources + # over time. If a resource you need isn't supported, please file an issue on + # the repository. + + - type: bigquery_dataset + # Google BigQuery dataset to namespace all tables managed by this folder + # + # Required Properties: + # dataset_id + # + # Optional Properties: + # friendly_name (A user-friendly name of the dataset) + # description (A user-friendly description of the dataset) + # location (The geographic location where the dataset should reside) + dataset_id: google_political_ads + description: |- + Overview: This dataset contains information on how much money is spent by verified advertisers on political advertising across Google Ad Services. In addition, insights on demographic targeting used in political ad campaigns by these advertisers are also provided. 
Finally, links to the actual political ad in the Google Transparency Report (https://transparencyreport.google.com/) are provided. Data for an election expires 7 years after the election. After this point, the data are removed from the dataset and are no longer available. + + Update frequency: Weekly + + Dataset source: Transparency Report: Political Advertising on Google + + Terms of use: + + See the GCP Marketplace listing for more details and sample queries: https://console.cloud.google.com/marketplace/details/transparency-report/google-political-ads + + For more information see: + The Political Advertising on Google Transparency Report at + https://transparencyreport.google.com/political-ads/home + + The supporting Frequently Asked Questions at + https://support.google.com/transparencyreport/answer/9575640?hl=en&ref_topic=7295796 diff --git a/datasets/google_political_ads/geo_spend/geo_spend_dag.py b/datasets/google_political_ads/geo_spend/geo_spend_dag.py new file mode 100644 index 000000000..c238ab93f --- /dev/null +++ b/datasets/google_political_ads/geo_spend/geo_spend_dag.py @@ -0,0 +1,166 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="google_political_ads.geo_spend", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + geo_spend_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="geo_spend_transform_csv", + startup_timeout_seconds=600, + name="geo_spend", + namespace="default", + image_pull_policy="Always", + image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip", + "SOURCE_FILE": "files/data.zip", + "FILE_NAME": "google-political-ads-transparency-bundle/google-political-ads-geo-spend.csv", + "TARGET_FILE": "files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/google_political_ads/geo_spend/data_output.csv", + "PIPELINE_NAME": "geo_spend", + "CSV_HEADERS": '["country","country_subdivision_primary","country_subdivision_secondary","spend_usd","spend_eur","spend_inr","spend_bgn","spend_hrk","spend_czk","spend_dkk","spend_huf","spend_pln","spend_ron","spend_sek","spend_gbp","spend_nzd"]', + "RENAME_MAPPINGS": '{"Country": "country","Country_Subdivision_Primary": "country_subdivision_primary","Country_Subdivision_Secondary": "country_subdivision_secondary","Spend_USD": "spend_usd","Spend_EUR": "spend_eur","Spend_INR": "spend_inr","Spend_BGN": "spend_bgn","Spend_HRK": "spend_hrk","Spend_CZK": "spend_czk","Spend_DKK": "spend_dkk","Spend_HUF": "spend_huf","Spend_PLN": "spend_pln","Spend_RON": "spend_ron","Spend_SEK": "spend_sek","Spend_GBP": "spend_gbp","Spend_NZD": "spend_nzd"}', + }, + resources={"request_memory": 
"2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_geo_spend_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_geo_spend_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=["data/google_political_ads/geo_spend/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="google_political_ads.geo_spend", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "country", + "type": "string", + "description": 'The country where election ads were served specified in the ISO 3166-1 alpha-2 standard code. For example "US" for United States.', + "mode": "nullable", + }, + { + "name": "country_subdivision_primary", + "type": "string", + "description": 'The primary subdivision of the country where election ads were served specified by the ISO 3166-2 standard code. For example "US-CA" for California state in United States', + "mode": "nullable", + }, + { + "name": "country_subdivision_secondary", + "type": "string", + "description": "The name of the secondary subdivision. 
For example The name of a US congressional district.", + "mode": "nullable", + }, + { + "name": "spend_usd", + "type": "integer", + "description": "Total amount in USD spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_eur", + "type": "integer", + "description": "Total amount in EUR spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_inr", + "type": "integer", + "description": "Total amount in INR spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_bgn", + "type": "integer", + "description": "Total amount in BGN spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_hrk", + "type": "integer", + "description": "Total amount in HRK spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_czk", + "type": "integer", + "description": "Total amount in CZK spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_dkk", + "type": "integer", + "description": "Total amount in DKK spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_huf", + "type": "integer", + "description": "Total amount in HUF spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_pln", + "type": "integer", + "description": "Total amount in PLN spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_ron", + "type": "integer", + "description": "Total amount in RON spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_sek", + "type": "integer", + "description": "Total amount in SEK spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_gbp", + "type": "integer", + "description": "Total amount in GBP spent on election ads in this region.", + "mode": "nullable", + }, + { + "name": "spend_nzd", + "type": "integer", + "description": "Total amount in 
NZD spent on election ads in this region.", + "mode": "nullable", + }, + ], + ) + + geo_spend_transform_csv >> load_geo_spend_to_bq diff --git a/datasets/google_political_ads/geo_spend/pipeline.yaml b/datasets/google_political_ads/geo_spend/pipeline.yaml new file mode 100644 index 000000000..c5475c05d --- /dev/null +++ b/datasets/google_political_ads/geo_spend/pipeline.yaml @@ -0,0 +1,176 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: geo_spend + + # Description of the table + description: "This table contains the information for how much is spent buying election ads on Google Ads Services. The data is aggregated by Congressional district. The primary key is state, congressional_district." + +dag: + airflow_version: 1 + initialize: + dag_id: geo_spend + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "geo_spend_transform_csv" + + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. 
This will be used (plus a random suffix) to generate a pod id + name: "geo_spend" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. + env_vars: + SOURCE_URL: "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip" + SOURCE_FILE: "files/data.zip" + FILE_NAME: "google-political-ads-transparency-bundle/google-political-ads-geo-spend.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/google_political_ads/geo_spend/data_output.csv" + PIPELINE_NAME: "geo_spend" + CSV_HEADERS: >- + ["country","country_subdivision_primary","country_subdivision_secondary","spend_usd","spend_eur","spend_inr","spend_bgn","spend_hrk","spend_czk","spend_dkk","spend_huf","spend_pln","spend_ron","spend_sek","spend_gbp","spend_nzd"] + RENAME_MAPPINGS: >- + {"Country": "country","Country_Subdivision_Primary": "country_subdivision_primary","Country_Subdivision_Secondary": "country_subdivision_secondary","Spend_USD": "spend_usd","Spend_EUR": "spend_eur","Spend_INR": "spend_inr","Spend_BGN": "spend_bgn","Spend_HRK": "spend_hrk","Spend_CZK": "spend_czk","Spend_DKK": "spend_dkk","Spend_HUF": "spend_huf","Spend_PLN": "spend_pln","Spend_RON": "spend_ron","Spend_SEK": "spend_sek","Spend_GBP": 
"spend_gbp","Spend_NZD": "spend_nzd"} + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_geo_spend_to_bq" + + # The GCS bucket where the CSV file is located in. + bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/google_political_ads/geo_spend/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "google_political_ads.geo_spend" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + + schema_fields: + - name: "country" + type: "string" + description: "The country where election ads were served specified in the ISO 3166-1 alpha-2 standard code. For example \"US\" for United States." + mode: "nullable" + - name: "country_subdivision_primary" + type: "string" + description: "The primary subdivision of the country where election ads were served specified by the ISO 3166-2 standard code. For example \"US-CA\" for California state in United States" + mode: "nullable" + - name: "country_subdivision_secondary" + type: "string" + description: "The name of the secondary subdivision. For example The name of a US congressional district." 
+ mode: "nullable" + - name: "spend_usd" + type: "integer" + description: "Total amount in USD spent on election ads in this region." + mode: "nullable" + - name: "spend_eur" + type: "integer" + description: "Total amount in EUR spent on election ads in this region." + mode: "nullable" + - name: "spend_inr" + type: "integer" + description: "Total amount in INR spent on election ads in this region." + mode: "nullable" + - name: "spend_bgn" + type: "integer" + description: "Total amount in BGN spent on election ads in this region." + mode: "nullable" + - name: "spend_hrk" + type: "integer" + description: "Total amount in HRK spent on election ads in this region." + mode: "nullable" + - name: "spend_czk" + type: "integer" + description: "Total amount in CZK spent on election ads in this region." + mode: "nullable" + - name: "spend_dkk" + type: "integer" + description: "Total amount in DKK spent on election ads in this region." + mode: "nullable" + - name: "spend_huf" + type: "integer" + description: "Total amount in HUF spent on election ads in this region." + mode: "nullable" + - name: "spend_pln" + type: "integer" + description: "Total amount in PLN spent on election ads in this region." + mode: "nullable" + - name: "spend_ron" + type: "integer" + description: "Total amount in RON spent on election ads in this region." + mode: "nullable" + - name: "spend_sek" + type: "integer" + description: "Total amount in SEK spent on election ads in this region." + mode: "nullable" + - name: "spend_gbp" + type: "integer" + description: "Total amount in GBP spent on election ads in this region." + mode: "nullable" + - name: "spend_nzd" + type: "integer" + description: "Total amount in NZD spent on election ads in this region." 
+ mode: "nullable" + + + graph_paths: + - "geo_spend_transform_csv >> load_geo_spend_to_bq" diff --git a/datasets/google_political_ads/last_updated/last_updated_dag.py b/datasets/google_political_ads/last_updated/last_updated_dag.py new file mode 100644 index 000000000..7219bc687 --- /dev/null +++ b/datasets/google_political_ads/last_updated/last_updated_dag.py @@ -0,0 +1,76 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from airflow import DAG +from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator + +default_args = { + "owner": "Google", + "depends_on_past": False, + "start_date": "2021-03-01", +} + + +with DAG( + dag_id="google_political_ads.last_updated", + default_args=default_args, + max_active_runs=1, + schedule_interval="@daily", + catchup=False, + default_view="graph", +) as dag: + + # Run CSV transform within kubernetes pod + last_updated_transform_csv = kubernetes_pod_operator.KubernetesPodOperator( + task_id="last_updated_transform_csv", + startup_timeout_seconds=600, + name="last_updated", + namespace="default", + image_pull_policy="Always", + image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}", + env_vars={ + "SOURCE_URL": "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip", + "SOURCE_FILE": "files/data.zip", + "FILE_NAME": "google-political-ads-transparency-bundle/google-political-ads-updated*", + "TARGET_FILE": 
"files/data_output.csv", + "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}", + "TARGET_GCS_PATH": "data/google_political_ads/last_updated/data_output.csv", + "PIPELINE_NAME": "last_updated", + "CSV_HEADERS": '["report_data_updated_date"]', + "RENAME_MAPPINGS": '{"Report_Data_Updated_Date": "report_data_updated_date"}', + }, + resources={"request_memory": "2G", "request_cpu": "1"}, + ) + + # Task to load CSV data to a BigQuery table + load_last_updated_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator( + task_id="load_last_updated_to_bq", + bucket="{{ var.json.shared.composer_bucket }}", + source_objects=["data/google_political_ads/last_updated/data_output.csv"], + source_format="CSV", + destination_project_dataset_table="google_political_ads.last_updated", + skip_leading_rows=1, + write_disposition="WRITE_TRUNCATE", + schema_fields=[ + { + "name": "report_data_updated_date", + "type": "Date", + "description": "The date the report data was most recently updated", + "mode": "nullable", + } + ], + ) + + last_updated_transform_csv >> load_last_updated_to_bq diff --git a/datasets/google_political_ads/last_updated/pipeline.yaml b/datasets/google_political_ads/last_updated/pipeline.yaml new file mode 100644 index 000000000..b84262173 --- /dev/null +++ b/datasets/google_political_ads/last_updated/pipeline.yaml @@ -0,0 +1,116 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License.
+ +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: last_updated + + # Description of the table + description: "This table contains the information of the latest updated date for the Political Ads report. All dates provided are per UTC time zone." + +dag: + airflow_version: 1 + initialize: + dag_id: last_updated + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "last_updated_transform_csv" + + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "last_updated" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. 
+ env_vars: + SOURCE_URL: "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip" + SOURCE_FILE: "files/data.zip" + FILE_NAME: "google-political-ads-transparency-bundle/google-political-ads-updated*" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/google_political_ads/last_updated/data_output.csv" + PIPELINE_NAME: "last_updated" + CSV_HEADERS: >- + ["report_data_updated_date"] + RENAME_MAPPINGS: >- + {"Report_Data_Updated_Date": "report_data_updated_date"} + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_last_updated_to_bq" + + # The GCS bucket where the CSV file is located. + bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/google_political_ads/last_updated/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "google_political_ads.last_updated" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. 
+ + schema_fields: + - name: "report_data_updated_date" + type: "Date" + description: "The date the report data was most recently updated" + mode: "nullable" + + + graph_paths: + - "last_updated_transform_csv >> load_last_updated_to_bq" diff --git a/datasets/google_political_ads/top_keywords_history/pipeline.yaml b/datasets/google_political_ads/top_keywords_history/pipeline.yaml new file mode 100644 index 000000000..f40cd7b31 --- /dev/null +++ b/datasets/google_political_ads/top_keywords_history/pipeline.yaml @@ -0,0 +1,176 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- +resources: + + - type: bigquery_table + # Required Properties: + table_id: top_keywords_history + + # Description of the table + description: "The “Top Keywords” section of the US report was removed and updates to this table were terminated in December 2019. The table reflects historical data. This table contains the information for the top six keywords on which political advertisers have spent money during an election cycle. This data is only provided for US elections. The primary key is election_cycle, report_date." 
+ +dag: + airflow_version: 1 + initialize: + dag_id: top_keywords_history + default_args: + owner: "Google" + + # When set to True, keeps a task from getting triggered if the previous schedule for the task hasn’t succeeded + depends_on_past: False + start_date: '2021-03-01' + max_active_runs: 1 + schedule_interval: "@daily" + catchup: False + default_view: graph + + tasks: + - operator: "KubernetesPodOperator" + + # Task description + description: "Run CSV transform within kubernetes pod" + + args: + + task_id: "top_keywords_history_transform_csv" + + startup_timeout_seconds: 600 + + # The name of the pod in which the task will run. This will be used (plus a random suffix) to generate a pod id + name: "top_keywords_history" + + # The namespace to run within Kubernetes. Always set its value to "default" because we follow the guideline that KubernetesPodOperator will only be used for very light workloads, i.e. use the Cloud Composer environment's resources without starving other pipelines. + namespace: "default" + + image_pull_policy: "Always" + + # Docker images will be built and pushed to GCR by default whenever the `scripts/generate_dag.py` is run. To skip building and pushing images, use the optional `--skip-builds` flag. + image: "{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}" + + # Set the environment variables you need initialized in the container. Use these as input variables for the script your container is expected to perform. 
+ env_vars: + SOURCE_URL: "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip" + SOURCE_FILE: "files/data.zip" + FILE_NAME: "google-political-ads-transparency-bundle/google-political-ads-top-keywords-history.csv" + TARGET_FILE: "files/data_output.csv" + TARGET_GCS_BUCKET: "{{ var.json.shared.composer_bucket }}" + TARGET_GCS_PATH: "data/google_political_ads/top_keywords_history/data_output.csv" + PIPELINE_NAME: "top_keywords_history" + CSV_HEADERS: >- + ["election_cycle","report_date","keyword_1","spend_usd_1","keyword_2","spend_usd_2","keyword_3","spend_usd_3","keyword_4","spend_usd_4","keyword_5","spend_usd_5","keyword_6","spend_usd_6","region","elections"] + RENAME_MAPPINGS: >- + {"Election_Cycle": "election_cycle","Report_Date": "report_date","Keyword_1": "keyword_1","Spend_USD_1": "spend_usd_1","Keyword_2": "keyword_2","Spend_USD_2": "spend_usd_2","Keyword_3": "keyword_3","Spend_USD_3": "spend_usd_3","Keyword_4": "keyword_4","Spend_USD_4": "spend_usd_4","Keyword_5": "keyword_5","Spend_USD_5": "spend_usd_5","Keyword_6": "keyword_6","Spend_USD_6": "spend_usd_6","Region": "region","Elections": "elections"} + + # Set resource limits for the pod here. For resource units in Kubernetes, see https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#resource-units-in-kubernetes + resources: + request_memory: "2G" + request_cpu: "1" + + - operator: "GoogleCloudStorageToBigQueryOperator" + description: "Task to load CSV data to a BigQuery table" + + args: + task_id: "load_top_keywords_history_to_bq" + + # The GCS bucket where the CSV file is located. 
+ bucket: "{{ var.json.shared.composer_bucket }}" + + # The GCS object path for the CSV file + source_objects: ["data/google_political_ads/top_keywords_history/data_output.csv"] + source_format: "CSV" + destination_project_dataset_table: "google_political_ads.top_keywords_history" + + # Use this if your CSV file contains a header row + skip_leading_rows: 1 + + # How to write data to the table: overwrite, append, or write if empty + # See https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/WriteDisposition + write_disposition: "WRITE_TRUNCATE" + + # The BigQuery table schema based on the CSV file. For more info, see + # https://cloud.google.com/bigquery/docs/schemas. + # Always use snake_case and lowercase for column names, and be explicit, + # i.e. specify modes for all columns. + + schema_fields: + - name: "election_cycle" + type: "string" + description: "[DEPRECATED] This field is deprecated in favor of the Region and Elections field. It will be deleted some time after July 2019." + mode: "nullable" + - name: "report_date" + type: "date" + description: "[DEPRECATED] The start date for the week where the spending was reported." + mode: "nullable" + - name: "keyword_1" + type: "string" + description: "[DEPRECATED] Keyword with the most spend by advertisers for political ads" + mode: "nullable" + - name: "spend_usd_1" + type: "integer" + description: "[DEPRECATED] Total spend in USD for Keyword_1." + mode: "nullable" + - name: "keyword_2" + type: "string" + description: "[DEPRECATED] Keyword with the next most spend by advertisers for political ads" + mode: "nullable" + - name: "spend_usd_2" + type: "integer" + description: "[DEPRECATED] Total spend in USD for Keyword_2." 
+ mode: "nullable" + - name: "keyword_3" + type: "string" + description: "[DEPRECATED] Keyword with the next most spend by advertisers for political ads" + mode: "nullable" + - name: "spend_usd_3" + type: "integer" + description: "[DEPRECATED] Total spend in USD for Keyword_3." + mode: "nullable" + - name: "keyword_4" + type: "string" + description: "[DEPRECATED] Keyword with the next most spend by advertisers for political ads" + mode: "nullable" + - name: "spend_usd_4" + type: "integer" + description: "[DEPRECATED] Total spend in USD for Keyword_4." + mode: "nullable" + - name: "keyword_5" + type: "string" + description: "[DEPRECATED] Keyword with the next most spend by advertisers for political ads" + mode: "nullable" + - name: "spend_usd_5" + type: "integer" + description: "[DEPRECATED] Total spend in USD for Keyword_5." + mode: "nullable" + - name: "keyword_6" + type: "string" + description: "[DEPRECATED] Keyword with the next most spend by advertisers for political ads" + mode: "nullable" + - name: "spend_usd_6" + type: "integer" + description: "[DEPRECATED] Total spend in USD for Keyword_6." + mode: "nullable" + - name: "region" + type: "string" + description: "[DEPRECATED] The region where advertisers used these keywords." + mode: "nullable" + - name: "elections" + type: "string" + description: "[DEPRECATED] The elections during which these keywords were used." 
+ mode: "nullable" + + + graph_paths: + - "top_keywords_history_transform_csv >> load_top_keywords_history_to_bq" diff --git a/datasets/google_political_ads/top_keywords_history/top_keywords_history_dag.py b/datasets/google_political_ads/top_keywords_history/top_keywords_history_dag.py new file mode 100644 index 000000000..6d4c0af06 --- /dev/null +++ b/datasets/google_political_ads/top_keywords_history/top_keywords_history_dag.py @@ -0,0 +1,168 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from airflow import DAG
from airflow.contrib.operators import gcs_to_bq, kubernetes_pod_operator

# Arguments shared by every task declared in this DAG.
default_args = dict(
    owner="Google",
    depends_on_past=False,
    start_date="2021-03-01",
)

# Environment variables passed to the CSV-transform container.
_TRANSFORM_ENV = {
    "SOURCE_URL": "https://storage.googleapis.com/transparencyreport/google-political-ads-transparency-bundle.zip",
    "SOURCE_FILE": "files/data.zip",
    "FILE_NAME": "google-political-ads-transparency-bundle/google-political-ads-top-keywords-history.csv",
    "TARGET_FILE": "files/data_output.csv",
    "TARGET_GCS_BUCKET": "{{ var.json.shared.composer_bucket }}",
    "TARGET_GCS_PATH": "data/google_political_ads/top_keywords_history/data_output.csv",
    "PIPELINE_NAME": "top_keywords_history",
    "CSV_HEADERS": '["election_cycle","report_date","keyword_1","spend_usd_1","keyword_2","spend_usd_2","keyword_3","spend_usd_3","keyword_4","spend_usd_4","keyword_5","spend_usd_5","keyword_6","spend_usd_6","region","elections"]',
    "RENAME_MAPPINGS": '{"Election_Cycle": "election_cycle","Report_Date": "report_date","Keyword_1": "keyword_1","Spend_USD_1": "spend_usd_1","Keyword_2": "keyword_2","Spend_USD_2": "spend_usd_2","Keyword_3": "keyword_3","Spend_USD_3": "spend_usd_3","Keyword_4": "keyword_4","Spend_USD_4": "spend_usd_4","Keyword_5": "keyword_5","Spend_USD_5": "spend_usd_5","Keyword_6": "keyword_6","Spend_USD_6": "spend_usd_6","Region": "region","Elections": "elections"}',
}

# BigQuery columns as (name, type, description) rows, in CSV column order.
# Every column is loaded with mode "nullable".
_SCHEMA_COLUMNS = (
    (
        "election_cycle",
        "string",
        "[DEPRECATED] This field is deprecated in favor of the Region and Elections field. It will be deleted some time after July 2019.",
    ),
    (
        "report_date",
        "date",
        "[DEPRECATED] The start date for the week where the spending was reported.",
    ),
    (
        "keyword_1",
        "string",
        " [DEPRECATED] Keyword with the most spend by advertisers for political ads",
    ),
    (
        "spend_usd_1",
        "integer",
        "[DEPRECATED] Total spend in USD for Keyword_1.",
    ),
    (
        "keyword_2",
        "string",
        "[DEPRECATED] Keyword with the next most spend by advertisers for political ads",
    ),
    (
        "spend_usd_2",
        "integer",
        "[DEPRECATED] Total spend in USD for Keyword_2.",
    ),
    (
        "keyword_3",
        "string",
        "[DEPRECATED] Keyword with the next most spend by advertisers for political ads",
    ),
    (
        "spend_usd_3",
        "integer",
        "[DEPRECATED] Total spend in USD for Keyword_3.",
    ),
    (
        "keyword_4",
        "string",
        "[DEPRECATED] Keyword with the next most spend by advertisers for political ads",
    ),
    (
        "spend_usd_4",
        "integer",
        "[DEPRECATED] Total spend in USD for Keyword_4.",
    ),
    (
        "keyword_5",
        "string",
        "[DEPRECATED] Keyword with the next most spend by advertisers for political ads",
    ),
    (
        "spend_usd_5",
        "integer",
        "[DEPRECATED] Total spend in USD for Keyword_5.",
    ),
    (
        "keyword_6",
        "string",
        "[DEPRECATED] Keyword with the next most spend by advertisers for political ads",
    ),
    (
        "spend_usd_6",
        "integer",
        "[DEPRECATED] Total spend in USD for Keyword_6.",
    ),
    (
        "region",
        "string",
        "[DEPRECATED] The region where advertisers used these keywords.",
    ),
    (
        "elections",
        "string",
        "[DEPRECATED] The elections during which these keywords were used.",
    ),
)


with DAG(
    dag_id="google_political_ads.top_keywords_history",
    default_args=default_args,
    max_active_runs=1,
    schedule_interval="@daily",
    catchup=False,
    default_view="graph",
) as dag:

    # Run CSV transform within kubernetes pod
    top_keywords_history_transform_csv = kubernetes_pod_operator.KubernetesPodOperator(
        task_id="top_keywords_history_transform_csv",
        startup_timeout_seconds=600,
        name="top_keywords_history",
        namespace="default",
        image_pull_policy="Always",
        image="{{ var.json.google_political_ads.container_registry.run_csv_transform_kub }}",
        env_vars=_TRANSFORM_ENV,
        resources={"request_memory": "2G", "request_cpu": "1"},
    )

    # Task to load CSV data to a BigQuery table
    load_top_keywords_history_to_bq = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
        task_id="load_top_keywords_history_to_bq",
        bucket="{{ var.json.shared.composer_bucket }}",
        source_objects=[
            "data/google_political_ads/top_keywords_history/data_output.csv"
        ],
        source_format="CSV",
        destination_project_dataset_table="google_political_ads.top_keywords_history",
        skip_leading_rows=1,
        write_disposition="WRITE_TRUNCATE",
        # Expand each (name, type, description) row into the dict form the
        # operator takes, keeping the original key order.
        schema_fields=[
            {
                "name": column_name,
                "type": column_type,
                "description": column_description,
                "mode": "nullable",
            }
            for column_name, column_type, column_description in _SCHEMA_COLUMNS
        ],
    )

    # The load may only start once the transform task has finished.
    top_keywords_history_transform_csv >> load_top_keywords_history_to_bq