diff --git a/google/cloud/aiplatform/datasets/time_series_dataset.py b/google/cloud/aiplatform/datasets/time_series_dataset.py
index d5aa3dcbf2..1a5d62bb39 100644
--- a/google/cloud/aiplatform/datasets/time_series_dataset.py
+++ b/google/cloud/aiplatform/datasets/time_series_dataset.py
@@ -46,7 +46,7 @@ def create(
         encryption_spec_key_name: Optional[str] = None,
         sync: bool = True,
     ) -> "TimeSeriesDataset":
-        """Creates a new tabular dataset.
+        """Creates a new time series dataset.
 
         Args:
             display_name (str):
diff --git a/google/cloud/aiplatform/training_jobs.py b/google/cloud/aiplatform/training_jobs.py
index 99f4f088a5..1d738aa7a6 100644
--- a/google/cloud/aiplatform/training_jobs.py
+++ b/google/cloud/aiplatform/training_jobs.py
@@ -18,6 +18,7 @@
 import datetime
 import time
 from typing import Dict, List, Optional, Sequence, Tuple, Union
+import warnings
 
 import abc
@@ -2524,6 +2525,7 @@ def __init__(
         display_name: str,
         optimization_prediction_type: str,
         optimization_objective: Optional[str] = None,
+        column_specs: Optional[Dict[str, str]] = None,
         column_transformations: Optional[Union[Dict, List[Dict]]] = None,
         optimization_objective_recall_value: Optional[float] = None,
         optimization_objective_precision_value: Optional[float] = None,
@@ -2535,6 +2537,15 @@
     ):
         """Constructs a AutoML Tabular Training Job.
 
+        Example usage:
+
+        job = training_jobs.AutoMLTabularTrainingJob(
+            display_name="my_display_name",
+            optimization_prediction_type="classification",
+            optimization_objective="minimize-log-loss",
+            column_specs={"column_1": "auto", "column_2": "numeric"},
+        )
+
         Args:
             display_name (str):
                 Required. The user-defined name of this TrainingPipeline.
@@ -2575,15 +2586,29 @@
                 "minimize-rmse" (default) - Minimize root-mean-squared error (RMSE).
                 "minimize-mae" - Minimize mean-absolute error (MAE).
                 "minimize-rmsle" - Minimize root-mean-squared log error (RMSLE).
-            column_transformations (Optional[Union[Dict, List[Dict]]]):
+            column_specs (Dict[str, str]):
+                Optional. Alternative to column_transformations where the keys of the dict
+                are column names and their respective values are one of
+                AutoMLTabularTrainingJob.column_data_types.
+                When creating transformation for BigQuery Struct column, the column
+                should be flattened using "." as the delimiter. Only columns with no child
+                should have a transformation.
+                If an input column has no transformations on it, such a column is
+                ignored by the training, except for the targetColumn, which should have
+                no transformations defined on it.
+                Only one of column_transformations or column_specs should be passed.
+            column_transformations (Union[Dict, List[Dict]]):
                 Optional. Transformations to apply to the input columns (i.e. columns
                 other than the targetColumn). Each transformation may produce multiple
                 result values from the column's value, and all are used for training.
                 When creating transformation for BigQuery Struct column, the column
-                should be flattened using "." as the delimiter.
+                should be flattened using "." as the delimiter. Only columns with no child
+                should have a transformation.
                 If an input column has no transformations on it, such a column is
                 ignored by the training, except for the targetColumn, which should have
                 no transformations defined on.
+                Only one of column_transformations or column_specs should be passed.
+                Consider using column_specs, as column_transformations will eventually be deprecated.
             optimization_objective_recall_value (float):
                 Optional.
                 Required when maximize-precision-at-recall optimizationObjective was
                 picked, represents the recall value at which the optimization is done.
@@ -2627,6 +2652,9 @@ def __init__(
                 If set, the trained Model will be secured by this key.
 
                 Overrides encryption_spec_key_name set in aiplatform.init.
+
+        Raises:
+            ValueError: If both column_transformations and column_specs are passed.
         """
         super().__init__(
             display_name=display_name,
@@ -2636,7 +2664,26 @@
             training_encryption_spec_key_name=training_encryption_spec_key_name,
             model_encryption_spec_key_name=model_encryption_spec_key_name,
         )
-        self._column_transformations = column_transformations
+        # user-populated transformations
+        if column_transformations is not None and column_specs is not None:
+            raise ValueError(
+                "Both column_transformations and column_specs were passed. Only one is allowed."
+            )
+        if column_transformations is not None:
+            self._column_transformations = column_transformations
+            warnings.simplefilter("always", DeprecationWarning)
+            warnings.warn(
+                "Consider using column_specs instead. column_transformations will be deprecated in the future.",
+                DeprecationWarning,
+                stacklevel=2,
+            )
+        elif column_specs is not None:
+            self._column_transformations = [
+                {transformation: {"column_name": column_name}}
+                for column_name, transformation in column_specs.items()
+            ]
+        else:
+            self._column_transformations = None
         self._optimization_objective = optimization_objective
         self._optimization_prediction_type = optimization_prediction_type
         self._optimization_objective_recall_value = optimization_objective_recall_value
@@ -2859,6 +2906,7 @@ def _run(
 
         training_task_definition = schema.training_job.definition.automl_tabular
 
+        # auto-populate transformations
         if self._column_transformations is None:
             _LOGGER.info(
                 "No column transformations provided, so now retrieving columns from dataset in order to set default column transformations."
             )
             column_names = [
                 column_name
@@ -2869,7 +2917,7 @@
                 for column_name in dataset.column_names
                 if column_name != target_column
             ]
-            column_transformations = [
+            self._column_transformations = [
                 {"auto": {"column_name": column_name}}
                 for column_name in column_names
             ]
@@ -2877,13 +2925,11 @@
                 "The column transformation of type 'auto' was set for the following columns: %s."
                 % column_names
             )
-        else:
-            column_transformations = self._column_transformations
 
         training_task_inputs_dict = {
             # required inputs
             "targetColumn": target_column,
-            "transformations": column_transformations,
+            "transformations": self._column_transformations,
             "trainBudgetMilliNodeHours": budget_milli_node_hours,
             # optional inputs
             "weightColumnName": weight_column,
@@ -2934,6 +2980,44 @@ def _add_additional_experiments(self, additional_experiments: List[str]):
         """
         self._additional_experiments.extend(additional_experiments)
 
+    @staticmethod
+    def get_auto_column_specs(
+        dataset: datasets.TabularDataset, target_column: str,
+    ) -> Dict[str, str]:
+        """Returns a dict with all non-target columns as keys and 'auto' as values.
+
+        Example usage:
+
+        column_specs = training_jobs.AutoMLTabularTrainingJob.get_auto_column_specs(
+            dataset=my_dataset,
+            target_column="my_target_column",
+        )
+
+        Args:
+            dataset (datasets.TabularDataset):
+                Required. Intended dataset.
+            target_column (str):
+                Required. Intended target column.
+
+        Returns:
+            Dict[str, str]:
+                Column names as keys and 'auto' as values.
+        """
+        column_names = [
+            column for column in dataset.column_names if column != target_column
+        ]
+        column_specs = {column: "auto" for column in column_names}
+        return column_specs
+
+    class column_data_types:
+        AUTO = "auto"
+        NUMERIC = "numeric"
+        CATEGORICAL = "categorical"
+        TIMESTAMP = "timestamp"
+        TEXT = "text"
+        REPEATED_NUMERIC = "repeated_numeric"
+        REPEATED_CATEGORICAL = "repeated_categorical"
+        REPEATED_TEXT = "repeated_text"
+
 
 class AutoMLForecastingTrainingJob(_TrainingJob):
     _supported_training_schemas = (schema.training_job.definition.automl_forecasting,)
diff --git a/tests/unit/aiplatform/test_automl_tabular_training_jobs.py b/tests/unit/aiplatform/test_automl_tabular_training_jobs.py
index 761b03b5a0..413566440f 100644
--- a/tests/unit/aiplatform/test_automl_tabular_training_jobs.py
+++ b/tests/unit/aiplatform/test_automl_tabular_training_jobs.py
@@ -42,6 +42,14 @@
     "sepal_length",
     "petal_length",
     "petal_width",
+    "target",
+]
+
+_TEST_TRAINING_COLUMN_NAMES_ALTERNATIVE = [
+    "apple",
+    "banana",
+    "coconut",
+    "target",
 ]
 
 _TEST_TRAINING_COLUMN_TRANSFORMATIONS = [
@@ -50,6 +58,21 @@
     {"auto": {"column_name": "sepal_width"}},
     {"auto": {"column_name": "sepal_length"}},
     {"auto": {"column_name": "petal_length"}},
     {"auto": {"column_name": "petal_width"}},
 ]
+_TEST_TRAINING_COLUMN_SPECS = {
+    "apple": "auto",
+    "banana": "auto",
+    "coconut": "auto",
+}
+_TEST_TRAINING_COLUMN_TRANSFORMATIONS_ALTERNATIVE = [
+    {"auto": {"column_name": "apple"}},
+    {"auto": {"column_name": "banana"}},
+    {"auto": {"column_name": "coconut"}},
+]
+_TEST_TRAINING_COLUMN_TRANSFORMATIONS_ALTERNATIVE_NOT_AUTO = [
+    {"numeric": {"column_name": "apple"}},
+    {"categorical": {"column_name": "banana"}},
+    {"text": {"column_name": "coconut"}},
+]
 _TEST_TRAINING_TARGET_COLUMN = "target"
 _TEST_TRAINING_BUDGET_MILLI_NODE_HOURS = 1000
 _TEST_TRAINING_WEIGHT_COLUMN = "weight"
@@ -80,6 +103,20 @@
     },
     struct_pb2.Value(),
 )
+_TEST_TRAINING_TASK_INPUTS_ALTERNATIVE = json_format.ParseDict(
+    {
+        **_TEST_TRAINING_TASK_INPUTS_DICT,
+        "transformations": _TEST_TRAINING_COLUMN_TRANSFORMATIONS_ALTERNATIVE,
+    },
+    struct_pb2.Value(),
+)
+_TEST_TRAINING_TASK_INPUTS_ALTERNATIVE_NOT_AUTO = json_format.ParseDict(
+    {
+        **_TEST_TRAINING_TASK_INPUTS_DICT,
+        "transformations": _TEST_TRAINING_COLUMN_TRANSFORMATIONS_ALTERNATIVE_NOT_AUTO,
+    },
+    struct_pb2.Value(),
+)
 
 _TEST_DATASET_NAME = "test-dataset-name"
@@ -188,6 +225,24 @@ def mock_dataset_tabular():
     yield ds
 
 
+@pytest.fixture
+def mock_dataset_tabular_alternative():
+    ds = mock.MagicMock(datasets.TabularDataset)
+    ds.name = _TEST_DATASET_NAME
+    ds._latest_future = None
+    ds._exception = None
+    ds._gca_resource = gca_dataset.Dataset(
+        display_name=_TEST_DATASET_DISPLAY_NAME,
+        metadata_schema_uri=_TEST_METADATA_SCHEMA_URI_TABULAR,
+        labels={},
+        name=_TEST_DATASET_NAME,
+        metadata={},
+    )
+    ds.column_names = _TEST_TRAINING_COLUMN_NAMES_ALTERNATIVE
+
+    yield ds
+
+
 @pytest.fixture
 def mock_dataset_nontabular():
     ds = mock.MagicMock(datasets.ImageDataset)
@@ -515,12 +570,198 @@ def test_run_call_pipeline_service_create_if_set_additional_experiments(
             training_pipeline=true_training_pipeline,
         )
 
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_run_call_pipeline_service_create_with_column_specs(
+        self,
+        mock_pipeline_service_create,
+        mock_pipeline_service_get,
+        mock_dataset_tabular_alternative,
+        mock_model_service_get,
+        sync,
+    ):
+        aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)
+
+        column_specs = training_jobs.AutoMLTabularTrainingJob.get_auto_column_specs(
+            dataset=mock_dataset_tabular_alternative,
+            target_column=_TEST_TRAINING_TARGET_COLUMN,
+        )
+
+        assert column_specs == _TEST_TRAINING_COLUMN_SPECS
+
+        job = training_jobs.AutoMLTabularTrainingJob(
+            display_name=_TEST_DISPLAY_NAME,
+            optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
+            optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
+            column_specs=column_specs,
+            optimization_objective_recall_value=None,
+            optimization_objective_precision_value=None,
+        )
+
+        model_from_job = job.run(
+            dataset=mock_dataset_tabular_alternative,
+            target_column=_TEST_TRAINING_TARGET_COLUMN,
+            model_display_name=_TEST_MODEL_DISPLAY_NAME,
+            training_fraction_split=_TEST_TRAINING_FRACTION_SPLIT,
+            validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT,
+            test_fraction_split=_TEST_TEST_FRACTION_SPLIT,
+            predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME,
+            weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
+            budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
+            disable_early_stopping=_TEST_TRAINING_DISABLE_EARLY_STOPPING,
+            sync=sync,
+        )
+
+        if not sync:
+            model_from_job.wait()
+
+        true_fraction_split = gca_training_pipeline.FractionSplit(
+            training_fraction=_TEST_TRAINING_FRACTION_SPLIT,
+            validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT,
+            test_fraction=_TEST_TEST_FRACTION_SPLIT,
+        )
+
+        true_managed_model = gca_model.Model(display_name=_TEST_MODEL_DISPLAY_NAME)
+
+        true_input_data_config = gca_training_pipeline.InputDataConfig(
+            fraction_split=true_fraction_split,
+            predefined_split=gca_training_pipeline.PredefinedSplit(
+                key=_TEST_PREDEFINED_SPLIT_COLUMN_NAME
+            ),
+            dataset_id=mock_dataset_tabular_alternative.name,
+        )
+
+        true_training_pipeline = gca_training_pipeline.TrainingPipeline(
+            display_name=_TEST_DISPLAY_NAME,
+            training_task_definition=schema.training_job.definition.automl_tabular,
+            training_task_inputs=_TEST_TRAINING_TASK_INPUTS_ALTERNATIVE,
+            model_to_upload=true_managed_model,
+            input_data_config=true_input_data_config,
+        )
+
+        mock_pipeline_service_create.assert_called_once_with(
+            parent=initializer.global_config.common_location_path(),
+            training_pipeline=true_training_pipeline,
+        )
+
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_call_pipeline_service_create_with_column_specs_and_transformations_raises(
+        self, mock_dataset_tabular_alternative, sync,
+    ):
+        aiplatform.init()
+
+        column_specs = training_jobs.AutoMLTabularTrainingJob.get_auto_column_specs(
+            dataset=mock_dataset_tabular_alternative,
+            target_column=_TEST_TRAINING_TARGET_COLUMN,
+        )
+
+        assert column_specs == _TEST_TRAINING_COLUMN_SPECS
+
+        with pytest.raises(ValueError):
+            training_jobs.AutoMLTabularTrainingJob(
+                display_name=_TEST_DISPLAY_NAME,
+                optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
+                column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
+                column_specs=column_specs,
+            )
+
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_get_column_specs_no_target_raises(
+        self, mock_dataset_tabular_alternative, sync,
+    ):
+        aiplatform.init()
+
+        with pytest.raises(TypeError):
+            training_jobs.AutoMLTabularTrainingJob.get_auto_column_specs(
+                dataset=mock_dataset_tabular_alternative
+            )
+
+    @pytest.mark.parametrize("sync", [True, False])
+    def test_run_call_pipeline_service_create_with_column_specs_not_auto(
+        self,
+        mock_pipeline_service_create,
+        mock_pipeline_service_get,
+        mock_dataset_tabular_alternative,
+        mock_model_service_get,
+        sync,
+    ):
+        aiplatform.init(
+            project=_TEST_PROJECT,
+            staging_bucket=_TEST_BUCKET_NAME,
+        )
+
+        column_specs = training_jobs.AutoMLTabularTrainingJob.get_auto_column_specs(
+            dataset=mock_dataset_tabular_alternative,
+            target_column=_TEST_TRAINING_TARGET_COLUMN,
+        )
+        column_specs[
+            _TEST_TRAINING_COLUMN_NAMES_ALTERNATIVE[0]
+        ] = training_jobs.AutoMLTabularTrainingJob.column_data_types.NUMERIC
+        column_specs[
+            _TEST_TRAINING_COLUMN_NAMES_ALTERNATIVE[1]
+        ] = training_jobs.AutoMLTabularTrainingJob.column_data_types.CATEGORICAL
+        column_specs[
+            _TEST_TRAINING_COLUMN_NAMES_ALTERNATIVE[2]
+        ] = training_jobs.AutoMLTabularTrainingJob.column_data_types.TEXT
+
+        job = training_jobs.AutoMLTabularTrainingJob(
+            display_name=_TEST_DISPLAY_NAME,
+            optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
+            optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
+            column_specs=column_specs,
+            optimization_objective_recall_value=None,
+            optimization_objective_precision_value=None,
+        )
+
+        model_from_job = job.run(
+            dataset=mock_dataset_tabular_alternative,
+            target_column=_TEST_TRAINING_TARGET_COLUMN,
+            model_display_name=_TEST_MODEL_DISPLAY_NAME,
+            training_fraction_split=_TEST_TRAINING_FRACTION_SPLIT,
+            validation_fraction_split=_TEST_VALIDATION_FRACTION_SPLIT,
+            test_fraction_split=_TEST_TEST_FRACTION_SPLIT,
+            predefined_split_column_name=_TEST_PREDEFINED_SPLIT_COLUMN_NAME,
+            weight_column=_TEST_TRAINING_WEIGHT_COLUMN,
+            budget_milli_node_hours=_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS,
+            disable_early_stopping=_TEST_TRAINING_DISABLE_EARLY_STOPPING,
+            sync=sync,
+        )
+
+        if not sync:
+            model_from_job.wait()
+
+        true_fraction_split = gca_training_pipeline.FractionSplit(
+            training_fraction=_TEST_TRAINING_FRACTION_SPLIT,
+            validation_fraction=_TEST_VALIDATION_FRACTION_SPLIT,
+            test_fraction=_TEST_TEST_FRACTION_SPLIT,
+        )
+
+        true_managed_model = gca_model.Model(display_name=_TEST_MODEL_DISPLAY_NAME)
+
+        true_input_data_config = gca_training_pipeline.InputDataConfig(
+            fraction_split=true_fraction_split,
+            predefined_split=gca_training_pipeline.PredefinedSplit(
+                key=_TEST_PREDEFINED_SPLIT_COLUMN_NAME
+            ),
+            dataset_id=mock_dataset_tabular_alternative.name,
+        )
+
+        true_training_pipeline = gca_training_pipeline.TrainingPipeline(
+            display_name=_TEST_DISPLAY_NAME,
+            training_task_definition=schema.training_job.definition.automl_tabular,
+            training_task_inputs=_TEST_TRAINING_TASK_INPUTS_ALTERNATIVE_NOT_AUTO,
+            model_to_upload=true_managed_model,
+            input_data_config=true_input_data_config,
+        )
+
+        mock_pipeline_service_create.assert_called_once_with(
+            parent=initializer.global_config.common_location_path(),
+            training_pipeline=true_training_pipeline,
+        )
+
     @pytest.mark.usefixtures(
         "mock_pipeline_service_create",
         "mock_pipeline_service_get",
         "mock_model_service_get",
     )
     @pytest.mark.parametrize("sync", [True, False])
+    # Also acts as a custom column_transformations test, as it should not error during the first call.
     def test_run_called_twice_raises(self, mock_dataset_tabular, sync):
         aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)
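
For reference, a minimal end-to-end sketch of the workflow this diff enables. The project ID, dataset resource name, and the "age" column below are placeholders chosen for illustration, not values taken from this change:

# Sketch: training with column_specs instead of column_transformations.
from google.cloud import aiplatform
from google.cloud.aiplatform import datasets, training_jobs

# Placeholder project and location.
aiplatform.init(project="my-project", location="us-central1")

# Hypothetical pre-existing managed tabular dataset.
dataset = datasets.TabularDataset(
    "projects/my-project/locations/us-central1/datasets/1234567890"
)

# Seed every non-target column with the "auto" transformation...
column_specs = training_jobs.AutoMLTabularTrainingJob.get_auto_column_specs(
    dataset=dataset, target_column="target",
)

# ...then override individual columns via the column_data_types constants
# ("age" is a hypothetical column name).
column_specs["age"] = training_jobs.AutoMLTabularTrainingJob.column_data_types.NUMERIC

# column_specs is mutually exclusive with column_transformations: passing both
# raises ValueError, and column_transformations now emits a DeprecationWarning.
job = training_jobs.AutoMLTabularTrainingJob(
    display_name="my-training-job",
    optimization_prediction_type="classification",
    column_specs=column_specs,
)

model = job.run(dataset=dataset, target_column="target")

Note that get_auto_column_specs returns a plain mutable dict, so per-column overrides stay close to the defaults they replace, and omitting column_specs entirely still falls back to the auto-populated transformations in _run.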