Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: column specs for tabular transformation #466

Merged
merged 23 commits into from Jul 15, 2021
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/aiplatform/__init__.py
Expand Up @@ -46,6 +46,7 @@
AutoMLTextTrainingJob,
AutoMLVideoTrainingJob,
)
from google.cloud.aiplatform.column import data_types

"""
Usage:
Expand Down Expand Up @@ -91,4 +92,5 @@
"TextDataset",
"TimeSeriesDataset",
"VideoDataset",
"data_types",
sirtorry marked this conversation as resolved.
Show resolved Hide resolved
)
29 changes: 29 additions & 0 deletions google/cloud/aiplatform/column.py
@@ -0,0 +1,29 @@
# -*- coding: utf-8 -*-

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""Vertex AI tabular data types"""


class data_types:
    """Column data type constants for Vertex AI tabular transformations.

    Each constant is a transformation-type string accepted as a value in the
    ``column_specs`` dict of AutoML tabular training (e.g.
    ``{"my_column": data_types.NUMERIC}``); the spec value becomes the key of
    the generated column transformation entry.
    """

    AUTO = "auto"
    NUMERIC = "numeric"
    CATEGORICAL = "categorical"
    TIMESTAMP = "timestamp"
    TEXT = "text"
    REPEATED_NUMERIC = "repeated_numeric"
    REPEATED_CATEGORICAL = "repeated_categorical"
    REPEATED_TEXT = "repeated_text"
3 changes: 2 additions & 1 deletion google/cloud/aiplatform/datasets/time_series_dataset.py
Expand Up @@ -26,6 +26,7 @@
from google.cloud.aiplatform import utils


# TODO: extend tabular dataset
sirtorry marked this conversation as resolved.
Show resolved Hide resolved
class TimeSeriesDataset(datasets._Dataset):
"""Managed time series dataset resource for Vertex AI"""

Expand All @@ -46,7 +47,7 @@ def create(
encryption_spec_key_name: Optional[str] = None,
sync: bool = True,
) -> "TimeSeriesDataset":
"""Creates a new tabular dataset.
"""Creates a new time series dataset.

Args:
display_name (str):
Expand Down
95 changes: 82 additions & 13 deletions google/cloud/aiplatform/training_jobs.py
Expand Up @@ -697,7 +697,9 @@ def _get_model(self) -> Optional[models.Model]:
)

return models.Model(
fields.id, project=fields.project, location=fields.location,
fields.id,
project=fields.project,
location=fields.location,
)

def _wait_callback(self):
Expand Down Expand Up @@ -1161,12 +1163,14 @@ def _prepare_and_validate_run(
model_display_name = model_display_name or self._display_name + "-model"

# validates args and will raise
worker_pool_specs = worker_spec_utils._DistributedTrainingSpec.chief_worker_pool(
replica_count=replica_count,
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
).pool_specs
worker_pool_specs = (
worker_spec_utils._DistributedTrainingSpec.chief_worker_pool(
replica_count=replica_count,
machine_type=machine_type,
accelerator_count=accelerator_count,
accelerator_type=accelerator_type,
).pool_specs
)

managed_model = self._managed_model
if model_display_name:
Expand Down Expand Up @@ -2524,6 +2528,7 @@ def __init__(
display_name: str,
optimization_prediction_type: str,
optimization_objective: Optional[str] = None,
column_specs: Optional[Dict[str, str]] = None,
sirtorry marked this conversation as resolved.
Show resolved Hide resolved
column_transformations: Optional[Union[Dict, List[Dict]]] = None,
optimization_objective_recall_value: Optional[float] = None,
optimization_objective_precision_value: Optional[float] = None,
Expand Down Expand Up @@ -2575,6 +2580,15 @@ def __init__(
"minimize-rmse" (default) - Minimize root-mean-squared error (RMSE).
"minimize-mae" - Minimize mean-absolute error (MAE).
"minimize-rmsle" - Minimize root-mean-squared log error (RMSLE).
column_specs (Optional[Dict[str, str]]):
sirtorry marked this conversation as resolved.
Show resolved Hide resolved
Optional. Transformations to apply to the input columns (i.e. columns other
than the targetColumn). Each transformation may produce multiple
result values from the column's value, and all are used for training.
When creating transformation for BigQuery Struct column, the column
should be flattened using "." as the delimiter.
If an input column has no transformations on it, such a column is
ignored by the training, except for the targetColumn, which should have
no transformations defined on it.
Only one of column_specs or column_transformations should be passed.
column_transformations (Optional[Union[Dict, List[Dict]]]):
Optional. Transformations to apply to the input columns (i.e. columns other
than the targetColumn). Each transformation may produce multiple
Expand All @@ -2584,6 +2598,8 @@ def __init__(
If an input column has no transformations on it, such a column is
ignored by the training, except for the targetColumn, which should have
no transformations defined on.
Only one of column_transformations or column_specs should be passed.
optimization_objective_recall_value (float):
Optional. Required when maximize-precision-at-recall optimizationObjective was
picked, represents the recall value at which the optimization is done.
Expand Down Expand Up @@ -2637,6 +2653,7 @@ def __init__(
model_encryption_spec_key_name=model_encryption_spec_key_name,
)
self._column_transformations = column_transformations
self._column_specs = column_specs
self._optimization_objective = optimization_objective
self._optimization_prediction_type = optimization_prediction_type
self._optimization_objective_recall_value = optimization_objective_recall_value
Expand Down Expand Up @@ -2854,12 +2871,37 @@ def _run(

Returns:
model: The trained Vertex AI Model resource or None if training did not
produce a Vertex AI Model.
produce a Vertex AI Model.
sirtorry marked this conversation as resolved.
Show resolved Hide resolved
Raises:
ValueError: When column doesn't exist in dataset.
ValueError: When target column is in transformations.
"""

training_task_definition = schema.training_job.definition.automl_tabular
column_transformations = None

if self._column_transformations is None:
# user populated transformations
if self._column_transformations is not None and self._column_specs is not None:
sirtorry marked this conversation as resolved.
Show resolved Hide resolved
_LOGGER.info(
"column_transformations and column_specs were both passed. column_transformations was used."
)
if self._column_transformations is not None:
column_transformations = self._column_transformations
column_names = dataset.column_names
for transformation in column_transformations:
for data_type in transformation:
column = transformation[data_type]
if column["column_name"] not in column_names:
sirtorry marked this conversation as resolved.
Show resolved Hide resolved
raise ValueError(f"'{column}' is not in the dataset.")
if column["column_name"] is target_column:
raise ValueError("Target column is in transformations.")
elif self._column_specs is not None:
column_transformations = [
{self._column_specs[column]: {"column_name": column}}
for column in self._column_specs
sirtorry marked this conversation as resolved.
Show resolved Hide resolved
]
# auto-populate transformations
if column_transformations is None:
_LOGGER.info(
"No column transformations provided, so now retrieving columns from dataset in order to set default column transformations."
)
Expand All @@ -2877,8 +2919,6 @@ def _run(
"The column transformation of type 'auto' was set for the following columns: %s."
% column_names
)
else:
column_transformations = self._column_transformations

training_task_inputs_dict = {
# required inputs
Expand Down Expand Up @@ -2934,7 +2974,34 @@ def _add_additional_experiments(self, additional_experiments: List[str]):
"""
self._additional_experiments.extend(additional_experiments)

def get_auto_column_specs(
    dataset: datasets.TabularDataset,
    target_column: str,
) -> Dict[str, str]:
    """Returns a column-spec dict mapping every non-target column to "auto".

    Only ``dataset.column_names`` is read; no other dataset state is touched.
    NOTE(review): this is defined inside a class but takes no ``self``/``cls``;
    it should be decorated ``@staticmethod`` — confirm against the class body.

    Args:
        dataset (datasets.TabularDataset):
            Required. Dataset whose column names are inspected.
        target_column (str):
            Required. Name of the target column; excluded from the result.

    Returns:
        Dict[str, str]:
            Non-target column names as keys and "auto" as values.

    Raises:
        ValueError: If ``target_column`` is not among ``dataset.column_names``.
    """
    if target_column not in dataset.column_names:
        # Include the offending name so the caller can see what was wrong.
        raise ValueError(f"Target column '{target_column}' is not in the dataset.")
    return {
        column: "auto" for column in dataset.column_names if column != target_column
    }


# TODO: add tabular sugar to forecasting
class AutoMLForecastingTrainingJob(_TrainingJob):
_supported_training_schemas = (schema.training_job.definition.automl_forecasting,)

Expand Down Expand Up @@ -4729,8 +4796,10 @@ def __init__(
schema.training_job.definition.automl_text_classification
)

training_task_inputs_dict = training_job_inputs.AutoMlTextClassificationInputs(
multi_label=multi_label
training_task_inputs_dict = (
training_job_inputs.AutoMlTextClassificationInputs(
multi_label=multi_label
)
)
elif prediction_type == "extraction":
training_task_definition = (
Expand Down
72 changes: 54 additions & 18 deletions tests/unit/aiplatform/test_automl_tabular_training_jobs.py
Expand Up @@ -42,6 +42,7 @@
"sepal_length",
"petal_length",
"petal_width",
"target",
]

_TEST_TRAINING_COLUMN_TRANSFORMATIONS = [
Expand All @@ -50,6 +51,12 @@
{"auto": {"column_name": "petal_length"}},
{"auto": {"column_name": "petal_width"}},
]
_TEST_TRAINING_COLUMN_SPECS = {
"sepal_width": "auto",
"sepal_length": "auto",
"petal_width": "auto",
"petal_length": "auto",
}
_TEST_TRAINING_TARGET_COLUMN = "target"
_TEST_TRAINING_BUDGET_MILLI_NODE_HOURS = 1000
_TEST_TRAINING_WEIGHT_COLUMN = "weight"
Expand All @@ -71,7 +78,8 @@
"optimizationObjectivePrecisionValue": None,
}
_TEST_TRAINING_TASK_INPUTS = json_format.ParseDict(
_TEST_TRAINING_TASK_INPUTS_DICT, struct_pb2.Value(),
_TEST_TRAINING_TASK_INPUTS_DICT,
struct_pb2.Value(),
)
_TEST_TRAINING_TASK_INPUTS_WITH_ADDITIONAL_EXPERIMENTS = json_format.ParseDict(
{
Expand Down Expand Up @@ -119,10 +127,12 @@ def mock_pipeline_service_create():
with mock.patch.object(
pipeline_service_client.PipelineServiceClient, "create_training_pipeline"
) as mock_create_training_pipeline:
mock_create_training_pipeline.return_value = gca_training_pipeline.TrainingPipeline(
name=_TEST_PIPELINE_RESOURCE_NAME,
state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
model_to_upload=gca_model.Model(name=_TEST_MODEL_NAME),
mock_create_training_pipeline.return_value = (
gca_training_pipeline.TrainingPipeline(
name=_TEST_PIPELINE_RESOURCE_NAME,
state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
model_to_upload=gca_model.Model(name=_TEST_MODEL_NAME),
)
)
yield mock_create_training_pipeline

Expand All @@ -132,10 +142,12 @@ def mock_pipeline_service_get():
with mock.patch.object(
pipeline_service_client.PipelineServiceClient, "get_training_pipeline"
) as mock_get_training_pipeline:
mock_get_training_pipeline.return_value = gca_training_pipeline.TrainingPipeline(
name=_TEST_PIPELINE_RESOURCE_NAME,
state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
model_to_upload=gca_model.Model(name=_TEST_MODEL_NAME),
mock_get_training_pipeline.return_value = (
gca_training_pipeline.TrainingPipeline(
name=_TEST_PIPELINE_RESOURCE_NAME,
state=gca_pipeline_state.PipelineState.PIPELINE_STATE_SUCCEEDED,
model_to_upload=gca_model.Model(name=_TEST_MODEL_NAME),
)
)
yield mock_get_training_pipeline

Expand All @@ -145,17 +157,21 @@ def mock_pipeline_service_create_and_get_with_fail():
with mock.patch.object(
pipeline_service_client.PipelineServiceClient, "create_training_pipeline"
) as mock_create_training_pipeline:
mock_create_training_pipeline.return_value = gca_training_pipeline.TrainingPipeline(
name=_TEST_PIPELINE_RESOURCE_NAME,
state=gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING,
mock_create_training_pipeline.return_value = (
gca_training_pipeline.TrainingPipeline(
name=_TEST_PIPELINE_RESOURCE_NAME,
state=gca_pipeline_state.PipelineState.PIPELINE_STATE_RUNNING,
)
)

with mock.patch.object(
pipeline_service_client.PipelineServiceClient, "get_training_pipeline"
) as mock_get_training_pipeline:
mock_get_training_pipeline.return_value = gca_training_pipeline.TrainingPipeline(
name=_TEST_PIPELINE_RESOURCE_NAME,
state=gca_pipeline_state.PipelineState.PIPELINE_STATE_FAILED,
mock_get_training_pipeline.return_value = (
gca_training_pipeline.TrainingPipeline(
name=_TEST_PIPELINE_RESOURCE_NAME,
state=gca_pipeline_state.PipelineState.PIPELINE_STATE_FAILED,
)
)

yield mock_create_training_pipeline, mock_get_training_pipeline
Expand Down Expand Up @@ -227,11 +243,23 @@ def test_run_call_pipeline_service_create(
encryption_spec_key_name=_TEST_DEFAULT_ENCRYPTION_KEY_NAME,
)

column_specs = training_jobs.AutoMLTabularTrainingJob.get_auto_column_specs(
dataset=mock_dataset_tabular,
target_column=_TEST_TRAINING_TARGET_COLUMN,
)

assert column_specs == _TEST_TRAINING_COLUMN_SPECS
column_specs[
_TEST_TRAINING_COLUMN_NAMES[0]
] = aiplatform.column.data_types.NUMERIC
assert column_specs[_TEST_TRAINING_COLUMN_NAMES[0]] == "numeric"
column_specs[_TEST_TRAINING_COLUMN_NAMES[0]] = aiplatform.column.data_types.AUTO
sirtorry marked this conversation as resolved.
Show resolved Hide resolved

job = training_jobs.AutoMLTabularTrainingJob(
display_name=_TEST_DISPLAY_NAME,
optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
column_specs=column_specs,
optimization_objective_recall_value=None,
optimization_objective_precision_value=None,
)
Expand Down Expand Up @@ -309,11 +337,18 @@ def test_run_call_pipeline_if_no_model_display_name(
):
aiplatform.init(project=_TEST_PROJECT, staging_bucket=_TEST_BUCKET_NAME)

column_specs = training_jobs.AutoMLTabularTrainingJob.get_auto_column_specs(
dataset=mock_dataset_tabular,
target_column=_TEST_TRAINING_TARGET_COLUMN,
)

assert column_specs == _TEST_TRAINING_COLUMN_SPECS

job = training_jobs.AutoMLTabularTrainingJob(
display_name=_TEST_DISPLAY_NAME,
optimization_objective=_TEST_TRAINING_OPTIMIZATION_OBJECTIVE_NAME,
optimization_prediction_type=_TEST_TRAINING_OPTIMIZATION_PREDICTION_TYPE,
column_transformations=_TEST_TRAINING_COLUMN_TRANSFORMATIONS,
column_specs=column_specs,
sirtorry marked this conversation as resolved.
Show resolved Hide resolved
optimization_objective_recall_value=None,
optimization_objective_precision_value=None,
training_encryption_spec_key_name=_TEST_PIPELINE_ENCRYPTION_KEY_NAME,
Expand Down Expand Up @@ -346,7 +381,8 @@ def test_run_call_pipeline_if_no_model_display_name(
)

true_input_data_config = gca_training_pipeline.InputDataConfig(
fraction_split=true_fraction_split, dataset_id=mock_dataset_tabular.name,
fraction_split=true_fraction_split,
dataset_id=mock_dataset_tabular.name,
)

true_training_pipeline = gca_training_pipeline.TrainingPipeline(
Expand Down