Minor Improvements on Pipelines (#105)
* predicting for only the users with traffic in the past 72h - purchase propensity

* running inference only for users events in the past 72h

* including 72h users for all models predictions

* considering null values in TabWorkflow models

* deleting unused pipfile

* upgrading lib versions

* implementing reporting preprocessing as a new pipeline

---------

Co-authored-by: Carlos Timoteo <ctimoteo@google.com>
chmstimoteo and Carlos Timoteo committed May 13, 2024
1 parent 46a39c7 commit a57a8d0
Showing 18 changed files with 515 additions and 320 deletions.
24 changes: 0 additions & 24 deletions Pipfile

This file was deleted.

105 changes: 90 additions & 15 deletions config/config.yaml.tftpl
@@ -515,6 +515,14 @@ vertex_ai:
data_source_bigquery_table_path: "bq://${project_id}.aggregated_vbb.aggregated_value_based_bidding_training_full_dataset"
data_source_bigquery_table_schema: "../sql/schema/table/value_based_bidding_training_preparation.json"
dataflow_service_account: "df-worker@${project_id}.iam.gserviceaccount.com"
transform_dataflow_max_num_workers: 10
stats_and_example_gen_dataflow_max_num_workers: 10
evaluation_dataflow_starting_num_workers: 5
evaluation_dataflow_max_num_workers: 10
distill_batch_predict_max_replica_count: 10
distill_batch_predict_starting_replica_count: 10
evaluation_batch_predict_max_replica_count: 10
evaluation_batch_predict_starting_replica_count: 10
timestamp_split_key: null
stratified_split_key: null
weight_column: null
@@ -557,9 +565,6 @@ vertex_ai:
model_metric_threshold: 400
number_of_models_considered: 1
bigquery_destination_prefix: "${project_id}.aggregated_vbb.vbb_weights"
# These are parameters to invoke the aggregations of all daily predictions into a single table.
aggregated_predictions_dataset_location: "${location}"
query_aggregate_last_day_predictions: "CALL `${project_id}.aggregated_predictions.aggregate_last_day_predictions`();"
pipeline_parameters_substitutions: null

# This pipeline contains the configuration parameters for the propensity training and inference pipelines for the purchase propensity model.
@@ -624,6 +629,14 @@ vertex_ai:
data_source_bigquery_table_path: "bq://${project_id}.purchase_propensity.v_purchase_propensity_training_30_15_last_window"
data_source_bigquery_table_schema: "../sql/schema/table/purchase_propensity_training_preparation.json"
dataflow_service_account: "df-worker@${project_id}.iam.gserviceaccount.com"
transform_dataflow_max_num_workers: 10
stats_and_example_gen_dataflow_max_num_workers: 10
evaluation_dataflow_starting_num_workers: 5
evaluation_dataflow_max_num_workers: 10
distill_batch_predict_max_replica_count: 10
distill_batch_predict_starting_replica_count: 10
evaluation_batch_predict_max_replica_count: 10
evaluation_batch_predict_starting_replica_count: 10
timestamp_split_key: null
stratified_split_key: null
weight_column: null
@@ -689,9 +702,6 @@ vertex_ai:
# For probabilities higher than `threashold`, set the positive label to 1, otherwise 0.
threashold: 0.5
positive_label: "1"
# These are parameters to invoke the aggregations of all daily predictions into a single table.
aggregated_predictions_dataset_location: "${location}"
query_aggregate_last_day_predictions: "CALL `${project_id}.aggregated_predictions.aggregate_last_day_predictions`();"
# These are parameters to trigger the Activation Application Dataflow.
pubsub_activation_topic: "activation-trigger"
pubsub_activation_type: "purchase-propensity-30-15" # purchase-propensity-30-15 | purchase-propensity-15-15 | purchase-propensity-15-7"
@@ -781,9 +791,6 @@ vertex_ai:
# This is the prediction dataset table or view.
bigquery_source: "${project_id}.audience_segmentation.v_audience_segmentation_inference_15"
bigquery_destination_prefix: "${project_id}.audience_segmentation.pred_audience_segmentation_inference_15"
# These are parameters to invoke the aggregations of all daily predictions into a single table.
aggregated_predictions_dataset_location: "${location}"
query_aggregate_last_day_predictions: "CALL `${project_id}.aggregated_predictions.aggregate_last_day_predictions`();"
# These are parameters to trigger the Activation Application Dataflow.
pubsub_activation_topic: "activation-trigger"
pubsub_activation_type: "audience-segmentation-15"
@@ -842,9 +849,6 @@ vertex_ai:
model_name: "interest-cluster-model"
bigquery_source: "${project_id}.auto_audience_segmentation.v_auto_audience_segmentation_inference_15"
bigquery_destination_prefix: "${project_id}.auto_audience_segmentation"
# These are parameters to invoke the aggregations of all daily predictions into a single table.
aggregated_predictions_dataset_location: "${location}"
query_aggregate_last_day_predictions: "CALL `${project_id}.aggregated_predictions.aggregate_last_day_predictions`();"
# These are parameters to trigger the Activation Application Dataflow.
pubsub_activation_topic: "activation-trigger"
pubsub_activation_type: "auto-audience-segmentation-15"
@@ -914,6 +918,14 @@ vertex_ai:
data_source_bigquery_table_path: "bq://${project_id}.purchase_propensity.v_purchase_propensity_training_30_30_last_window"
data_source_bigquery_table_schema: "../sql/schema/table/purchase_propensity_training_preparation.json"
dataflow_service_account: "df-worker@${project_id}.iam.gserviceaccount.com"
transform_dataflow_max_num_workers: 10
stats_and_example_gen_dataflow_max_num_workers: 10
evaluation_dataflow_starting_num_workers: 5
evaluation_dataflow_max_num_workers: 10
distill_batch_predict_max_replica_count: 10
distill_batch_predict_starting_replica_count: 10
evaluation_batch_predict_max_replica_count: 10
evaluation_batch_predict_starting_replica_count: 10
timestamp_split_key: null
stratified_split_key: null
weight_column: null
@@ -1005,6 +1017,14 @@ vertex_ai:
data_source_bigquery_table_path: "bq://${project_id}.customer_lifetime_value.v_customer_lifetime_value_training_180_30_last_window"
data_source_bigquery_table_schema: "../sql/schema/table/customer_lifetime_value_training_preparation.json"
dataflow_service_account: "df-worker@${project_id}.iam.gserviceaccount.com"
transform_dataflow_max_num_workers: 10
stats_and_example_gen_dataflow_max_num_workers: 10
evaluation_dataflow_starting_num_workers: 5
evaluation_dataflow_max_num_workers: 10
distill_batch_predict_max_replica_count: 10
distill_batch_predict_starting_replica_count: 10
evaluation_batch_predict_max_replica_count: 10
evaluation_batch_predict_starting_replica_count: 10
timestamp_split_key: null
stratified_split_key: null
weight_column: null
@@ -1081,13 +1101,68 @@ vertex_ai:
accelerator_count: 0
accelerator_type: "ACCELERATOR_TYPE_UNSPECIFIED" # ONE OF ACCELERATOR_TYPE_UNSPECIFIED, NVIDIA_TESLA_K80, NVIDIA_TESLA_P100, NVIDIA_TESLA_V100, NVIDIA_TESLA_P4, NVIDIA_TESLA_T4
generate_explanation: false
# These are parameters to invoke the aggregations of all daily predictions into a single table.
aggregated_predictions_dataset_location: "${location}"
query_aggregate_last_day_predictions: "CALL `${project_id}.aggregated_predictions.aggregate_last_day_predictions`();"
# These are parameters to trigger the Activation Application Dataflow.
pubsub_activation_topic: "activation-trigger"
pubsub_activation_type: "cltv-180-30" # cltv-180-180 | cltv-180-90 | cltv-180-30
pipeline_parameters_substitutions: null

# This pipeline contains the configuration parameters for the reporting preparation pipeline.
# This block defines the pipeline parameters that are going to be used for three tasks: compilation, upload and scheduling.
# To deploy this pipeline to your Google Cloud project:
## 1. Define the pipeline parameters below, following YAML format
## 2. Define the queries and procedures SQL parameters in this file under the `bigquery` section, following YAML format
## 3. Create the queries and procedures SQL files under sql/ folder
## 4. Create the terraform resources in terraform/feature-store/bigquery-procedures.tf
## 5. Create the terraform resources to compile and schedule the pipeline in terraform/pipelines/pipelines.tf
## 6. Define the python functions that perform `compilation` and `upload to GCS bucket` in `python/pipelines/compiler.py` and `python/pipelines/uploader.py`.
## 7. Define the python function that performs the `schedule` of the pipeline in `python/pipelines/scheduler.py`.
## 8. Create the pipeline python function in python/pipelines/feature_engineering_pipelines.py
## 9. Run terraform apply
reporting_preparation:
execution:
# The `name` parameter is the name of the pipeline that will appear in the Vertex AI pipeline UI.
name: "reporting-preparation"
# The `job_id_prefix` is the prefix of the Vertex AI Custom Job that will be used at the execution of each individual component step.
job_id_prefix: "reporting-preparation-"
# The `experiment_name` is the name of the experiment that will appear in the Vertex AI Experiments UI.
experiment_name: "reporting-preparation"
# The `type` defines whether the pipeline is going to be a `tabular-workflows` or a `custom` pipeline.
# `type` must be "custom", when we're building Python and/or SQL based pipelines for feature engineering purposes.
type: "custom"
# The `schedule` defines the schedule values of the pipeline.
# This solution uses the Vertex AI Pipeline Scheduler.
# More information can be found at https://cloud.google.com/vertex-ai/docs/pipelines/scheduler.
schedule:
# The `cron` is the cron schedule. Make sure you review the TZ=America/New_York timezone.
# More information can be found at https://cloud.google.com/scheduler/docs/configuring/cron-job-schedules.
cron: "TZ=America/New_York 0 8 * * *"
# The `max_concurrent_run_count` defines the maximum number of concurrent pipeline runs.
max_concurrent_run_count: 1
start_time: null
end_time: null
# The `state` defines the state of the pipeline.
# In case you don't want to schedule the pipeline, set the state to `PAUSED`.
state: PAUSED # possible states ACTIVE or PAUSED
# The `pipeline_parameters` defines the parameters that are going to be used to compile the pipeline.
# Those values may differ depending on the pipeline type and the pipeline steps being used.
# Make sure you review the python function that defines the pipeline.
# The pipeline definition function can be found in `python/pipelines/feature_engineering_pipelines.py`
# or other files ending with `python/pipelines/*_pipeline.py`.
pipeline_parameters:
project_id: "${project_id}"
location: "${location}"
query_aggregate_last_day_predictions: "
CALL `{aggregate_last_day_predictions_procedure_name}`();"
# The `timeout` parameter defines the timeout of the pipeline in seconds.
# The default value is 3600 seconds (1 hour).
timeout: 3600.0
# The `pipeline_parameters_substitutions` block defines the substitutions that are applied to the pipeline parameters before compilation.
# Check the parameter values above to see if they are used.
# They typically follow this format: {parameter_substitution_key}.
# To apply a substitution, make sure you define the pair: {parameter_substitution_key}: {parameter_substitution_value}.
# A minimal sketch of this substitution step is shown after this configuration block.
pipeline_parameters_substitutions: # Substitutions are applied to the parameters before compilation
aggregate_last_day_predictions_procedure_name: "${project_id}.aggregated_predictions.aggregate_last_day_predictions"
date_timezone: "UTC" # used when input_date is None and we need to get the current date.
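The substitution mechanism described above is plain string templating over the pipeline parameters. The sketch below illustrates it, together with running the resolved stored-procedure call; the helper name apply_substitutions, the example project and location values, and the standalone use of the BigQuery client are assumptions for illustration, not the repository's actual compiler or pipeline code.

from typing import Optional

from google.cloud import bigquery

def apply_substitutions(parameters: dict, substitutions: Optional[dict]) -> dict:
    # Replace {placeholder} tokens in string parameters using the
    # pipeline_parameters_substitutions mapping defined in config.yaml.tftpl.
    if not substitutions:
        return parameters
    return {
        key: value.format(**substitutions) if isinstance(value, str) else value
        for key, value in parameters.items()
    }

params = apply_substitutions(
    {
        "project_id": "my-project",  # assumed stand-in for ${project_id}
        "location": "US",            # assumed stand-in for ${location}
        "query_aggregate_last_day_predictions": "CALL `{aggregate_last_day_predictions_procedure_name}`();",
    },
    {
        "aggregate_last_day_predictions_procedure_name": "my-project.aggregated_predictions.aggregate_last_day_predictions",
    },
)

# The reporting preparation step can then execute the resolved CALL statement in BigQuery.
client = bigquery.Client(project=params["project_id"], location=params["location"])
client.query(params["query_aggregate_last_day_predictions"]).result()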

# This block contains configuration parameters for the BigQuery Datasets, Tables, Queries and Stored Procedures.
bigquery:
1 change: 1 addition & 0 deletions googleb3fdc576fe73874e.html
@@ -0,0 +1 @@
google-site-verification: googleb3fdc576fe73874e.html
4 changes: 3 additions & 1 deletion infrastructure/terraform/modules/activation/main.tf
@@ -244,7 +244,9 @@ module "pipeline_bucket" {
project_id = var.project_id
name = "${local.app_prefix}-app-${var.project_id}"
location = var.location
force_destroy = true
# When deleting a bucket, this boolean option will delete all contained objects.
# If false, Terraform will fail to delete buckets which contain objects.
force_destroy = false

lifecycle_rules = [{
action = {
@@ -158,15 +158,15 @@ locals {
# it failed to create resources that already exist. To resolve this, you
# need to import the existing dataset and tables into terraform using
# the following commands:
# > `terraform import module.feature_store[0].module.aggregated_vbb.google_bigquery_dataset.main 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb'`
# > `terraform -chdir="${TERRAFORM_RUN_DIR}" import module.feature_store[0].module.aggregated_vbb.google_bigquery_dataset.main 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb'`
#
# > `terraform import 'module.feature_store[0].module.aggregated_vbb.google_bigquery_table.main["vbb_weights"]' 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb/tables/vbb_weights`
# > `terraform -chdir="${TERRAFORM_RUN_DIR}" import 'module.feature_store[0].module.aggregated_vbb.google_bigquery_table.main["vbb_weights"]' 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb/tables/vbb_weights'`
#
# > `terraform import 'module.feature_store[0].module.aggregated_vbb.google_bigquery_table.main["aggregated_value_based_bidding_volume_weekly"]' 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb/tables/aggregated_value_based_bidding_volume_weekly'`
# > `terraform -chdir="${TERRAFORM_RUN_DIR}" import 'module.feature_store[0].module.aggregated_vbb.google_bigquery_table.main["aggregated_value_based_bidding_volume_weekly"]' 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb/tables/aggregated_value_based_bidding_volume_weekly'`
#
# > `terraform import 'module.feature_store[0].module.aggregated_vbb.google_bigquery_table.main["aggregated_value_based_bidding_correlation"]' 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb/tables/aggregated_value_based_bidding_correlation'`
# > `terraform -chdir="${TERRAFORM_RUN_DIR}" import 'module.feature_store[0].module.aggregated_vbb.google_bigquery_table.main["aggregated_value_based_bidding_correlation"]' 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb/tables/aggregated_value_based_bidding_correlation'`
#
# > `terraform import 'module.feature_store[0].module.aggregated_vbb.google_bigquery_table.main["aggregated_value_based_bidding_volume_daily"]' 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb/tables/aggregated_value_based_bidding_volume_daily'`
# > `terraform -chdir="${TERRAFORM_RUN_DIR}" import 'module.feature_store[0].module.aggregated_vbb.google_bigquery_table.main["aggregated_value_based_bidding_volume_daily"]' 'projects/${MAJ_FEATURE_STORE_PROJECT_ID}/datasets/aggregated_vbb/tables/aggregated_value_based_bidding_volume_daily'`
#
# You also need to remove the information of the existing aggregated_vbb
# dataset from the terraform state by running the following command:
@@ -182,7 +182,7 @@ module "aggregated_vbb" {
location = local.config_bigquery.dataset.aggregated_vbb.location
# The delete_contents_on_destroy attribute specifies whether the contents of the dataset should be deleted when the dataset is destroyed.
# In this case, the delete_contents_on_destroy attribute is set to false, which means that the contents of the dataset will not be deleted when the dataset is destroyed.
delete_contents_on_destroy = false
delete_contents_on_destroy = true

dataset_labels = {
version = "prod"
@@ -195,7 +195,7 @@ module "aggregated_vbb" {
# The max_time_travel_hours attribute specifies the maximum number of hours that data in the dataset can be accessed using time travel queries.
# In this case, the maximum time travel hours is set to the value of the local file config.yaml section bigquery.dataset.auto_audience_segmentation.max_time_travel_hours configuration.
max_time_travel_hours = local.config_bigquery.dataset.aggregated_vbb.max_time_travel_hours
deletion_protection = true
deletion_protection = false
time_partitioning = null,
range_partitioning = null,
expiration_time = null,
21 changes: 21 additions & 0 deletions infrastructure/terraform/modules/pipelines/pipelines.tf
@@ -603,3 +603,24 @@ resource "null_resource" "compile_value_based_bidding_explanation_pipelines" {
working_dir = self.triggers.working_dir
}
}

# This resource is used to compile, upload and schedule the Vertex AI pipeline that prepares data for the reports.
resource "null_resource" "compile_reporting_preparation_aggregate_predictions_pipelines" {
triggers = {
working_dir = "${local.source_root_dir}/python"
tag = local.compile_pipelines_tag
upstream_resource_dependency = null_resource.compile_value_based_bidding_explanation_pipelines.id
}

# The provisioner block specifies the command that will be executed to compile and upload the pipeline.
# This command will execute the compiler function in the pipelines module, which will compile the pipeline YAML file, and the uploader function,
# which will upload the pipeline YAML file to the specified Artifact Registry repository. The scheduler function will then schedule the pipeline to run on a regular basis. A sketch of how the `-p` config path can be resolved is shown after this resource block.
provisioner "local-exec" {
command = <<-EOT
${var.poetry_run_alias} python -m pipelines.compiler -c ${local.config_file_path_relative_python_run_dir} -p vertex_ai.pipelines.reporting_preparation.execution -o reporting_preparation.yaml
${var.poetry_run_alias} python -m pipelines.uploader -c ${local.config_file_path_relative_python_run_dir} -f reporting_preparation.yaml -t ${self.triggers.tag} -t latest
${var.poetry_run_alias} python -m pipelines.scheduler -c ${local.config_file_path_relative_python_run_dir} -p vertex_ai.pipelines.reporting_preparation.execution
EOT
working_dir = self.triggers.working_dir
}
}
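For reference, here is a minimal sketch of how the `-p vertex_ai.pipelines.reporting_preparation.execution` argument can be resolved against the rendered config file before compilation. The function name, the config file path and the use of PyYAML are illustrative assumptions, not the actual python/pipelines/compiler.py implementation.

from functools import reduce

import yaml

def load_pipeline_config(config_file: str, dotted_path: str) -> dict:
    # Read the rendered config (config.yaml.tftpl after Terraform has
    # substituted ${project_id} and ${location}).
    with open(config_file, "r", encoding="utf-8") as f:
        config = yaml.safe_load(f)
    # Walk the dotted path, e.g. vertex_ai -> pipelines -> reporting_preparation -> execution.
    return reduce(lambda node, key: node[key], dotted_path.split("."), config)

execution = load_pipeline_config(
    "config/config.yaml",  # assumed output location of the rendered template
    "vertex_ai.pipelines.reporting_preparation.execution",
)
print(execution["name"], execution["schedule"]["cron"])

The resolved execution block carries the name, schedule and pipeline parameters that the compiler, uploader and scheduler commands above operate on.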
10 changes: 5 additions & 5 deletions pyproject.toml
@@ -1,6 +1,6 @@
[tool.poetry]
name = "marketing-data-engine"
version = "0.1.0"
version = "1.0.0"
description = "Marketing Data Engine"
authors = ["Marketing Analytics Solutions Architects <ma-se@google.com>"]
license = "Apache 2.0"
@@ -12,7 +12,7 @@ python = ">=3.8,<3.11"
google-cloud-aiplatform = "1.22.0"
google-cloud = "^0.34.0"
jinja2 = ">=3.0.1,<4.0.0"
pip = "22.3.1"
pip = "23.3"
invoke = "2.2.0"
## pyinvoke = "1.0.4"
pre-commit = ">=2.14.1,<3.0.0"
@@ -29,7 +29,7 @@ kfp = "2.0.0-rc.2"
kfp-server-api = "2.0.0-rc.1"
#kfp-server-api = "2.0.0.a6"
#kfp-server-api = "2.0.0b1"
urllib3 = "1.26.15"
urllib3 = "1.26.18"
toml = "0.10.2"
docker = "^6.0.1"
db-dtypes = "1.2.0"
@@ -42,7 +42,7 @@ ma-components = {path = "python/base_component_image/", develop = true}
google-cloud-pubsub = "2.15.0"
google-analytics-admin = "0.17.0"
google-analytics-data = "^0.17.1"
pyarrow = "15.0.0"
pyarrow = "15.0.2"

[tool.poetry.group.component_vertex.dependencies]
google-cloud-aiplatform = "1.22.0"
@@ -61,7 +61,7 @@ pytest-cov = "^4.0.0"
pytest-xdist = "^3.0.2"

[tool.poetry.group.dev.dependencies]
pip = "22.3.1"
pip = "23.3"
invoke = "2.2.0"
pre-commit = ">=2.14.1,<3.0.0"
black = "22.10.0"
