Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for export_evaluated_data_items_config in AutoMLTab… #583

Merged
merged 2 commits into from Aug 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
63 changes: 63 additions & 0 deletions google/cloud/aiplatform/training_jobs.py
Expand Up @@ -2705,6 +2705,9 @@ def run(
budget_milli_node_hours: int = 1000,
model_display_name: Optional[str] = None,
disable_early_stopping: bool = False,
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
export_evaluated_data_items_override_destination: bool = False,
sync: bool = True,
) -> models.Model:
"""Runs the training job and returns a model.
Expand Down Expand Up @@ -2777,6 +2780,27 @@ def run(
that training might stop before the entire training budget has been
used, if further training does no longer brings significant improvement
to the model.
export_evaluated_data_items (bool):
Whether to export the test set predictions to a BigQuery table.
If False, then the export is not performed.
export_evaluated_data_items_bigquery_destination_uri (string):
Optional. URI of desired destination BigQuery table for exported test set predictions.

Expected format:
``bq://<project_id>:<dataset_id>:<table>``

If not specified, then results are exported to the following auto-created BigQuery
table:
``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``

Applies only if [export_evaluated_data_items] is True.
export_evaluated_data_items_override_destination (bool):
Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
if the table exists, for exported test set predictions. If False, and the
table exists, then the training job will fail.

Applies only if [export_evaluated_data_items] is True and
[export_evaluated_data_items_bigquery_destination_uri] is specified.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
Expand Down Expand Up @@ -2806,6 +2830,9 @@ def run(
budget_milli_node_hours=budget_milli_node_hours,
model_display_name=model_display_name,
disable_early_stopping=disable_early_stopping,
export_evaluated_data_items=export_evaluated_data_items,
export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri,
export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
sync=sync,
)

Expand All @@ -2822,6 +2849,9 @@ def _run(
budget_milli_node_hours: int = 1000,
model_display_name: Optional[str] = None,
disable_early_stopping: bool = False,
export_evaluated_data_items: bool = False,
export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
export_evaluated_data_items_override_destination: bool = False,
sync: bool = True,
) -> models.Model:
"""Runs the training job and returns a model.
Expand Down Expand Up @@ -2894,6 +2924,27 @@ def _run(
that training might stop before the entire training budget has been
used, if further training does no longer brings significant improvement
to the model.
export_evaluated_data_items (bool):
Whether to export the test set predictions to a BigQuery table.
If False, then the export is not performed.
export_evaluated_data_items_bigquery_destination_uri (string):
Optional. URI of desired destination BigQuery table for exported test set predictions.

Expected format:
``bq://<project_id>:<dataset_id>:<table>``

If not specified, then results are exported to the following auto-created BigQuery
table:
``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``

Applies only if [export_evaluated_data_items] is True.
export_evaluated_data_items_override_destination (bool):
Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
if the table exists, for exported test set predictions. If False, and the
table exists, then the training job will fail.

Applies only if [export_evaluated_data_items] is True and
[export_evaluated_data_items_bigquery_destination_uri] is specified.
sync (bool):
Whether to execute this method synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
Expand Down Expand Up @@ -2940,6 +2991,18 @@ def _run(
"optimizationObjectivePrecisionValue": self._optimization_objective_precision_value,
}

final_export_eval_bq_uri = export_evaluated_data_items_bigquery_destination_uri
if final_export_eval_bq_uri and not final_export_eval_bq_uri.startswith(
"bq://"
):
final_export_eval_bq_uri = f"bq://{final_export_eval_bq_uri}"

if export_evaluated_data_items:
training_task_inputs_dict["exportEvaluatedDataItemsConfig"] = {
"destinationBigqueryUri": final_export_eval_bq_uri,
"overrideExistingTable": export_evaluated_data_items_override_destination,
}

if self._additional_experiments:
training_task_inputs_dict[
"additionalExperiments"
Expand Down
108 changes: 108 additions & 0 deletions tests/unit/aiplatform/test_automl_tabular_training_jobs.py
Expand Up @@ -79,6 +79,11 @@
_TEST_TRAINING_DISABLE_EARLY_STOPPING = True
_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME = "minimize-log-loss"
_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE = "classification"
_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS = True
_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI = (
"bq://path.to.table"
)
_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION = False
_TEST_ADDITIONAL_EXPERIMENTS = ["exp1", "exp2"]
_TEST_TRAINING_TASK_INPUTS_DICT = {
# required inputs
Expand Down Expand Up @@ -117,6 +122,16 @@
},
struct_pb2.Value(),
)
_TEST_TRAINING_TASK_INPUTS_WITH_EXPORT_EVAL_DATA_ITEMS = json_format.ParseDict(
{
**_TEST_TRAINING_TASK_INPUTS_DICT,
"exportEvaluatedDataItemsConfig": {
"destinationBigqueryUri": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
"overrideExistingTable": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
},
},
struct_pb2.Value(),
)

_TEST_DATASET_NAME = "test-dataset-name"

Expand Down Expand Up @@ -366,6 +381,99 @@ def test_run_call_pipeline_service_create(

assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED

@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_service_create_with_export_eval_data_items(
self,
mock_pipeline_service_create,
mock_pipeline_service_get,
mock_dataset_tabular,
mock_model_service_get,
sync,
):
aiplatform.init(
project=_TEST_PROJECT,
staging_bucket=_TEST_BUCKET_NAME,
encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
)

job = training_jobs.AutoMLTabularTrainingJob(
display_name=_TEST_DISPLAY_NAME,
optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
optimization_objective_recall_value=None,
optimization_objective_precision_value=None,
)

model_from_job = job.run(
dataset=mock_dataset_tabular,
target_column=_TEST_TRAINING_TARGET_COLUMN,
model_display_name=_TEST_MODEL_DISPLAY_NAME,
training_fraction_split=_TEST_TRAINING_FRACTION_SPLIT,
validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT,
test_fraction_split=_TEST_TEST_FRACTION_SPLIT,
predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME,
weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
disable_early_stopping=_TEST_TRAINING_DISABLE_EARLY_STOPPING,
export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS,
export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
sync=sync,
)

job.wait_for_resource_creation()

assert job.resource_name == _TEST_PIPELINE_RESOURCE_NAME

if not sync:
model_from_job.wait()

true_fraction_split = gca_training_pipeline.FractionSplit(
training_fraction=_TEST_TRAINING_FRACTION_SPLIT,
validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT,
test_fraction=_TEST_TEST_FRACTION_SPLIT,
)

true_managed_model = gca_model.Model(
display_name=_TEST_MODEL_DISPLAY_NAME,
encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
)

true_input_data_config = gca_training_pipeline.InputDataConfig(
fraction_split=true_fraction_split,
predefined_split=gca_training_pipeline.PredefinedSplit(
key=_TEST_PREDEFINED_SPLIT_COLUMN_NAME
),
dataset_id=mock_dataset_tabular.name,
)

true_training_pipeline = gca_training_pipeline.TrainingPipeline(
display_name=_TEST_DISPLAY_NAME,
training_task_definition=schema.training_job.definition.automl_tabular,
training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_EXPORT_EVAL_DATA_ITEMS,
model_to_upload=true_managed_model,
input_data_config=true_input_data_config,
encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
)

mock_pipeline_service_create.assert_called_once_with(
parent=initializer.global_config.common_location_path(),
training_pipeline=true_training_pipeline,
)

assert job._gca_resource is mock_pipeline_service_get.return_value

mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME)

assert model_from_job._gca_resource is mock_model_service_get.return_value

assert job.get_model()._gca_resource is mock_model_service_get.return_value

assert not job.has_failed

assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED

@pytest.mark.usefixtures("mock_pipeline_service_get")
@pytest.mark.parametrize("sync", [True, False])
def test_run_call_pipeline_if_no_model_display_name(
Expand Down