Skip to content

Commit

Permalink
feat: support additional_experiments for AutoML Tables and AutoML Forecasting (#428)
Browse files Browse the repository at this point in the history
  • Loading branch information
geraint0923 committed Jun 2, 2021
1 parent fdc968f commit b4211f2
Show file tree
Hide file tree
Showing 3 changed files with 228 additions and 35 deletions.
29 changes: 29 additions & 0 deletions google/cloud/aiplatform/training_jobs.py
Expand Up @@ -2420,6 +2420,8 @@ def __init__(
optimization_objective_precision_value
)

self._additional_experiments = []

def run(
self,
dataset: datasets.TabularDataset,
Expand Down Expand Up @@ -2664,6 +2666,11 @@ def _run(
"optimizationObjectivePrecisionValue": self._optimization_objective_precision_value,
}

if self._additional_experiments:
training_task_inputs_dict[
"additionalExperiments"
] = self._additional_experiments

if model_display_name is None:
model_display_name = self._display_name

Expand Down Expand Up @@ -2691,6 +2698,14 @@ def _model_upload_fail_string(self) -> str:
"Model."
)

def _add_additional_experiments(self, additional_experiments: List[str]):
"""Add experiment flags to the training job.
Args:
additional_experiments (List[str]):
Experiment flags that can enable some experimental training features.
"""
self._additional_experiments.extend(additional_experiments)


class AutoMLForecastingTrainingJob(_TrainingJob):
_supported_training_schemas = (schema.training_job.definition.automl_forecasting,)
Expand Down Expand Up @@ -2746,6 +2761,7 @@ def __init__(
)
self._column_transformations = column_transformations
self._optimization_objective = optimization_objective
self._additional_experiments = []

def run(
self,
Expand Down Expand Up @@ -3119,6 +3135,11 @@ def _run(
"overrideExistingTable": export_evaluated_data_items_override_destination,
}

if self._additional_experiments:
training_task_inputs_dict[
"additionalExperiments"
] = self._additional_experiments

if model_display_name is None:
model_display_name = self._display_name

Expand All @@ -3143,6 +3164,14 @@ def _model_upload_fail_string(self) -> str:
"Model."
)

def _add_additional_experiments(self, additional_experiments: List[str]):
"""Add experiment flags to the training job.
Args:
additional_experiments (List[str]):
Experiment flags that can enable some experimental training features.
"""
self._additional_experiments.extend(additional_experiments)


class AutoMLImageTrainingJob(_TrainingJob):
_supported_training_schemas = (
Expand Down
127 changes: 103 additions & 24 deletions tests/unit/aiplatform/test_automl_forecasting_training_jobs.py
Expand Up @@ -62,32 +62,40 @@
# --- AutoMLForecastingTrainingJob test fixtures ------------------------------
_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS = 1000
_TEST_TRAINING_WEIGHT_COLUMN = "weight"
_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME = "minimize-rmse"
# Experiment flags passed through _add_additional_experiments in the tests.
_TEST_ADDITIONAL_EXPERIMENTS = ["exp1", "exp2"]
# Expected training task inputs payload (camelCase keys, as sent to the
# pipeline service create call) for a run WITHOUT additional experiments.
_TEST_TRAINING_TASK_INPUTS_DICT = {
    # required inputs
    "targetColumn": _TEST_TRAINING_TARGET_COLUMN,
    "timeColumn": _TEST_TRAINING_TIME_COLUMN,
    "timeSeriesIdentifierColumn": _TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN,
    "timeSeriesAttributeColumns": _TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS,
    "unavailableAtForecastColumns": _TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS,
    "availableAtForecastColumns": _TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS,
    "forecastHorizon": _TEST_TRAINING_FORECAST_HORIZON,
    "dataGranularity": {
        "unit": _TEST_TRAINING_DATA_GRANULARITY_UNIT,
        "quantity": _TEST_TRAINING_DATA_GRANULARITY_COUNT,
    },
    "transformations": _TEST_TRAINING_COLUMN_TRANSFORMATIONS,
    "trainBudgetMilliNodeHours": _TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
    # optional inputs
    "weightColumn": _TEST_TRAINING_WEIGHT_COLUMN,
    "contextWindow": _TEST_TRAINING_CONTEXT_WINDOW,
    "exportEvaluatedDataItemsConfig": {
        "destinationBigqueryUri": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
        "overrideExistingTable": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
    },
    "quantiles": _TEST_TRAINING_QUANTILES,
    "validationOptions": _TEST_TRAINING_VALIDATION_OPTIONS,
    "optimizationObjective": _TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
}
# The same payload as a protobuf Struct Value, as asserted against the mocked
# pipeline-service call.
_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict(
    _TEST_TRAINING_TASK_INPUTS_DICT, struct_pb2.Value(),
)
# Expected training task inputs when experiment flags have been set on the job
# via _add_additional_experiments: the base inputs plus "additionalExperiments".
# The base keys come from the **-spread of _TEST_TRAINING_TASK_INPUTS_DICT;
# re-listing them explicitly (as the original did) is redundant because the
# spread overwrites every duplicate key with identical values.
_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS = json_format.ParseDict(
    {
        **_TEST_TRAINING_TASK_INPUTS_DICT,
        "additionalExperiments": _TEST_ADDITIONAL_EXPERIMENTS,
    },
    struct_pb2.Value(),
)
Expand Down Expand Up @@ -359,6 +367,77 @@ def test_run_call_pipeline_if_no_model_display_name(
training_pipeline=true_training_pipeline,
)

@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_if_set_additional_experiments(
    self,
    mock_pipeline_service_create,
    mock_dataset_time_series,
    mock_model_service_get,
    sync,
):
    """Experiment flags set via _add_additional_experiments are forwarded to
    the pipeline service as the "additionalExperiments" training task input.
    """
    aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

    job = AutoMLForecastingTrainingJob(
        display_name=_TEST_DISPLAY_NAME,
        optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
        column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
    )

    # Set experiment flags before running; they must appear in the request.
    job._add_additional_experiments(_TEST_ADDITIONAL_EXPERIMENTS)

    model_from_job = job.run(
        dataset=mock_dataset_time_series,
        target_column=_TEST_TRAINING_TARGET_COLUMN,
        time_column=_TEST_TRAINING_TIME_COLUMN,
        time_series_identifier_column=_TEST_TRAINING_TIME_SERIES_IDENTIFIER_COLUMN,
        unavailable_at_forecast_columns=_TEST_TRAINING_UNAVAILABLE_AT_FORECAST_COLUMNS,
        available_at_forecast_columns=_TEST_TRAINING_AVAILABLE_AT_FORECAST_COLUMNS,
        forecast_horizon=_TEST_TRAINING_FORECAST_HORIZON,
        data_granularity_unit=_TEST_TRAINING_DATA_GRANULARITY_UNIT,
        data_granularity_count=_TEST_TRAINING_DATA_GRANULARITY_COUNT,
        weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
        time_series_attribute_columns=_TEST_TRAINING_TIME_SERIES_ATTRIBUTE_COLUMNS,
        context_window=_TEST_TRAINING_CONTEXT_WINDOW,
        budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
        export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS,
        export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
        export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
        quantiles=_TEST_TRAINING_QUANTILES,
        validation_options=_TEST_TRAINING_VALIDATION_OPTIONS,
        sync=sync,
    )

    if not sync:
        # In async mode, block until the job finishes before asserting.
        model_from_job.wait()

    true_fraction_split = gca_training_pipeline.FractionSplit(
        training_fraction=_TEST_TRAINING_FRACTION_SPLIT,
        validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT,
        test_fraction=_TEST_TEST_FRACTION_SPLIT,
    )

    # The model display name defaults to the job display name when not given.
    true_managed_model = gca_model.Model(display_name=_TEST_DISPLAY_NAME)

    true_input_data_config = gca_training_pipeline.InputDataConfig(
        fraction_split=true_fraction_split,
        dataset_id=mock_dataset_time_series.name,
    )

    # The expected request includes the "additionalExperiments" inputs.
    true_training_pipeline = gca_training_pipeline.TrainingPipeline(
        display_name=_TEST_DISPLAY_NAME,
        training_task_definition=schema.training_job.definition.automl_forecasting,
        training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
        model_to_upload=true_managed_model,
        input_data_config=true_input_data_config,
    )

    mock_pipeline_service_create.assert_called_once_with(
        parent=initializer.global_config.common_location_path(),
        training_pipeline=true_training_pipeline,
    )

@pytest.mark.usefixtures(
"mock_pipeline_service_create",
"mock_pipeline_service_get",
Expand Down
107 changes: 96 additions & 11 deletions tests/unit/aiplatform/test_automl_tabular_training_jobs.py
Expand Up @@ -56,19 +56,27 @@
# --- AutoMLTabularTrainingJob test fixtures ----------------------------------
_TEST_TRAINING_DISABLE_EARLY_STOPPING = True
_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME = "minimize-log-loss"
_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE = "classification"
# Experiment flags passed through _add_additional_experiments in the tests.
_TEST_ADDITIONAL_EXPERIMENTS = ["exp1", "exp2"]
# Expected training task inputs payload (camelCase keys, as sent to the
# pipeline service create call) for a run WITHOUT additional experiments.
_TEST_TRAINING_TASK_INPUTS_DICT = {
    # required inputs
    "targetColumn": _TEST_TRAINING_TARGET_COLUMN,
    "transformations": _TEST_TRAINING_COLUMN_TRANSFORMATIONS,
    "trainBudgetMilliNodeHours": _TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
    # optional inputs
    "weightColumnName": _TEST_TRAINING_WEIGHT_COLUMN,
    "disableEarlyStopping": _TEST_TRAINING_DISABLE_EARLY_STOPPING,
    "predictionType": _TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
    "optimizationObjective": _TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
    "optimizationObjectiveRecallValue": None,
    "optimizationObjectivePrecisionValue": None,
}
# The same payload as a protobuf Struct Value, as asserted against the mocked
# pipeline-service call.
_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict(
    _TEST_TRAINING_TASK_INPUTS_DICT, struct_pb2.Value(),
)
# Expected training task inputs when experiment flags have been set on the job
# via _add_additional_experiments: the base inputs plus "additionalExperiments".
# The base keys come from the **-spread of _TEST_TRAINING_TASK_INPUTS_DICT;
# re-listing them explicitly (as the original did) is redundant because the
# spread overwrites every duplicate key with identical values.
_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS = json_format.ParseDict(
    {
        **_TEST_TRAINING_TASK_INPUTS_DICT,
        "additionalExperiments": _TEST_ADDITIONAL_EXPERIMENTS,
    },
    struct_pb2.Value(),
)
Expand Down Expand Up @@ -430,6 +438,83 @@ def test_run_call_pipeline_service_create_if_no_column_transformations(
training_pipeline=true_training_pipeline,
)

@pytest.mark.parametrize("sync", [True, False])
# NOTE(review): the original comment here claimed this test covered default
# column transformations; it actually covers additional experiment flags.
def test_run_call_pipeline_service_create_if_set_additional_experiments(
    self,
    mock_pipeline_service_create,
    mock_pipeline_service_get,
    mock_dataset_tabular,
    mock_model_service_get,
    sync,
):
    """Experiment flags set via _add_additional_experiments are forwarded to
    the pipeline service as the "additionalExperiments" training task input.
    """
    aiplatform.init(
        project=_TEST_PROJECT,
        staging_bucket=_TEST_BUCKET_NAME,
        encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
    )

    job = training_jobs.AutoMLTabularTrainingJob(
        display_name=_TEST_DISPLAY_NAME,
        optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
        optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
        column_transformations=None,
        optimization_objective_recall_value=None,
        optimization_objective_precision_value=None,
    )

    # Set experiment flags before running; they must appear in the request.
    job._add_additional_experiments(_TEST_ADDITIONAL_EXPERIMENTS)

    model_from_job = job.run(
        dataset=mock_dataset_tabular,
        target_column=_TEST_TRAINING_TARGET_COLUMN,
        model_display_name=_TEST_MODEL_DISPLAY_NAME,
        training_fraction_split=_TEST_TRAINING_FRACTION_SPLIT,
        validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT,
        test_fraction_split=_TEST_TEST_FRACTION_SPLIT,
        predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME,
        weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
        budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
        disable_early_stopping=_TEST_TRAINING_DISABLE_EARLY_STOPPING,
        sync=sync,
    )

    if not sync:
        # In async mode, block until the job finishes before asserting.
        model_from_job.wait()

    true_fraction_split = gca_training_pipeline.FractionSplit(
        training_fraction=_TEST_TRAINING_FRACTION_SPLIT,
        validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT,
        test_fraction=_TEST_TEST_FRACTION_SPLIT,
    )

    true_managed_model = gca_model.Model(
        display_name=_TEST_MODEL_DISPLAY_NAME,
        encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
    )

    true_input_data_config = gca_training_pipeline.InputDataConfig(
        fraction_split=true_fraction_split,
        predefined_split=gca_training_pipeline.PredefinedSplit(
            key=_TEST_PREDEFINED_SPLIT_COLUMN_NAME
        ),
        dataset_id=mock_dataset_tabular.name,
    )

    # The expected request includes the "additionalExperiments" inputs.
    true_training_pipeline = gca_training_pipeline.TrainingPipeline(
        display_name=_TEST_DISPLAY_NAME,
        training_task_definition=schema.training_job.definition.automl_tabular,
        training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS,
        model_to_upload=true_managed_model,
        input_data_config=true_input_data_config,
        encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
    )

    mock_pipeline_service_create.assert_called_once_with(
        parent=initializer.global_config.common_location_path(),
        training_pipeline=true_training_pipeline,
    )

@pytest.mark.usefixtures(
"mock_pipeline_service_create",
"mock_pipeline_service_get",
Expand Down

0 comments on commit b4211f2

Please sign in to comment.