feat: add support for export_evaluated_data_items_config in AutoMLTabularTrainingJob (#583)

add support for export_evaluated_data_items_config in AutoMLTabularTrainingJob
geraint0923 committed Aug 3, 2021
1 parent 193ef7d commit 2a6b0a3
Showing 2 changed files with 171 additions and 0 deletions.
63 changes: 63 additions & 0 deletions google/cloud/aiplatform/training_jobs.py
@@ -2705,6 +2705,9 @@ def run(
budget_milli_node_hours: int = 1000,
model_display_name: Optional[str] = None,
disable_early_stopping: bool = False,
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
export_evaluated_data_items_override_destination: bool = False,
sync: bool = True,
) -> models.Model:
"""Runs the training job and returns a model.
@@ -2777,6 +2780,27 @@ def run(
that training might stop before the entire training budget has been
used, if further training no longer brings significant improvement
to the model.
export_evaluated_data_items (bool):
Whether to export the test set predictions to a BigQuery table.
If False, then the export is not performed.
export_evaluated_data_items_bigquery_destination_uri (string):
Optional. URI of desired destination BigQuery table for exported test set predictions.
Expected format:
``bq://<project_id>:<dataset_id>:<table>``
If not specified, then results are exported to the following auto-created BigQuery
table:
``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``
Applies only if [export_evaluated_data_items] is True.
export_evaluated_data_items_override_destination (bool):
Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
if the table exists, for exported test set predictions. If False, and the
table exists, then the training job will fail.
Applies only if [export_evaluated_data_items] is True and
[export_evaluated_data_items_bigquery_destination_uri] is specified.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in a concurrent Future and any downstream object will
@@ -2806,6 +2830,9 @@ def run(
budget_milli_node_hours=budget_milli_node_hours,
model_display_name=model_display_name,
disable_early_stopping=disable_early_stopping,
export_evaluated_data_items=export_evaluated_data_items,
export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri,
export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
sync=sync,
)
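
For callers, the three new arguments pass straight through run() into _run(). A minimal usage sketch follows (hedged: the project, bucket, dataset resource name, and target column are hypothetical placeholders, not values taken from this change):

from google.cloud import aiplatform

# Hypothetical project and staging bucket.
aiplatform.init(project="my-project", staging_bucket="gs://my-bucket")

# Hypothetical, pre-existing tabular dataset resource.
dataset = aiplatform.TabularDataset(
    "projects/my-project/locations/us-central1/datasets/1234567890"
)

job = aiplatform.AutoMLTabularTrainingJob(
    display_name="train-with-eval-export",
    optimization_prediction_type="classification",
)

model = job.run(
    dataset=dataset,
    target_column="label",  # hypothetical target column
    export_evaluated_data_items=True,
    # Optional; when omitted, results go to the auto-created table described above.
    export_evaluated_data_items_bigquery_destination_uri="bq://my-project:my_dataset:eval_items",
    export_evaluated_data_items_override_destination=False,
)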

@@ -2822,6 +2849,9 @@ def _run(
budget_milli_node_hours: int = 1000,
model_display_name: Optional[str] = None,
disable_early_stopping: bool = False,
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
export_evaluated_data_items_override_destination: bool = False,
sync: bool = True,
) -> models.Model:
"""Runs the training job and returns a model.
@@ -2894,6 +2924,27 @@ def _run(
that training might stop before the entire training budget has been
used, if further training no longer brings significant improvement
to the model.
export_evaluated_data_items (bool):
Whether to export the test set predictions to a BigQuery table.
If False, then the export is not performed.
export_evaluated_data_items_bigquery_destination_uri (string):
Optional. URI of desired destination BigQuery table for exported test set predictions.
Expected format:
``bq://<project_id>:<dataset_id>:<table>``
If not specified, then results are exported to the following auto-created BigQuery
table:
``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``
Applies only if [export_evaluated_data_items] is True.
export_evaluated_data_items_override_destination (bool):
Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
if the table exists, for exported test set predictions. If False, and the
table exists, then the training job will fail.
Applies only if [export_evaluated_data_items] is True and
[export_evaluated_data_items_bigquery_destination_uri] is specified.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in a concurrent Future and any downstream object will
@@ -2940,6 +2991,18 @@ def _run(
"optimizationObjectivePrecisionValue": self._optimization_objective_precision_value,
}

final_export_eval_bq_uri = export_evaluated_data_items_bigquery_destination_uri
if final_export_eval_bq_uri and not final_export_eval_bq_uri.startswith(
"bq://"
):
final_export_eval_bq_uri = f"bq://{final_export_eval_bq_uri}"

if export_evaluated_data_items:
training_task_inputs_dict["exportEvaluatedDataItemsConfig"] = {
"destinationBigqueryUri": final_export_eval_bq_uri,
"overrideExistingTable": export_evaluated_data_items_override_destination,
}

if self._additional_experiments:
training_task_inputs_dict[
"additionalExperiments"
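Before the config is written into training_task_inputs, the destination URI is normalized: a bare table path gains the bq:// scheme, while an already-prefixed URI (or None) passes through unchanged. A standalone sketch of that normalization (the helper name is hypothetical; _run above inlines this logic):

def _normalize_bq_uri(uri):
    # Prepend the bq:// scheme only when it is missing.
    if uri and not uri.startswith("bq://"):
        return f"bq://{uri}"
    return uri

assert _normalize_bq_uri("my-project:my_dataset:my_table") == "bq://my-project:my_dataset:my_table"
assert _normalize_bq_uri("bq://my-project:my_dataset:my_table") == "bq://my-project:my_dataset:my_table"
assert _normalize_bq_uri(None) is None  # no destination given; the auto-created table applies
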
108 changes: 108 additions & 0 deletions tests/unit/aiplatform/test_automl_tabular_training_jobs.py
@@ -79,6 +79,11 @@
_TEST_TRAINING_DISABLE_EARLY_STOPPING = True
_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME = "minimize-log-loss"
_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE = "classification"
_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS = True
_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI = (
"bq://path.to.table"
)
_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION = False
_TEST_ADDITIONAL_EXPERIMENTS = ["exp1", "exp2"]
_TEST_TRAINING_TASK_INPUTS_DICT = {
# required inputs
@@ -117,6 +122,16 @@
},
struct_pb2.Value(),
)
_TEST_TRAINING_TASK_INPUTS_WITH_EXPORT_EVAL_DATA_ITEMS = json_format.ParseDict(
{
**_TEST_TRAINING_TASK_INPUTS_DICT,
"exportEvaluatedDataItemsConfig": {
"destinationBigqueryUri": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
"overrideExistingTable": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
},
},
struct_pb2.Value(),
)
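
For reference, json_format.ParseDict fills a protobuf message from a plain dict, which is how the expected training_task_inputs values above are built. A minimal self-contained example (the dict contents here are illustrative only):

from google.protobuf import json_format, struct_pb2

value = json_format.ParseDict(
    {"exportEvaluatedDataItemsConfig": {"overrideExistingTable": False}},
    struct_pb2.Value(),
)
# ParseDict mutates and returns the supplied message: a generic Value wrapping
# a struct, matching what TrainingPipeline.training_task_inputs carries.
print(type(value))  # <class 'google.protobuf.struct_pb2.Value'>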

_TEST_DATASET_NAME = "test-dataset-name"

@@ -366,6 +381,99 @@ def test_run_call_pipeline_service_create(

assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED

@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_service_create_with_export_eval_data_items(
self,
mock_pipeline_service_create,
mock_pipeline_service_get,
mock_dataset_tabular,
mock_model_service_get,
sync,
):
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_BUCKET_NAME,
encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
)

job = training_jobs.AutoMLTabularTrainingJob(
display_name=_TEST_DISPLAY_NAME,
optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
optimization_objective_recall_value=None,
optimization_objective_precision_value=None,
)

model_from_job = job.run(
dataset=mock_dataset_tabular,
target_column=_TEST_TRAINING_TARGET_COLUMN,
model_display_name=_TEST_MODEL_DISPLAY_NAME,
training_fraction_split=_TEST_TRAINING_FRACTION_SPLIT,
validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT,
test_fraction_split=_TEST_TEST_FRACTION_SPLIT,
predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME,
weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
disable_early_stopping=_TEST_TRAINING_DISABLE_EARLY_STOPPING,
export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS,
export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
sync=sync,
)

job.wait_for_resource_creation()

assert job.resource_name == _TEST_PIPELINE_RESOURCE_NAME

if not sync:
model_from_job.wait()

true_fraction_split = gca_training_pipeline.FractionSplit(
training_fraction=_TEST_TRAINING_FRACTION_SPLIT,
validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT,
test_fraction=_TEST_TEST_FRACTION_SPLIT,
)

true_managed_model = gca_model.Model(
display_name=_TEST_MODEL_DISPLAY_NAME,
encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
)

true_input_data_config = gca_training_pipeline.InputDataConfig(
fraction_split=true_fraction_split,
predefined_split=gca_training_pipeline.PredefinedSplit(
key=_TEST_PREDEFINED_SPLIT_COLUMN_NAME
),
dataset_id=mock_dataset_tabular.name,
)

true_training_pipeline = gca_training_pipeline.TrainingPipeline(
display_name=_TEST_DISPLAY_NAME,
training_task_definition=schema.training_job.definition.automl_tabular,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_EXPORT_EVAL_DATA_ITEMS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
)

mock_pipeline_service_create.assert_called_once_with(
parent=initializer.global_config.common_location_path(),
training_pipeline=true_training_pipeline,
)

assert job._gca_resource is mock_pipeline_service_get.return_value

mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME)

assert model_from_job._gca_resource is mock_model_service_get.return_value

assert job.get_model()._gca_resource is mock_model_service_get.return_value

assert not job.has_failed

assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
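
To run only this new test locally, standard pytest name selection works (from the repository root; the -k expression is one reasonable choice, not mandated by the change):

pytest tests/unit/aiplatform/test_automl_tabular_training_jobs.py -k export_eval_data_items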

@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_if_no_model_display_name(