From 2a6b0a369296698f79d75e93007e4c7319f3523c Mon Sep 17 00:00:00 2001
From: Mark
Date: Tue, 3 Aug 2021 11:56:09 -0700
Subject: [PATCH] =?UTF-8?q?feat:=20add=20support=20for=20export=5Fevaluate?=
 =?UTF-8?q?d=5Fdata=5Fitems=5Fconfig=20in=20AutoMLTab=E2=80=A6=20(#583)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

add support for export_evaluated_data_items_config in AutoMLTabularTrainingJob
---
 google/cloud/aiplatform/training_jobs.py        |  63 ++++++++++
 .../test_automl_tabular_training_jobs.py        | 108 ++++++++++++++++++
 2 files changed, 171 insertions(+)

diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py
index 05a9a3aeb3..8e89509246 100644
--- a/google/cloud/aiplatform/training_jobs.py
+++ b/google/cloud/aiplatform/training_jobs.py
@@ -2705,6 +2705,9 @@ def run(
         budget_milli_node_hours: int = 1000,
         model_display_name: Optional[str] = None,
         disable_early_stopping: bool = False,
+        export_evaluated_data_items: bool = False,
+        export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
+        export_evaluated_data_items_override_destination: bool = False,
         sync: bool = True,
     ) -> models.Model:
         """Runs the training job and returns a model.
@@ -2777,6 +2780,27 @@ def run(
             that training might stop before the entire training budget has been
             used, if further training no longer brings significant improvement
             to the model.
+            export_evaluated_data_items (bool):
+                Whether to export the test set predictions to a BigQuery table.
+                If False, then the export is not performed.
+            export_evaluated_data_items_bigquery_destination_uri (string):
+                Optional. URI of desired destination BigQuery table for exported test set predictions.
+
+                Expected format:
+                ``bq://<project_id>:<dataset_id>:<table>``
+
+                If not specified, then results are exported to the following auto-created BigQuery
+                table:
+                ``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``
+
+                Applies only if [export_evaluated_data_items] is True.
+            export_evaluated_data_items_override_destination (bool):
+                Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
+                if the table exists, for exported test set predictions. If False, and the
+                table exists, then the training job will fail.
+
+                Applies only if [export_evaluated_data_items] is True and
+                [export_evaluated_data_items_bigquery_destination_uri] is specified.
             sync (bool):
                 Whether to execute this method synchronously. If False, this method
                 will be executed in concurrent Future and any downstream object will
@@ -2806,6 +2830,9 @@ def run(
             budget_milli_node_hours=budget_milli_node_hours,
             model_display_name=model_display_name,
             disable_early_stopping=disable_early_stopping,
+            export_evaluated_data_items=export_evaluated_data_items,
+            export_evaluated_data_items_bigquery_destination_uri=export_evaluated_data_items_bigquery_destination_uri,
+            export_evaluated_data_items_override_destination=export_evaluated_data_items_override_destination,
             sync=sync,
         )

@@ -2822,6 +2849,9 @@ def _run(
         budget_milli_node_hours: int = 1000,
         model_display_name: Optional[str] = None,
         disable_early_stopping: bool = False,
+        export_evaluated_data_items: bool = False,
+        export_evaluated_data_items_bigquery_destination_uri: Optional[str] = None,
+        export_evaluated_data_items_override_destination: bool = False,
         sync: bool = True,
     ) -> models.Model:
         """Runs the training job and returns a model.
@@ -2894,6 +2924,27 @@ def _run(
             that training might stop before the entire training budget has been
             used, if further training no longer brings significant improvement
             to the model.
+            export_evaluated_data_items (bool):
+                Whether to export the test set predictions to a BigQuery table.
+                If False, then the export is not performed.
+            export_evaluated_data_items_bigquery_destination_uri (string):
+                Optional. URI of desired destination BigQuery table for exported test set predictions.
+
+                Expected format:
+                ``bq://<project_id>:<dataset_id>:<table>``
+
+                If not specified, then results are exported to the following auto-created BigQuery
+                table:
+                ``<project_id>:export_evaluated_examples_<model_name>_<yyyy_MM_dd'T'HH_mm_ss_SSS'Z'>.evaluated_examples``
+
+                Applies only if [export_evaluated_data_items] is True.
+            export_evaluated_data_items_override_destination (bool):
+                Whether to override the contents of [export_evaluated_data_items_bigquery_destination_uri],
+                if the table exists, for exported test set predictions. If False, and the
+                table exists, then the training job will fail.
+
+                Applies only if [export_evaluated_data_items] is True and
+                [export_evaluated_data_items_bigquery_destination_uri] is specified.
             sync (bool):
                 Whether to execute this method synchronously. If False, this method
                 will be executed in concurrent Future and any downstream object will
@@ -2940,6 +2991,18 @@ def _run(
             "optimizationObjectivePrecisionValue": self._optimization_objective_precision_value,
         }

+        final_export_eval_bq_uri = export_evaluated_data_items_bigquery_destination_uri
+        if final_export_eval_bq_uri and not final_export_eval_bq_uri.startswith(
+            "bq://"
+        ):
+            final_export_eval_bq_uri = f"bq://{final_export_eval_bq_uri}"
+
+        if export_evaluated_data_items:
+            training_task_inputs_dict["exportEvaluatedDataItemsConfig"] = {
+                "destinationBigqueryUri": final_export_eval_bq_uri,
+                "overrideExistingTable": export_evaluated_data_items_override_destination,
+            }
+
         if self._additional_experiments:
             training_task_inputs_dict[
                 "additionalExperiments"
diff --git a/tests/unit/aiplatform/test_automl_tabular_training_jobs.py b/tests/unit/aiplatform/test_automl_tabular_training_jobs.py
index 78a99ee6e3..02ddad688b 100644
--- a/tests/unit/aiplatform/test_automl_tabular_training_jobs.py
+++ b/tests/unit/aiplatform/test_automl_tabular_training_jobs.py
@@ -79,6 +79,11 @@
 _TEST_TRAINING_DISABLE_EARLY_STOPPING = True
 _TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME = "minimize-log-loss"
 _TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE = "classification"
+_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS = True
+_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI = (
+    "bq://path.to.table"
+)
+_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION = False
 _TEST_ADDITIONAL_EXPERIMENTS = ["exp1", "exp2"]
 _TEST_TRAINING_TASK_INPUTS_DICT = {
     # required inputs
@@ -117,6 +122,16 @@
     },
     struct_pb2.Value(),
 )
+_TEST_TRAINING_TASK_INPUTS_WITH_EXPORT_EVAL_DATA_ITEMS = json_format.ParseDict(
+    {
+        **_TEST_TRAINING_TASK_INPUTS_DICT,
+        "exportEvaluatedDataItemsConfig": {
+            "destinationBigqueryUri": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
+            "overrideExistingTable": _TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
+        },
+    },
+    struct_pb2.Value(),
+)


 _TEST_DATASET_NAME = "test-dataset-name"
@@ -366,6 +381,99 @@ def test_run_call_pipeline_service_create(

         assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED

+    @pytest.mark.parametrize("sync", [True, False])
+    def test_run_call_pipeline_service_create_with_export_eval_data_items(
+        self,
+        mock_pipeline_service_create,
+        mock_pipeline_service_get,
+        mock_dataset_tabular,
+        mock_model_service_get,
+        sync,
+    ):
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            staging_bucket=_TEST_BUCKET_NAME,
+            encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
+        )
+
+        job = training_jobs.AutoMLTabularTrainingJob(
+            display_name=_TEST_DISPLAY_NAME,
+            optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
+            optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
+            column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
+            optimization_objective_recall_value=None,
+            optimization_objective_precision_value=None,
+        )
+
+        model_from_job = job.run(
+            dataset=mock_dataset_tabular,
+            target_column=_TEST_TRAINING_TARGET_COLUMN,
+            model_display_name=_TEST_MODEL_DISPLAY_NAME,
+            training_fraction_split=_TEST_TRAINING_FRACTION_SPLIT,
+            validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT,
+            test_fraction_split=_TEST_TEST_FRACTION_SPLIT,
+            predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME,
+            weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
+            budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
+            disable_early_stopping=_TEST_TRAINING_DISABLE_EARLY_STOPPING,
+            export_evaluated_data_items=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS,
+            export_evaluated_data_items_bigquery_destination_uri=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_BIGQUERY_DESTINATION_URI,
+            export_evaluated_data_items_override_destination=_TEST_TRAINING_EXPORT_EVALUATED_DATA_ITEMS_OVERRIDE_DESTINATION,
+            sync=sync,
+        )
+
+        job.wait_for_resource_creation()
+
+        assert job.resource_name == _TEST_PIPELINE_RESOURCE_NAME
+
+        if not sync:
+            model_from_job.wait()
+
+        true_fraction_split = gca_training_pipeline.FractionSplit(
+            training_fraction=_TEST_TRAINING_FRACTION_SPLIT,
+            validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT,
+            test_fraction=_TEST_TEST_FRACTION_SPLIT,
+        )
+
+        true_managed_model = gca_model.Model(
+            display_name=_TEST_MODEL_DISPLAY_NAME,
+            encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
+        )
+
+        true_input_data_config = gca_training_pipeline.InputDataConfig(
+            fraction_split=true_fraction_split,
+            predefined_split=gca_training_pipeline.PredefinedSplit(
+                key=_TEST_PREDEFINED_SPLIT_COLUMN_NAME
+            ),
+            dataset_id=mock_dataset_tabular.name,
+        )
+
+        true_training_pipeline = gca_training_pipeline.TrainingPipeline(
+            display_name=_TEST_DISPLAY_NAME,
+            training_task_definition=schema.training_job.definition.automl_tabular,
+            training_task_inputs=_TEST_TRAINING_TASK_INPUTS_WITH_EXPORT_EVAL_DATA_ITEMS,
+            model_to_upload=true_managed_model,
+            input_data_config=true_input_data_config,
+            encryption_spec=_TEST_DEFAULT_ENCRYPTION_SPEC,
+        )
+
+        mock_pipeline_service_create.assert_called_once_with(
+            parent=initializer.global_config.common_location_path(),
+            training_pipeline=true_training_pipeline,
+        )
+
+        assert job._gca_resource is mock_pipeline_service_get.return_value
+
+        mock_model_service_get.assert_called_once_with(name=_TEST_MODEL_NAME)
+
+        assert model_from_job._gca_resource is mock_model_service_get.return_value
+
+        assert job.get_model()._gca_resource is mock_model_service_get.return_value
+
+        assert not job.has_failed
+
+        assert job.state == gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED
+
     @pytest.mark.usefixtures("mock_pipeline_service_get")
     @pytest.mark.parametrize("sync", [True, False])
     def test_run_call_pipeline_if_no_model_display_name(
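
Usage sketch: a minimal example of how the new parameters are meant to be
called once this change lands. The project, location, dataset resource name,
target column, and BigQuery table below are placeholders invented for
illustration, not values from this patch; the destination URI follows the
"bq://..." style used in the unit test above.

    from google.cloud import aiplatform

    # Placeholder project/location; assumes application-default credentials.
    aiplatform.init(project="my-project", location="us-central1")

    # Placeholder resource name for an existing tabular dataset.
    dataset = aiplatform.TabularDataset(
        "projects/my-project/locations/us-central1/datasets/1234567890"
    )

    job = aiplatform.AutoMLTabularTrainingJob(
        display_name="train-with-eval-export",
        optimization_prediction_type="classification",
    )

    model = job.run(
        dataset=dataset,
        target_column="target",  # placeholder column name
        # New in this change: export test set predictions to BigQuery.
        export_evaluated_data_items=True,
        # Placeholder table; optional, since omitting it sends results to an
        # auto-created table (see the docstring added above).
        export_evaluated_data_items_bigquery_destination_uri="bq://my-project.my_dataset.evaluated_examples",
        # False: the training job fails rather than overwriting an existing table.
        export_evaluated_data_items_override_destination=False,
    )

Note that _run normalizes the destination URI, so callers may pass the table
reference with or without the "bq://" scheme; the prefix is prepended when
missing before it is written into exportEvaluatedDataItemsConfig.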