From ba11c3d3cd8d3869e2deb3207a8698fa7ce284ec Mon Sep 17 00:00:00 2001 From: Morgan Du Date: Thu, 16 Dec 2021 19:40:13 -0800 Subject: [PATCH] feat: add create in Featurestore, EntityType, Feature; add create_entity_type in Featurestore; add create_feature, batch_create_features in EntityType; add ingest_from_* for bq and gcs in EntityType; add and update delete with force delete nested resources (#872) - [x] add create method in Featurestore, EntityType, Feature classes; - [x] add create_entity_type in Featurestore; - [x] add create_feature, batch_create_features in EntityType; - [x] add ingest_from_bq and ingest_from_gcs methods in EntityType class - [x] add and update delete methods with force delete nested resources - [x] add unit tests - [x] update integration test --- .../aiplatform/featurestore/entity_type.py | 757 +++++++++++++++++- .../cloud/aiplatform/featurestore/feature.py | 156 +++- .../aiplatform/featurestore/featurestore.py | 229 +++++- google/cloud/aiplatform/utils/__init__.py | 20 + .../aiplatform/utils/featurestore_utils.py | 139 +++- tests/system/aiplatform/e2e_base.py | 14 +- tests/system/aiplatform/test_featurestore.py | 223 +++++- tests/unit/aiplatform/test_featurestores.py | 564 ++++++++++++- tests/unit/aiplatform/test_utils.py | 58 ++ 9 files changed, 2124 insertions(+), 36 deletions(-) diff --git a/google/cloud/aiplatform/featurestore/entity_type.py b/google/cloud/aiplatform/featurestore/entity_type.py index f777d2ed41..9b2524e45c 100644 --- a/google/cloud/aiplatform/featurestore/entity_type.py +++ b/google/cloud/aiplatform/featurestore/entity_type.py @@ -15,13 +15,18 @@ # limitations under the License. 
# -from typing import Dict, List, Optional, Sequence, Tuple +import datetime +from typing import Dict, List, Optional, Sequence, Tuple, Union from google.auth import credentials as auth_credentials from google.protobuf import field_mask_pb2 from google.cloud.aiplatform import base -from google.cloud.aiplatform.compat.types import entity_type as gca_entity_type +from google.cloud.aiplatform.compat.types import ( + entity_type as gca_entity_type, + featurestore_service as gca_featurestore_service, + io as gca_io, +) from google.cloud.aiplatform import featurestore from google.cloud.aiplatform import utils from google.cloud.aiplatform.utils import featurestore_utils @@ -81,7 +86,8 @@ def __init__( Example: "projects/123/locations/us-central1/featurestores/my_featurestore_id/entityTypes/my_entity_type_id" or "my_entity_type_id" when project and location are initialized or passed, with featurestore_id passed. featurestore_id (str): - Optional. Featurestore ID to retrieve entityType from, when entity_type_name is passed as entity_type ID. + Optional. Featurestore ID of an existing featurestore to retrieve entityType from, + when entity_type_name is passed as entity_type ID. project (str): Optional. Project to retrieve entityType from. If not set, project set in aiplatform.init will be used. @@ -246,7 +252,8 @@ def list( Args: featurestore_name (str): - Required. A fully-qualified featurestore resource name or a featurestore ID to list entityTypes in + Required. A fully-qualified featurestore resource name or a featurestore ID + of an existing featurestore to list entityTypes in. Example: "projects/123/locations/us-central1/featurestores/my_featurestore_id" or "my_featurestore_id" when project and location are initialized or passed. 
filter (str): @@ -389,3 +396,745 @@ def delete_features(self, feature_ids: List[str], sync: bool = True,) -> None: for feature in features: feature.wait() + + @base.optional_sync() + def delete(self, sync: bool = True, force: bool = False) -> None: + """Deletes this EntityType resource. If force is set to True, + all features in this EntityType will be deleted prior to entityType deletion. + + WARNING: This deletion is permanent. + + Args: + force (bool): + If set to true, any Features for this + EntityType will also be deleted. + (Otherwise, the request will only work + if the EntityType has no Features.) + sync (bool): + Whether to execute this deletion synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + Raises: + FailedPrecondition: If features are created in this EntityType and force = False. + """ + _LOGGER.log_action_start_against_resource("Deleting", "", self) + lro = getattr(self.api_client, self._delete_method)( + name=self.resource_name, force=force + ) + _LOGGER.log_action_started_against_resource_with_lro( + "Delete", "", self.__class__, lro + ) + lro.result() + _LOGGER.log_action_completed_against_resource("deleted.", "", self) + + @classmethod + @base.optional_sync() + def create( + cls, + entity_type_id: str, + featurestore_name: str, + description: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync: bool = True, + ) -> "EntityType": + """Creates an EntityType resource in a Featurestore. 
+ + Example Usage: + + my_entity_type = aiplatform.EntityType.create( + entity_type_id='my_entity_type_id', + featurestore_name='projects/123/locations/us-central1/featurestores/my_featurestore_id' + ) + or + my_entity_type = aiplatform.EntityType.create( + entity_type_id='my_entity_type_id', + featurestore_name='my_featurestore_id', + ) + + Args: + entity_type_id (str): + Required. The ID to use for the EntityType, which will + become the final component of the EntityType's resource + name. + + This value may be up to 60 characters, and valid characters + are ``[a-z0-9_]``. The first character cannot be a number. + + The value must be unique within a featurestore. + featurestore_name (str): + Required. A fully-qualified featurestore resource name or a featurestore ID + of an existing featurestore to create EntityType in. + Example: "projects/123/locations/us-central1/featurestores/my_featurestore_id" + or "my_featurestore_id" when project and location are initialized or passed. + description (str): + Optional. Description of the EntityType. + labels (Dict[str, str]): + Optional. The labels with user-defined + metadata to organize your EntityTypes. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + on and examples of labels. No more than 64 user + labels can be associated with one EntityType + (System labels are excluded)." + System reserved label keys are prefixed with + "aiplatform.googleapis.com/" and are immutable. + project (str): + Optional. Project to create EntityType in if `featurestore_name` is passed an featurestore ID. + If not set, project set in aiplatform.init will be used. + location (str): + Optional. Location to create EntityType in if `featurestore_name` is passed an featurestore ID. + If not set, location set in aiplatform.init will be used. 
+ credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to create EntityTypes. Overrides + credentials set in aiplatform.init. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + sync (bool): + Optional. Whether to execute this creation synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + + Returns: + EntityType - entity_type resource object + + """ + + featurestore_name = utils.full_resource_name( + resource_name=featurestore_name, + resource_noun=featurestore.Featurestore._resource_noun, + parse_resource_name_method=featurestore.Featurestore._parse_resource_name, + format_resource_name_method=featurestore.Featurestore._format_resource_name, + project=project, + location=location, + resource_id_validator=featurestore.Featurestore._resource_id_validator, + ) + + featurestore_name_components = featurestore.Featurestore._parse_resource_name( + featurestore_name + ) + + gapic_entity_type = gca_entity_type.EntityType() + + if labels: + utils.validate_labels(labels) + gapic_entity_type.labels = labels + + if description: + gapic_entity_type.description = description + + api_client = cls._instantiate_client( + location=featurestore_name_components["location"], credentials=credentials, + ) + + created_entity_type_lro = api_client.create_entity_type( + parent=featurestore_name, + entity_type=gapic_entity_type, + entity_type_id=entity_type_id, + metadata=request_metadata, + ) + + _LOGGER.log_create_with_lro(cls, created_entity_type_lro) + + created_entity_type = created_entity_type_lro.result() + + _LOGGER.log_create_complete(cls, created_entity_type, "entity_type") + + entity_type_obj = cls( + entity_type_name=created_entity_type.name, + project=project, + location=location, + credentials=credentials, + ) + + return entity_type_obj + + def 
create_feature( + self, + feature_id: str, + value_type: str, + description: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync: bool = True, + ) -> "featurestore.Feature": + """Creates a Feature resource in this EntityType. + + Example Usage: + + my_entity_type = aiplatform.EntityType( + entity_type_name='my_entity_type_id', + featurestore_id='my_featurestore_id', + ) + my_feature = my_entity_type.create_feature( + feature_id='my_feature_id', + value_type='INT64', + ) + + Args: + feature_id (str): + Required. The ID to use for the Feature, which will become + the final component of the Feature's resource name, which is immutable. + + This value may be up to 60 characters, and valid characters + are ``[a-z0-9_]``. The first character cannot be a number. + + The value must be unique within an EntityType. + value_type (str): + Required. Immutable. Type of Feature value. + One of BOOL, BOOL_ARRAY, DOUBLE, DOUBLE_ARRAY, INT64, INT64_ARRAY, STRING, STRING_ARRAY, BYTES. + description (str): + Optional. Description of the Feature. + labels (Dict[str, str]): + Optional. The labels with user-defined + metadata to organize your Features. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + on and examples of labels. No more than 64 user + labels can be associated with one Feature + (System labels are excluded)." + System reserved label keys are prefixed with + "aiplatform.googleapis.com/" and are immutable. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + sync (bool): + Optional. Whether to execute this creation synchronously. 
If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + + Returns: + featurestore.Feature - feature resource object + + """ + return featurestore.Feature.create( + feature_id=feature_id, + value_type=value_type, + entity_type_name=self.resource_name, + description=description, + labels=labels, + request_metadata=request_metadata, + sync=sync, + ) + + def _validate_and_get_create_feature_requests( + self, + feature_configs: Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]], + ) -> List[gca_featurestore_service.CreateFeatureRequest]: + """ Validates feature_configs and get requests for batch feature creation + + Args: + feature_configs (Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]): + Required. A user defined Dict containing configurations for feature creation. + + Returns: + List[gca_featurestore_service.CreateFeatureRequest] - requests for batch feature creation + """ + + requests = [] + for feature_id, feature_config in feature_configs.items(): + feature_config = featurestore_utils._FeatureConfig( + feature_id=feature_id, + value_type=feature_config.get( + "value_type", featurestore_utils._FEATURE_VALUE_TYPE_UNSPECIFIED + ), + description=feature_config.get("description", None), + labels=feature_config.get("labels", {}), + ) + create_feature_request = feature_config.get_create_feature_request() + requests.append(create_feature_request) + + return requests + + @base.optional_sync(return_input_arg="self") + def batch_create_features( + self, + feature_configs: Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]], + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync: bool = True, + ) -> "EntityType": + """Batch creates Feature resources in this EntityType. 
+ + Example Usage: + + my_entity_type = aiplatform.EntityType( + entity_type_name='my_entity_type_id', + featurestore_id='my_featurestore_id', + ) + my_entity_type.batch_create_features( + feature_configs={ + "my_feature_id1": { + "value_type": "INT64", + }, + "my_feature_id2": { + "value_type": "BOOL", + }, + "my_feature_id3": { + "value_type": "STRING", + }, + } + ) + + Args: + feature_configs (Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]): + Required. A user defined Dict containing configurations for feature creation. + + The feature_configs Dict[str, Dict] i.e. {feature_id: feature_config} contains configuration for each creating feature: + Example: + feature_configs = { + "my_feature_id_1": feature_config_1, + "my_feature_id_2": feature_config_2, + "my_feature_id_3": feature_config_3, + } + + Each feature_config requires "value_type", and optional "description", "labels": + Example: + feature_config_1 = { + "value_type": "INT64", + } + feature_config_2 = { + "value_type": "BOOL", + "description": "my feature id 2 description" + } + feature_config_3 = { + "value_type": "STRING", + "labels": { + "my key": "my value", + } + } + + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + sync (bool): + Optional. Whether to execute this creation synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. 
+ + Returns: + EntityType - entity_type resource object + """ + create_feature_requests = self._validate_and_get_create_feature_requests( + feature_configs=feature_configs + ) + + _LOGGER.log_action_start_against_resource( + "Batch creating features", "entityType", self, + ) + + batch_created_features_lro = self.api_client.batch_create_features( + parent=self.resource_name, + requests=create_feature_requests, + metadata=request_metadata, + ) + + _LOGGER.log_action_started_against_resource_with_lro( + "Batch create Features", + "entityType", + self.__class__, + batch_created_features_lro, + ) + + batch_created_features_lro.result() + + _LOGGER.log_action_completed_against_resource( + "entityType", "Batch created features", self + ) + + return self + + def _validate_and_get_import_feature_values_request( + self, + feature_ids: List[str], + feature_time: Union[str, datetime.datetime], + data_source: Union[gca_io.AvroSource, gca_io.BigQuerySource, gca_io.CsvSource], + feature_source_fields: Optional[Dict[str, str]] = None, + entity_id_field: Optional[str] = None, + disable_online_serving: Optional[bool] = None, + worker_count: Optional[int] = None, + ) -> gca_featurestore_service.ImportFeatureValuesRequest: + """Validates and get import feature values request. + Args: + feature_ids (List[str]): + Required. IDs of the Feature to import values + of. The Features must exist in the target + EntityType, or the request will fail. + feature_time (Union[str, datetime.datetime]): + Required. The feature_time can be one of: + - The source column that holds the Feature + timestamp for all Feature values in each entity. + - A single Feature timestamp for all entities + being imported. The timestamp must not have + higher than millisecond precision. + data_source (Union[gca_io.AvroSource, gca_io.BiqQuerySource, gca_io.CsvSource]): + Required. The data_source can be one of: + - AvroSource + - BiqQuerySource + - CsvSource + feature_source_fields (Dict[str, str]): + Optional. 
User defined dictionary to map ID of the Feature for importing values + of to the source column for getting the Feature values from. + + Specify the features whose ID and source column are not the same. + If not provided, the source column need to be the same as the Feature ID. + + Example: + + feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] + + In case all features' source field and ID match: + feature_source_fields = None or {} + + In case all features' source field and ID do not match: + feature_source_fields = { + 'my_feature_id_1': 'my_feature_id_1_source_field', + 'my_feature_id_2': 'my_feature_id_2_source_field', + 'my_feature_id_3': 'my_feature_id_3_source_field', + } + + In case some features' source field and ID do not match: + feature_source_fields = { + 'my_feature_id_1': 'my_feature_id_1_source_field', + } + entity_id_field (str): + Optional. Source column that holds entity IDs. If not provided, entity + IDs are extracted from the column named ``entity_id``. + disable_online_serving (bool): + Optional. If set, data will not be imported for online + serving. This is typically used for backfilling, + where Feature generation timestamps are not in + the timestamp range needed for online serving. + worker_count (int): + Optional. Specifies the number of workers that are used + to write data to the Featurestore. Consider the + online serving capacity that you require to + achieve the desired import throughput without + interfering with online serving. The value must + be positive, and less than or equal to 100. If + not set, defaults to using 1 worker. The low + count ensures minimal impact on online serving + performance. 
+ Returns: + gca_featurestore_service.ImportFeatureValuesRequest - request message for importing feature values + Raises: + ValueError if data_source type is not supported + ValueError if feature_time type is not supported + """ + feature_source_fields = feature_source_fields or {} + feature_specs = [ + gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( + id=feature_id, source_field=feature_source_fields.get(feature_id) + ) + for feature_id in set(feature_ids) + ] + + import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + entity_type=self.resource_name, + feature_specs=feature_specs, + entity_id_field=entity_id_field, + disable_online_serving=disable_online_serving, + worker_count=worker_count, + ) + + if isinstance(data_source, gca_io.AvroSource): + import_feature_values_request.avro_source = data_source + elif isinstance(data_source, gca_io.BigQuerySource): + import_feature_values_request.bigquery_source = data_source + elif isinstance(data_source, gca_io.CsvSource): + import_feature_values_request.csv_source = data_source + else: + raise ValueError( + f"The type of `data_source` field should be: " + f"`gca_io.AvroSource`, `gca_io.BigQuerySource`, or `gca_io.CsvSource`, " + f"get {type(data_source)} instead. " + ) + + if isinstance(feature_time, str): + import_feature_values_request.feature_time_field = feature_time + elif isinstance(feature_time, datetime.datetime): + import_feature_values_request.feature_time = utils.get_timestamp_proto( + time=feature_time + ) + else: + raise ValueError( + f"The type of `feature_time` field should be: `str` or `datetime.datetime`, " + f"get {type(feature_time)} instead. 
" + ) + + return import_feature_values_request + + def _import_feature_values( + self, + import_feature_values_request: gca_featurestore_service.ImportFeatureValuesRequest, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + ) -> "EntityType": + """Imports Feature values into the Featurestore from a source storage. + + Args: + import_feature_values_request (gca_featurestore_service.ImportFeatureValuesRequest): + Required. Request message for importing feature values. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + + Returns: + EntityType - The entityType resource object with imported feature values. + """ + _LOGGER.log_action_start_against_resource( + "Importing", "feature values", self, + ) + + import_lro = self.api_client.import_feature_values( + request=import_feature_values_request, metadata=request_metadata, + ) + + _LOGGER.log_action_started_against_resource_with_lro( + "Import", "feature values", self.__class__, import_lro + ) + + import_lro.result() + + _LOGGER.log_action_completed_against_resource( + "feature values", "imported", self + ) + + return self + + @base.optional_sync(return_input_arg="self") + def ingest_from_bq( + self, + feature_ids: List[str], + feature_time: Union[str, datetime.datetime], + bq_source_uri: str, + feature_source_fields: Optional[Dict[str, str]] = None, + entity_id_field: Optional[str] = None, + disable_online_serving: Optional[bool] = None, + worker_count: Optional[int] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync: bool = True, + ) -> "EntityType": + """Ingest feature values from BigQuery. + + Args: + feature_ids (List[str]): + Required. IDs of the Feature to import values + of. The Features must exist in the target + EntityType, or the request will fail. + feature_time (Union[str, datetime.datetime]): + Required. 
The feature_time can be one of: + - The source column that holds the Feature + timestamp for all Feature values in each entity. + - A single Feature timestamp for all entities + being imported. The timestamp must not have + higher than millisecond precision. + bq_source_uri (str): + Required. BigQuery URI to the input table. + Example: + 'bq://project.dataset.table_name' + feature_source_fields (Dict[str, str]): + Optional. User defined dictionary to map ID of the Feature for importing values + of to the source column for getting the Feature values from. + + Specify the features whose ID and source column are not the same. + If not provided, the source column need to be the same as the Feature ID. + + Example: + + feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] + + In case all features' source field and ID match: + feature_source_fields = None or {} + + In case all features' source field and ID do not match: + feature_source_fields = { + 'my_feature_id_1': 'my_feature_id_1_source_field', + 'my_feature_id_2': 'my_feature_id_2_source_field', + 'my_feature_id_3': 'my_feature_id_3_source_field', + } + + In case some features' source field and ID do not match: + feature_source_fields = { + 'my_feature_id_1': 'my_feature_id_1_source_field', + } + entity_id_field (str): + Optional. Source column that holds entity IDs. If not provided, entity + IDs are extracted from the column named ``entity_id``. + disable_online_serving (bool): + Optional. If set, data will not be imported for online + serving. This is typically used for backfilling, + where Feature generation timestamps are not in + the timestamp range needed for online serving. + worker_count (int): + Optional. Specifies the number of workers that are used + to write data to the Featurestore. Consider the + online serving capacity that you require to + achieve the desired import throughput without + interfering with online serving. The value must + be positive, and less than or equal to 100. 
If + not set, defaults to using 1 worker. The low + count ensures minimal impact on online serving + performance. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + sync (bool): + Optional. Whether to execute this import synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + + Returns: + EntityType - The entityType resource object with feature values imported. + + """ + bigquery_source = gca_io.BigQuerySource(input_uri=bq_source_uri) + + import_feature_values_request = self._validate_and_get_import_feature_values_request( + feature_ids=feature_ids, + feature_time=feature_time, + data_source=bigquery_source, + feature_source_fields=feature_source_fields, + entity_id_field=entity_id_field, + disable_online_serving=disable_online_serving, + worker_count=worker_count, + ) + + return self._import_feature_values( + import_feature_values_request=import_feature_values_request, + request_metadata=request_metadata, + ) + + @base.optional_sync(return_input_arg="self") + def ingest_from_gcs( + self, + feature_ids: List[str], + feature_time: Union[str, datetime.datetime], + gcs_source_uris: Union[str, List[str]], + gcs_source_type: str, + feature_source_fields: Optional[Dict[str, str]] = None, + entity_id_field: Optional[str] = None, + disable_online_serving: Optional[bool] = None, + worker_count: Optional[int] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync: bool = True, + ) -> "EntityType": + """Ingest feature values from GCS. + + Args: + feature_ids (List[str]): + Required. IDs of the Feature to import values + of. The Features must exist in the target + EntityType, or the request will fail. + feature_time (Union[str, datetime.datetime]): + Required. 
The feature_time can be one of: + - The source column that holds the Feature + timestamp for all Feature values in each entity. + - A single Feature timestamp for all entities + being imported. The timestamp must not have + higher than millisecond precision. + gcs_source_uris (Union[str, List[str]]): + Required. Google Cloud Storage URI(-s) to the + input file(s). May contain wildcards. For more + information on wildcards, see + https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames. + Example: + ["gs://my_bucket/my_file_1.csv", "gs://my_bucket/my_file_2.csv"] + or + "gs://my_bucket/my_file.avro" + gcs_source_type (str): + Required. The type of the input file(s) provided by `gcs_source_uris`, + the value of gcs_source_type can only be either `csv`, or `avro`. + feature_source_fields (Dict[str, str]): + Optional. User defined dictionary to map ID of the Feature for importing values + of to the source column for getting the Feature values from. + + Specify the features whose ID and source column are not the same. + If not provided, the source column need to be the same as the Feature ID. + + Example: + + feature_ids = ['my_feature_id_1', 'my_feature_id_2', 'my_feature_id_3'] + + In case all features' source field and ID match: + feature_source_fields = None or {} + + In case all features' source field and ID do not match: + feature_source_fields = { + 'my_feature_id_1': 'my_feature_id_1_source_field', + 'my_feature_id_2': 'my_feature_id_2_source_field', + 'my_feature_id_3': 'my_feature_id_3_source_field', + } + + In case some features' source field and ID do not match: + feature_source_fields = { + 'my_feature_id_1': 'my_feature_id_1_source_field', + } + entity_id_field (str): + Optional. Source column that holds entity IDs. If not provided, entity + IDs are extracted from the column named ``entity_id``. + disable_online_serving (bool): + Optional. If set, data will not be imported for online + serving. 
This is typically used for backfilling, + where Feature generation timestamps are not in + the timestamp range needed for online serving. + worker_count (int): + Optional. Specifies the number of workers that are used + to write data to the Featurestore. Consider the + online serving capacity that you require to + achieve the desired import throughput without + interfering with online serving. The value must + be positive, and less than or equal to 100. If + not set, defaults to using 1 worker. The low + count ensures minimal impact on online serving + performance. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + sync (bool): + Optional. Whether to execute this import synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + + Returns: + EntityType - The entityType resource object with feature values imported. + + Raises: + ValueError if gcs_source_type is not supported. + """ + if gcs_source_type not in featurestore_utils.GCS_SOURCE_TYPE: + raise ValueError( + "Only %s are supported gcs_source_type, not `%s`. 
" + % ( + "`" + "`, `".join(featurestore_utils.GCS_SOURCE_TYPE) + "`", + gcs_source_type, + ) + ) + + if isinstance(gcs_source_uris, str): + gcs_source_uris = [gcs_source_uris] + gcs_source = gca_io.GcsSource(uris=gcs_source_uris) + + if gcs_source_type == "csv": + data_source = gca_io.CsvSource(gcs_source=gcs_source) + if gcs_source_type == "avro": + data_source = gca_io.AvroSource(gcs_source=gcs_source) + + import_feature_values_request = self._validate_and_get_import_feature_values_request( + feature_ids=feature_ids, + feature_time=feature_time, + data_source=data_source, + feature_source_fields=feature_source_fields, + entity_id_field=entity_id_field, + disable_online_serving=disable_online_serving, + worker_count=worker_count, + ) + + return self._import_feature_values( + import_feature_values_request=import_feature_values_request, + request_metadata=request_metadata, + ) diff --git a/google/cloud/aiplatform/featurestore/feature.py b/google/cloud/aiplatform/featurestore/feature.py index ada0816037..d41344f086 100644 --- a/google/cloud/aiplatform/featurestore/feature.py +++ b/google/cloud/aiplatform/featurestore/feature.py @@ -51,7 +51,7 @@ def _resource_id_validator(resource_id: str): resource_id(str): The resource id to validate. """ - featurestore_utils.validate_id(resource_id) + featurestore_utils.validate_feature_id(resource_id) def __init__( self, @@ -83,9 +83,12 @@ def __init__( Example: "projects/123/locations/us-central1/featurestores/my_featurestore_id/entityTypes/my_entity_type_id/features/my_feature_id" or "my_feature_id" when project and location are initialized or passed, with featurestore_id and entity_type_id passed. featurestore_id (str): - Optional. Featurestore ID to retrieve feature from, when feature_name is passed as Feature ID. + Optional. Featurestore ID of an existing featurestore to retrieve feature from, + when feature_name is passed as Feature ID. entity_type_id (str): - Optional. 
EntityType ID to retrieve feature from, when feature_name is passed as Feature ID. + Optional. EntityType ID of an existing entityType to retrieve feature from, + when feature_name is passed as Feature ID. + The EntityType must exist in the Featurestore if provided by the featurestore_id. project (str): Optional. Project to retrieve feature from. If not set, project set in aiplatform.init will be used. @@ -261,11 +264,13 @@ def list( Args: entity_type_name (str): - Required. A fully-qualified entityType resource name or an entity_type ID to list features in + Required. A fully-qualified entityType resource name or an entity_type ID of an existing entityType + to list features in. The EntityType must exist in the Featurestore if provided by the featurestore_id. Example: "projects/123/locations/us-central1/featurestores/my_featurestore_id/entityTypes/my_entity_type_id" or "my_entity_type_id" when project and location are initialized or passed, with featurestore_id passed. featurestore_id (str): - Optional. Featurestore ID to list features in, when entity_type_name is passed as entity_type ID. + Optional. Featurestore ID of an existing featurestore to list features in, + when entity_type_name is passed as entity_type ID. filter (str): Optional. Lists the Features that match the filter expression. The following filters are supported: @@ -472,3 +477,144 @@ def search( ) for gapic_resource in resource_list ] + + @classmethod + @base.optional_sync() + def create( + cls, + feature_id: str, + value_type: str, + entity_type_name: str, + featurestore_id: Optional[str] = None, + description: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync: bool = True, + ) -> "Feature": + """Creates a Feature resource in an EntityType. 
+ + Example Usage: + + my_feature = aiplatform.Feature.create( + feature_id='my_feature_id', + value_type='INT64', + entity_type_name='projects/123/locations/us-central1/featurestores/my_featurestore_id/\ + entityTypes/my_entity_type_id' + ) + or + my_feature = aiplatform.Feature.create( + feature_id='my_feature_id', + value_type='INT64', + entity_type_name='my_entity_type_id', + featurestore_id='my_featurestore_id', + ) + + Args: + feature_id (str): + Required. The ID to use for the Feature, which will become + the final component of the Feature's resource name, which is immutable. + + This value may be up to 60 characters, and valid characters + are ``[a-z0-9_]``. The first character cannot be a number. + + The value must be unique within an EntityType. + value_type (str): + Required. Immutable. Type of Feature value. + One of BOOL, BOOL_ARRAY, DOUBLE, DOUBLE_ARRAY, INT64, INT64_ARRAY, STRING, STRING_ARRAY, BYTES. + entity_type_name (str): + Required. A fully-qualified entityType resource name or an entity_type ID of an existing entityType + to create Feature in. The EntityType must exist in the Featurestore if provided by the featurestore_id. + Example: "projects/123/locations/us-central1/featurestores/my_featurestore_id/entityTypes/my_entity_type_id" + or "my_entity_type_id" when project and location are initialized or passed, with featurestore_id passed. + featurestore_id (str): + Optional. Featurestore ID of an existing featurestore to create Feature in + if `entity_type_name` is passed an entity_type ID. + description (str): + Optional. Description of the Feature. + labels (Dict[str, str]): + Optional. The labels with user-defined + metadata to organize your Features. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + on and examples of labels. 
No more than 64 user + labels can be associated with one Feature + (System labels are excluded)." + System reserved label keys are prefixed with + "aiplatform.googleapis.com/" and are immutable. + project (str): + Optional. Project to create Feature in if `entity_type_name` is passed an entity_type ID. + If not set, project set in aiplatform.init will be used. + location (str): + Optional. Location to create Feature in if `entity_type_name` is passed an entity_type ID. + If not set, location set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to create Features. Overrides + credentials set in aiplatform.init. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + sync (bool): + Optional. Whether to execute this creation synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. 
+ + Returns: + Feature - feature resource object + + """ + entity_type_name = utils.full_resource_name( + resource_name=entity_type_name, + resource_noun=featurestore.EntityType._resource_noun, + parse_resource_name_method=featurestore.EntityType._parse_resource_name, + format_resource_name_method=featurestore.EntityType._format_resource_name, + parent_resource_name_fields={ + featurestore.Featurestore._resource_noun: featurestore_id + } + if featurestore_id + else featurestore_id, + project=project, + location=location, + resource_id_validator=featurestore.EntityType._resource_id_validator, + ) + entity_type_name_components = featurestore.EntityType._parse_resource_name( + entity_type_name + ) + + feature_config = featurestore_utils._FeatureConfig( + feature_id=feature_id, + value_type=value_type, + description=description, + labels=labels, + ) + + create_feature_request = feature_config.get_create_feature_request() + create_feature_request.parent = entity_type_name + + api_client = cls._instantiate_client( + location=entity_type_name_components["location"], credentials=credentials, + ) + + created_feature_lro = api_client.create_feature( + request=create_feature_request, metadata=request_metadata, + ) + + _LOGGER.log_create_with_lro(cls, created_feature_lro) + + created_feature = created_feature_lro.result() + + _LOGGER.log_create_complete(cls, created_feature, "feature") + + feature_obj = cls( + feature_name=created_feature.name, + project=project, + location=location, + credentials=credentials, + ) + + return feature_obj diff --git a/google/cloud/aiplatform/featurestore/featurestore.py b/google/cloud/aiplatform/featurestore/featurestore.py index 9194b1f3a9..d799e22963 100644 --- a/google/cloud/aiplatform/featurestore/featurestore.py +++ b/google/cloud/aiplatform/featurestore/featurestore.py @@ -23,6 +23,7 @@ from google.cloud.aiplatform import base from google.cloud.aiplatform.compat.types import featurestore as gca_featurestore from google.cloud.aiplatform 
import featurestore +from google.cloud.aiplatform import initializer from google.cloud.aiplatform import utils from google.cloud.aiplatform.utils import featurestore_utils @@ -311,7 +312,7 @@ def list_entity_types( @base.optional_sync() def delete_entity_types( - self, entity_type_ids: List[str], sync: bool = True, + self, entity_type_ids: List[str], sync: bool = True, force: bool = False, ) -> None: """Deletes entity_type resources in this Featurestore given their entity_type IDs. WARNING: This deletion is permanent. @@ -323,12 +324,236 @@ def delete_entity_types( Optional. Whether to execute this deletion synchronously. If False, this method will be executed in concurrent Future and any downstream object will be immediately returned and synced when the Future has completed. + force (bool): + Optional. If force is set to True, all features in each entityType + will be deleted prior to entityType deletion. Default is False. """ entity_types = [] for entity_type_id in entity_type_ids: entity_type = self.get_entity_type(entity_type_id=entity_type_id) - entity_type.delete(sync=False) + entity_type.delete(force=force, sync=False) entity_types.append(entity_type) for entity_type in entity_types: entity_type.wait() + + @base.optional_sync() + def delete(self, sync: bool = True, force: bool = False) -> None: + """Deletes this Featurestore resource. If force is set to True, + all entityTypes in this Featurestore will be deleted prior to featurestore deletion, + and all features in each entityType will be deleted prior to each entityType deletion. + + WARNING: This deletion is permanent. + + Args: + force (bool): + If set to true, any EntityTypes and + Features for this Featurestore will also + be deleted. (Otherwise, the request will + only work if the Featurestore has no + EntityTypes.) + sync (bool): + Whether to execute this deletion synchronously. 
If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. + """ + _LOGGER.log_action_start_against_resource("Deleting", "", self) + lro = getattr(self.api_client, self._delete_method)( + name=self.resource_name, force=force + ) + _LOGGER.log_action_started_against_resource_with_lro( + "Delete", "", self.__class__, lro + ) + lro.result() + _LOGGER.log_action_completed_against_resource("deleted.", "", self) + + @classmethod + @base.optional_sync() + def create( + cls, + featurestore_id: str, + online_store_fixed_node_count: Optional[int] = None, + labels: Optional[Dict[str, str]] = None, + project: Optional[str] = None, + location: Optional[str] = None, + credentials: Optional[auth_credentials.Credentials] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + encryption_spec_key_name: Optional[str] = None, + sync: bool = True, + ) -> "Featurestore": + """Creates a Featurestore resource. + + Example Usage: + + my_featurestore = aiplatform.Featurestore.create( + featurestore_id='my_featurestore_id', + online_store_fixed_node_count=1 + ) + or + my_featurestore = aiplatform.Featurestore.create( + featurestore_id='my_featurestore_id', + labels={'my_key': 'my_value'}, + ) + + Args: + featurestore_id (str): + Required. The ID to use for this Featurestore, which will + become the final component of the Featurestore's resource + name. + + This value may be up to 60 characters, and valid characters + are ``[a-z0-9_]``. The first character cannot be a number. + + The value must be unique within the project and location. + online_store_fixed_node_count (int): + Optional. Config for online serving resources. + When not specified, default node count is 1. The + number of nodes will not scale automatically but + can be scaled manually by providing different + values when updating. 
+ labels (Dict[str, str]): + Optional. The labels with user-defined + metadata to organize your Featurestore. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + on and examples of labels. No more than 64 user + labels can be associated with one + Featurestore (System labels are excluded). + System reserved label keys are prefixed with + "aiplatform.googleapis.com/" and are immutable. + project (str): + Optional. Project to create Featurestore in. If not set, project + set in aiplatform.init will be used. + location (str): + Optional. Location to create Featurestore in. If not set, location + set in aiplatform.init will be used. + credentials (auth_credentials.Credentials): + Optional. Custom credentials to use to create Featurestores. Overrides + credentials set in aiplatform.init. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + encryption_spec_key_name (str): + Optional. The Cloud KMS resource identifier of the + customer-managed encryption key used to protect the + featurestore. If set, both of the online and offline + data storage will be secured by this key. The key needs + to be in the same region as where the compute resource + is created. + sync (bool): + Optional. Whether to execute this creation synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. 
+ + Returns: + Featurestore - Featurestore resource object + + """ + gapic_featurestore = gca_featurestore.Featurestore( + online_serving_config=gca_featurestore.Featurestore.OnlineServingConfig( + fixed_node_count=online_store_fixed_node_count or 1 + ) + ) + + if labels: + utils.validate_labels(labels) + gapic_featurestore.labels = labels + + if encryption_spec_key_name: + gapic_featurestore.encryption_spec = initializer.global_config.get_encryption_spec( + encryption_spec_key_name=encryption_spec_key_name + ) + + api_client = cls._instantiate_client(location=location, credentials=credentials) + + created_featurestore_lro = api_client.create_featurestore( + parent=initializer.global_config.common_location_path( + project=project, location=location + ), + featurestore=gapic_featurestore, + featurestore_id=featurestore_id, + metadata=request_metadata, + ) + + _LOGGER.log_create_with_lro(cls, created_featurestore_lro) + + created_featurestore = created_featurestore_lro.result() + + _LOGGER.log_create_complete(cls, created_featurestore, "featurestore") + + featurestore_obj = cls( + featurestore_name=created_featurestore.name, + project=project, + location=location, + credentials=credentials, + ) + + return featurestore_obj + + def create_entity_type( + self, + entity_type_id: str, + description: Optional[str] = None, + labels: Optional[Dict[str, str]] = None, + request_metadata: Optional[Sequence[Tuple[str, str]]] = (), + sync: bool = True, + ) -> "featurestore.EntityType": + """Creates an EntityType resource in this Featurestore. + + Example Usage: + + my_featurestore = aiplatform.Featurestore.create( + featurestore_id='my_featurestore_id' + ) + my_entity_type = my_featurestore.create_entity_type( + entity_type_id='my_entity_type_id', + ) + + Args: + entity_type_id (str): + Required. The ID to use for the EntityType, which will + become the final component of the EntityType's resource + name. 
+ + This value may be up to 60 characters, and valid characters + are ``[a-z0-9_]``. The first character cannot be a number. + + The value must be unique within a featurestore. + description (str): + Optional. Description of the EntityType. + labels (Dict[str, str]): + Optional. The labels with user-defined + metadata to organize your EntityTypes. + Label keys and values can be no longer than 64 + characters (Unicode codepoints), can only + contain lowercase letters, numeric characters, + underscores and dashes. International characters + are allowed. + See https://goo.gl/xmQnxf for more information + on and examples of labels. No more than 64 user + labels can be associated with one EntityType + (System labels are excluded)." + System reserved label keys are prefixed with + "aiplatform.googleapis.com/" and are immutable. + request_metadata (Sequence[Tuple[str, str]]): + Optional. Strings which should be sent along with the request as metadata. + sync (bool): + Optional. Whether to execute this creation synchronously. If False, this method + will be executed in concurrent Future and any downstream object will + be immediately returned and synced when the Future has completed. 
+ + Returns: + featurestore.EntityType - EntityType resource object + + """ + return featurestore.EntityType.create( + entity_type_id=entity_type_id, + featurestore_name=self.resource_name, + description=description, + labels=labels, + request_metadata=request_metadata, + sync=sync, + ) diff --git a/google/cloud/aiplatform/utils/__init__.py b/google/cloud/aiplatform/utils/__init__.py index 195cae2732..acfde30aea 100644 --- a/google/cloud/aiplatform/utils/__init__.py +++ b/google/cloud/aiplatform/utils/__init__.py @@ -23,6 +23,8 @@ import re from typing import Any, Callable, Dict, Optional, Type, TypeVar, Tuple +from google.protobuf import timestamp_pb2 + from google.api_core import client_options from google.api_core import gapic_v1 from google.auth import credentials as auth_credentials @@ -612,3 +614,21 @@ def _timestamped_copy_to_gcs( gcs_path = "".join(["gs://", "/".join([blob.bucket.name, blob.name])]) return gcs_path + + +def get_timestamp_proto( + time: Optional[datetime.datetime] = None, +) -> timestamp_pb2.Timestamp: + """Gets timestamp proto of a given time. + Args: + time (datetime.datetime): + Optional. A user provided time. Defaults to datetime.datetime.now() if not given. + Returns: + timestamp_pb2.Timestamp - timestamp proto of the given time, which does not have higher than millisecond precision. + """ + t = (time or datetime.datetime.now()).timestamp() + seconds = int(t) + # must not have higher than millisecond precision. 
+ nanos = int((t % 1 * 1e6) * 1e3) + + return timestamp_pb2.Timestamp(seconds=seconds, nanos=nanos) diff --git a/google/cloud/aiplatform/utils/featurestore_utils.py b/google/cloud/aiplatform/utils/featurestore_utils.py index fedcc37568..e9d26b62be 100644 --- a/google/cloud/aiplatform/utils/featurestore_utils.py +++ b/google/cloud/aiplatform/utils/featurestore_utils.py @@ -16,12 +16,147 @@ # import re +from typing import Dict, NamedTuple, Optional +from google.cloud.aiplatform.compat.services import featurestore_service_client +from google.cloud.aiplatform.compat.types import ( + feature as gca_feature, + featurestore_service as gca_featurestore_service, +) +from google.cloud.aiplatform import utils + +CompatFeaturestoreServiceClient = featurestore_service_client.FeaturestoreServiceClient RESOURCE_ID_PATTERN_REGEX = r"[a-z_][a-z0-9_]{0,59}" +GCS_SOURCE_TYPE = {"csv", "avro"} + +_FEATURE_VALUE_TYPE_UNSPECIFIED = "VALUE_TYPE_UNSPECIFIED" + +def validate_id(resource_id: str) -> None: + """Validates feature store resource ID pattern. -def validate_id(resource_id: str): - """Validates feature store resource ID pattern.""" + Args: + resource_id (str): + Required. Feature Store resource ID. + + Raises: + ValueError if resource_id is invalid. + """ if not re.compile(r"^" + RESOURCE_ID_PATTERN_REGEX + r"$").match(resource_id): raise ValueError("Resource ID {resource_id} is not a valied resource id.") + + +def validate_feature_id(feature_id: str) -> None: + """Validates feature ID. + + Args: + feature_id (str): + Required. Feature resource ID. + + Raises: + ValueError if feature_id is invalid. + """ + match = re.compile(r"^" + RESOURCE_ID_PATTERN_REGEX + r"$").match(feature_id) + + if not match: + raise ValueError( + f"The value of feature_id may be up to 60 characters, and valid characters are `[a-z0-9_]`. " + f"The first character cannot be a number. Instead, get {feature_id}." 
+ ) + + reserved_words = ["entity_id", "feature_timestamp", "arrival_timestamp"] + if feature_id.lower() in reserved_words: + raise ValueError( + "The feature_id can not be any of the reserved_words: `%s`" + % ("`, `".join(reserved_words)) + ) + + +def validate_value_type(value_type: str) -> None: + """Validates user provided feature value_type string. + + Args: + value_type (str): + Required. Immutable. Type of Feature value. + One of BOOL, BOOL_ARRAY, DOUBLE, DOUBLE_ARRAY, INT64, INT64_ARRAY, STRING, STRING_ARRAY, BYTES. + + Raises: + ValueError if value_type is invalid or unspecified. + """ + if getattr(gca_feature.Feature.ValueType, value_type, None) in ( + gca_feature.Feature.ValueType.VALUE_TYPE_UNSPECIFIED, + None, + ): + raise ValueError( + f"Given value_type `{value_type}` invalid or unspecified. " + f"Choose one of {gca_feature.Feature.ValueType._member_names_} except `{_FEATURE_VALUE_TYPE_UNSPECIFIED}`" + ) + + +class _FeatureConfig(NamedTuple): + """Configuration for feature creation. + + Usage: + + config = _FeatureConfig( + feature_id='my_feature_id', + value_type='int64', + description='my description', + labels={'my_key': 'my_value'}, + ) + """ + + feature_id: str + value_type: str = _FEATURE_VALUE_TYPE_UNSPECIFIED + description: Optional[str] = None + labels: Optional[Dict[str, str]] = None + + def _get_feature_id(self) -> str: + """Validates and returns the feature_id. + + Returns: + str - valid feature ID. + + Raise: + ValueError if feature_id is invalid + """ + + # Raises ValueError if invalid feature_id + validate_feature_id(feature_id=self.feature_id) + + return self.feature_id + + def _get_value_type_enum(self) -> int: + """Validates value_type and returns the enum of the value type. + + Returns: + int - valid value type enum. 
+ """ + + # Raises ValueError if invalid value_type + validate_value_type(value_type=self.value_type) + + value_type_enum = getattr(gca_feature.Feature.ValueType, self.value_type) + + return value_type_enum + + def get_create_feature_request( + self, + ) -> gca_featurestore_service.CreateFeatureRequest: + """Return create feature request.""" + + gapic_feature = gca_feature.Feature(value_type=self._get_value_type_enum(),) + + if self.labels: + utils.validate_labels(self.labels) + gapic_feature.labels = self.labels + + if self.description: + gapic_feature.description = self.description + + create_feature_request = gca_featurestore_service.CreateFeatureRequest( + feature=gapic_feature, feature_id=self._get_feature_id() + ) + + return create_feature_request diff --git a/tests/system/aiplatform/e2e_base.py b/tests/system/aiplatform/e2e_base.py index c63c715d7c..61b9e7f36c 100644 --- a/tests/system/aiplatform/e2e_base.py +++ b/tests/system/aiplatform/e2e_base.py @@ -58,12 +58,12 @@ def setup_method(self): importlib.reload(initializer) importlib.reload(aiplatform) - @pytest.fixture() + @pytest.fixture(scope="class") def shared_state(self) -> Generator[Dict[str, Any], None, None]: shared_state = {} yield shared_state - @pytest.fixture() + @pytest.fixture(scope="class") def prepare_staging_bucket( self, shared_state: Dict[str, Any] ) -> Generator[storage.bucket.Bucket, None, None]: @@ -80,7 +80,7 @@ def prepare_staging_bucket( ) yield - @pytest.fixture() + @pytest.fixture(scope="class") def delete_staging_bucket(self, shared_state: Dict[str, Any]): """Delete the staging bucket and all it's contents""" @@ -90,7 +90,7 @@ def delete_staging_bucket(self, shared_state: Dict[str, Any]): bucket = shared_state["bucket"] bucket.delete(force=True) - @pytest.fixture(autouse=True) + @pytest.fixture(scope="class", autouse=True) def teardown(self, shared_state: Dict[str, Any]): """Delete every Vertex AI resource created during test""" @@ -104,8 +104,10 @@ def teardown(self, shared_state: 
Dict[str, Any]): for resource in shared_state["resources"]: try: - if isinstance(resource, aiplatform.Endpoint): - resource.delete(force=True) # Undeploy model then delete endpoint + if isinstance(resource, (aiplatform.Endpoint, aiplatform.Featurestore)): + # For endpoint, undeploy model then delete endpoint + # For featurestore, force delete its entity_types and features with the featurestore + resource.delete(force=True) else: resource.delete() except exceptions.GoogleAPIError as e: diff --git a/tests/system/aiplatform/test_featurestore.py b/tests/system/aiplatform/test_featurestore.py index 6107f826ec..65850f7d67 100644 --- a/tests/system/aiplatform/test_featurestore.py +++ b/tests/system/aiplatform/test_featurestore.py @@ -15,24 +15,235 @@ # limitations under the License. # +import logging + from google.cloud import aiplatform from tests.system.aiplatform import e2e_base +_TEST_USERS_ENTITY_TYPE_GCS_SRC = ( + "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/users.avro" +) +_TEST_MOVIES_ENTITY_TYPE_GCS_SRC = ( + "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movies.avro" +) -class TestFeaturestore(e2e_base.TestEndToEnd): +_TEST_FEATURESTORE_ID = "movie_prediction" +_TEST_USER_ENTITY_TYPE_ID = "users" +_TEST_MOVIE_ENTITY_TYPE_ID = "movies" + +_TEST_USER_AGE_FEATURE_ID = "age" +_TEST_USER_GENDER_FEATURE_ID = "gender" +_TEST_USER_LIKED_GENRES_FEATURE_ID = "liked_genres" - _temp_prefix = "temp-vertex-sdk-e2e-feature-store-test" +_TEST_MOVIE_TITLE_FEATURE_ID = "title" +_TEST_MOVIE_GENRES_FEATURE_ID = "genres" +_TEST_MOVIE_AVERAGE_RATING_FEATURE_ID = "average_rating" + + +class TestFeaturestore(e2e_base.TestEndToEnd): - def test_create_and_get_featurestore(self, shared_state): + _temp_prefix = "temp_vertex_sdk_e2e_featurestore_test" + def test_create_get_list_featurestore(self, shared_state): aiplatform.init( project=e2e_base._PROJECT, location=e2e_base._LOCATION, ) - shared_state["resources"] = [] + base_list_featurestores 
= len(aiplatform.Featurestore.list()) + shared_state["base_list_searched_features"] = len(aiplatform.Feature.search()) + + featurestore_id = self._make_display_name(key=_TEST_FEATURESTORE_ID).replace( + "-", "_" + )[:60] + featurestore = aiplatform.Featurestore.create(featurestore_id=featurestore_id) + + shared_state["resources"] = [featurestore] + shared_state["featurestore"] = featurestore + shared_state["featurestore_name"] = featurestore.resource_name + + get_featurestore = aiplatform.Featurestore( + featurestore_name=featurestore.resource_name + ) + assert featurestore.resource_name == get_featurestore.resource_name list_featurestores = aiplatform.Featurestore.list() - assert len(list_featurestores) >= 0 + assert (len(list_featurestores) - base_list_featurestores) == 1 + + def test_create_get_list_entity_types(self, shared_state): + + assert shared_state["featurestore"] + assert shared_state["featurestore_name"] + + featurestore = shared_state["featurestore"] + featurestore_name = shared_state["featurestore_name"] + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + # Users + user_entity_type = featurestore.create_entity_type( + entity_type_id=_TEST_USER_ENTITY_TYPE_ID + ) + shared_state["user_entity_type"] = user_entity_type + shared_state["user_entity_type_name"] = user_entity_type.resource_name + + get_user_entity_type = featurestore.get_entity_type( + entity_type_id=_TEST_USER_ENTITY_TYPE_ID + ) + assert user_entity_type.resource_name == get_user_entity_type.resource_name + + # Movies + movie_entity_type = aiplatform.EntityType.create( + entity_type_id=_TEST_MOVIE_ENTITY_TYPE_ID, + featurestore_name=featurestore_name, + ) + shared_state["movie_entity_type"] = movie_entity_type + shared_state["movie_entity_type_name"] = movie_entity_type.resource_name + + get_movie_entity_type = aiplatform.EntityType( + entity_type_name=movie_entity_type.resource_name + ) + assert movie_entity_type.resource_name == 
get_movie_entity_type.resource_name + + list_entity_types = aiplatform.EntityType.list( + featurestore_name=featurestore_name + ) + assert len(list_entity_types) == 2 + + def test_create_get_list_features(self, shared_state): + + assert shared_state["user_entity_type"] + assert shared_state["user_entity_type_name"] + user_entity_type = shared_state["user_entity_type"] + user_entity_type_name = shared_state["user_entity_type_name"] + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + list_user_features = user_entity_type.list_features() + assert len(list_user_features) == 0 + + # User Features + user_age_feature = user_entity_type.create_feature( + feature_id=_TEST_USER_AGE_FEATURE_ID, value_type="INT64" + ) + + get_user_age_feature = user_entity_type.get_feature( + feature_id=_TEST_USER_AGE_FEATURE_ID + ) + assert user_age_feature.resource_name == get_user_age_feature.resource_name + + user_gender_feature = aiplatform.Feature.create( + feature_id=_TEST_USER_GENDER_FEATURE_ID, + value_type="STRING", + entity_type_name=user_entity_type_name, + ) + + get_user_gender_feature = aiplatform.Feature( + feature_name=user_gender_feature.resource_name + ) + assert ( + user_gender_feature.resource_name == get_user_gender_feature.resource_name + ) + + user_liked_genres_feature = user_entity_type.create_feature( + feature_id=_TEST_USER_LIKED_GENRES_FEATURE_ID, value_type="STRING_ARRAY", + ) + + get_user_liked_genres_feature = aiplatform.Feature( + feature_name=user_liked_genres_feature.resource_name + ) + assert ( + user_liked_genres_feature.resource_name + == get_user_liked_genres_feature.resource_name + ) + + list_user_features = user_entity_type.list_features() + assert len(list_user_features) == 3 + + def test_ingest_feature_values(self, shared_state, caplog): + + assert shared_state["user_entity_type"] + user_entity_type = shared_state["user_entity_type"] + + caplog.set_level(logging.INFO) + + aiplatform.init( + project=e2e_base._PROJECT, 
location=e2e_base._LOCATION, + ) + + user_entity_type.ingest_from_gcs( + feature_ids=[ + _TEST_USER_AGE_FEATURE_ID, + _TEST_USER_GENDER_FEATURE_ID, + _TEST_USER_LIKED_GENRES_FEATURE_ID, + ], + feature_time="update_time", + gcs_source_uris=_TEST_USERS_ENTITY_TYPE_GCS_SRC, + gcs_source_type="avro", + entity_id_field="user_id", + worker_count=2, + ) + + assert "EntityType feature values imported." in caplog.text + + caplog.clear() + + def test_batch_create_features_and_ingest_feature_values( + self, shared_state, caplog + ): + + assert shared_state["movie_entity_type"] + movie_entity_type = shared_state["movie_entity_type"] + + caplog.set_level(logging.INFO) + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) + + movie_feature_configs = { + _TEST_MOVIE_TITLE_FEATURE_ID: {"value_type": "STRING"}, + _TEST_MOVIE_GENRES_FEATURE_ID: {"value_type": "STRING"}, + _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID: {"value_type": "DOUBLE"}, + } + + list_movie_features = movie_entity_type.list_features() + assert len(list_movie_features) == 0 + + movie_entity_type.batch_create_features(feature_configs=movie_feature_configs) + + movie_entity_type.ingest_from_gcs( + feature_ids=[ + _TEST_MOVIE_TITLE_FEATURE_ID, + _TEST_MOVIE_GENRES_FEATURE_ID, + _TEST_MOVIE_AVERAGE_RATING_FEATURE_ID, + ], + feature_time="update_time", + gcs_source_uris=_TEST_MOVIES_ENTITY_TYPE_GCS_SRC, + gcs_source_type="avro", + entity_id_field="movie_id", + worker_count=2, + ) + + list_movie_features = movie_entity_type.list_features() + assert len(list_movie_features) == 3 + + assert "EntityType feature values imported." 
in caplog.text + + caplog.clear() + + def test_search_features(self, shared_state): + + assert shared_state["base_list_searched_features"] is not None + + aiplatform.init( + project=e2e_base._PROJECT, location=e2e_base._LOCATION, + ) list_searched_features = aiplatform.Feature.search() - assert len(list_searched_features) >= 0 + assert ( + len(list_searched_features) - shared_state["base_list_searched_features"] + ) == 6 diff --git a/tests/unit/aiplatform/test_featurestores.py b/tests/unit/aiplatform/test_featurestores.py index 5a93c8efd5..f76e6ecf22 100644 --- a/tests/unit/aiplatform/test_featurestores.py +++ b/tests/unit/aiplatform/test_featurestores.py @@ -16,6 +16,7 @@ # import pytest +import datetime from unittest import mock from importlib import reload @@ -27,15 +28,19 @@ from google.cloud import aiplatform from google.cloud.aiplatform import base from google.cloud.aiplatform import initializer +from google.cloud.aiplatform import utils + from google.cloud.aiplatform.utils import featurestore_utils from google.cloud.aiplatform_v1.services.featurestore_service import ( client as featurestore_service_client, ) from google.cloud.aiplatform_v1.types import ( - featurestore as gca_featurestore, + encryption_spec as gca_encryption_spec, entity_type as gca_entity_type, feature as gca_feature, - encryption_spec as gca_encryption_spec, + featurestore as gca_featurestore, + featurestore_service as gca_featurestore_service, + io as gca_io, ) # project @@ -63,6 +68,9 @@ _TEST_FEATURE_ID = "feature_id" _TEST_FEATURE_NAME = f"{_TEST_ENTITY_TYPE_NAME}/features/{_TEST_FEATURE_ID}" _TEST_FEATURE_INVALID = f"{_TEST_ENTITY_TYPE_NAME}/feature/{_TEST_FEATURE_ID}" +_TEST_FEATURE_VALUE_TYPE = "INT64" +_TEST_FEATURE_VALUE_TYPE_ENUM = 9 +_TEST_FEATURE_ID_INVALID = "1feature_id" # misc _TEST_DESCRIPTION = "my description" @@ -115,6 +123,38 @@ gca_feature.Feature(name=_TEST_FEATURE_NAME,), ] +_TEST_FEATURE_CONFIGS = { + "my_feature_id_1": {"value_type": _TEST_FEATURE_VALUE_TYPE}, +} + 
+_TEST_IMPORTING_FEATURE_IDS = ["my_feature_id_1"] + +_TEST_IMPORTING_FEATURE_SOURCE_FIELDS = { + "my_feature_id_1": "my_feature_id_1_source_field", +} + +_TEST_FEATURE_TIME_FIELD = "feature_time_field" +_TEST_FEATURE_TIME = datetime.datetime.now() + +_TEST_BQ_SOURCE_URI = "bq://project.dataset.table_name" +_TEST_GCS_AVRO_SOURCE_URIS = [ + "gs://my_bucket/my_file_1.avro", +] +_TEST_GCS_CSV_SOURCE_URIS = [ + "gs://my_bucket/my_file_1.csv", +] +_TEST_GCS_SOURCE_TYPE_CSV = "csv" +_TEST_GCS_SOURCE_TYPE_AVRO = "avro" +_TEST_GCS_SOURCE_TYPE_INVALID = "json" + +_TEST_BQ_SOURCE = gca_io.BigQuerySource(input_uri=_TEST_BQ_SOURCE_URI) +_TEST_AVRO_SOURCE = gca_io.AvroSource( + gcs_source=gca_io.GcsSource(uris=_TEST_GCS_AVRO_SOURCE_URIS) +) +_TEST_CSV_SOURCE = gca_io.CsvSource( + gcs_source=gca_io.GcsSource(uris=_TEST_GCS_CSV_SOURCE_URIS) +) + # All Featurestore Mocks @pytest.fixture @@ -170,6 +210,23 @@ def search_features_mock(): yield search_features_mock +@pytest.fixture +def create_featurestore_mock(): + with patch.object( + featurestore_service_client.FeaturestoreServiceClient, "create_featurestore" + ) as create_featurestore_mock: + create_featurestore_lro_mock = mock.Mock(operation.Operation) + create_featurestore_lro_mock.result.return_value = gca_featurestore.Featurestore( + name=_TEST_FEATURESTORE_NAME, + online_serving_config=gca_featurestore.Featurestore.OnlineServingConfig( + fixed_node_count=_TEST_ONLINE_SERVING_CONFIG + ), + encryption_spec=_TEST_ENCRYPTION_SPEC, + ) + create_featurestore_mock.return_value = create_featurestore_lro_mock + yield create_featurestore_mock + + # ALL EntityType Mocks @pytest.fixture def get_entity_type_mock(): @@ -211,6 +268,29 @@ def delete_entity_type_mock(): yield delete_entity_type_mock +@pytest.fixture +def create_entity_type_mock(): + with patch.object( + featurestore_service_client.FeaturestoreServiceClient, "create_entity_type" + ) as create_entity_type_mock: + create_entity_type_lro_mock = mock.Mock(operation.Operation) + 
create_entity_type_lro_mock.result.return_value = gca_entity_type.EntityType( + name=_TEST_ENTITY_TYPE_NAME + ) + create_entity_type_mock.return_value = create_entity_type_lro_mock + yield create_entity_type_mock + + +@pytest.fixture +def import_feature_values_mock(): + with patch.object( + featurestore_service_client.FeaturestoreServiceClient, "import_feature_values" + ) as import_feature_values_mock: + import_feature_values_lro_mock = mock.Mock(operation.Operation) + import_feature_values_mock.return_value = import_feature_values_lro_mock + yield import_feature_values_mock + + # ALL Feature Mocks @pytest.fixture def get_feature_mock(): @@ -250,6 +330,29 @@ def delete_feature_mock(): yield delete_feature_mock +@pytest.fixture +def create_feature_mock(): + with patch.object( + featurestore_service_client.FeaturestoreServiceClient, "create_feature" + ) as create_feature_mock: + create_feature_lro_mock = mock.Mock(operation.Operation) + create_feature_lro_mock.result.return_value = gca_feature.Feature( + name=_TEST_FEATURE_NAME, value_type=_TEST_FEATURE_VALUE_TYPE_ENUM, + ) + create_feature_mock.return_value = create_feature_lro_mock + yield create_feature_mock + + +@pytest.fixture +def batch_create_features_mock(): + with patch.object( + featurestore_service_client.FeaturestoreServiceClient, "batch_create_features" + ) as batch_create_features_mock: + batch_create_features_lro_mock = mock.Mock(operation.Operation) + batch_create_features_mock.return_value = batch_create_features_lro_mock + yield batch_create_features_mock + + class TestFeaturestoreUtils: @pytest.mark.parametrize( "resource_id", ["resource_id", "resource_id12345", "_resource_id", "_123456"], @@ -272,6 +375,113 @@ def test_validate_invalid_resource_id(self, resource_id: str): with pytest.raises(ValueError): featurestore_utils.validate_id(resource_id) + @pytest.mark.parametrize( + "feature_id", ["resource_id", "resource_id12345", "_resource_id", "_123456"], + ) + def test_validate_feature_id(self, 
feature_id: str): + assert featurestore_utils.validate_feature_id(feature_id=feature_id) is None + + @pytest.mark.parametrize( + "feature_id", + [ + "12345resource_id", + "resource_id/1234", + "_resource_id/1234", + "resource-id-1234", + "123456", + "c" * 61, + "entity_id", + "Entity_ID", + "feature_timestamp", + "Feature_Timestamp", + "arrival_timestamp", + "Arrival_Timestamp", + ], + ) + def test_validate_feature_id_with_raise(self, feature_id: str): + with pytest.raises(ValueError): + featurestore_utils.validate_feature_id(feature_id=feature_id) + + @pytest.mark.parametrize( + "value_type", + [ + "BOOL", + "BOOL_ARRAY", + "DOUBLE", + "DOUBLE_ARRAY", + "INT64", + "INT64_ARRAY", + "STRING", + "STRING_ARRAY", + "BYTES", + ], + ) + def test_validate_value_type(self, value_type: str): + assert featurestore_utils.validate_value_type(value_type=value_type) is None + + @pytest.mark.parametrize( + "value_type", + [ + "INT", + "INT_array", + "STR", + "double", + "bool", + "array", + "INT32", + "VALUE_TYPE_UNSPECIFIED", + ], + ) + def test_validate_value_type_with_raise(self, value_type: str): + with pytest.raises(ValueError): + featurestore_utils.validate_value_type(value_type=value_type) + + +class Test_FeatureConfig: + def test_feature_config_return_create_feature_request(self): + + featureConfig = featurestore_utils._FeatureConfig( + feature_id=_TEST_FEATURE_ID, + value_type=_TEST_FEATURE_VALUE_TYPE, + description=_TEST_DESCRIPTION, + labels=_TEST_LABELS, + ) + + gapic_feature = gca_feature.Feature( + description=_TEST_DESCRIPTION, + value_type=_TEST_FEATURE_VALUE_TYPE_ENUM, + labels=_TEST_LABELS, + ) + + expected_request = gca_featurestore_service.CreateFeatureRequest( + feature=gapic_feature, feature_id=_TEST_FEATURE_ID, + ) + + assert featureConfig.get_create_feature_request() == expected_request + + def test_feature_config_create_feature_request_raises_invalid_feature_id(self): + featureConfig = featurestore_utils._FeatureConfig( + 
feature_id=_TEST_FEATURE_ID_INVALID, + value_type=_TEST_FEATURE_VALUE_TYPE, + description=_TEST_DESCRIPTION, + labels=_TEST_LABELS, + ) + with pytest.raises(ValueError): + featureConfig.get_create_feature_request() + + @pytest.mark.parametrize("value_type", ["INT", "VALUE_TYPE_UNSPECIFIED"]) + def test_feature_config_create_feature_request_raises_invalid_value_type( + self, value_type + ): + featureConfig = featurestore_utils._FeatureConfig( + feature_id=_TEST_FEATURE_ID, + value_type=value_type, + description=_TEST_DESCRIPTION, + labels=_TEST_LABELS, + ) + with pytest.raises(ValueError): + featureConfig.get_create_feature_request() + class TestFeaturestore: def setup_method(self): @@ -366,21 +576,31 @@ def test_list_featurestores(self, list_featurestores_mock): for my_featurestore in my_featurestore_list: assert type(my_featurestore) == aiplatform.Featurestore - @pytest.mark.parametrize("sync", [True, False]) + @pytest.mark.parametrize( + "force, sync", + [ + (None, True), + (True, True), + (False, True), + (None, False), + (True, False), + (False, False), + ], + ) @pytest.mark.usefixtures("get_featurestore_mock") - def test_delete_featurestore(self, delete_featurestore_mock, sync): + def test_delete_featurestore(self, delete_featurestore_mock, force, sync): aiplatform.init(project=_TEST_PROJECT) my_featurestore = aiplatform.Featurestore( featurestore_name=_TEST_FEATURESTORE_ID ) - my_featurestore.delete(sync=sync) + my_featurestore.delete(sync=sync, force=force) if not sync: my_featurestore.wait() delete_featurestore_mock.assert_called_once_with( - name=my_featurestore.resource_name + name=my_featurestore.resource_name, force=force, ) @pytest.mark.usefixtures("get_featurestore_mock") @@ -399,16 +619,28 @@ def test_list_entity_types(self, list_entity_types_mock): for my_entity_type in my_entity_type_list: assert type(my_entity_type) == aiplatform.EntityType - @pytest.mark.parametrize("sync", [True, False]) + @pytest.mark.parametrize( + "force, sync", + [ + (None, 
True), + (True, True), + (False, True), + (None, False), + (True, False), + (False, False), + ], + ) @pytest.mark.usefixtures("get_featurestore_mock", "get_entity_type_mock") - def test_delete_entity_types(self, delete_entity_type_mock, sync): + def test_delete_entity_types(self, delete_entity_type_mock, force, sync): aiplatform.init(project=_TEST_PROJECT) my_featurestore = aiplatform.Featurestore( featurestore_name=_TEST_FEATURESTORE_ID ) my_featurestore.delete_entity_types( - entity_type_ids=[_TEST_ENTITY_TYPE_ID, _TEST_ENTITY_TYPE_ID], sync=sync + entity_type_ids=[_TEST_ENTITY_TYPE_ID, _TEST_ENTITY_TYPE_ID], + sync=sync, + force=force, ) if not sync: @@ -416,12 +648,69 @@ def test_delete_entity_types(self, delete_entity_type_mock, sync): delete_entity_type_mock.assert_has_calls( calls=[ - mock.call(name=_TEST_ENTITY_TYPE_NAME), - mock.call(name=_TEST_ENTITY_TYPE_NAME), + mock.call(name=_TEST_ENTITY_TYPE_NAME, force=force), + mock.call(name=_TEST_ENTITY_TYPE_NAME, force=force), ], any_order=True, ) + @pytest.mark.usefixtures("get_featurestore_mock", "get_entity_type_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_create_entity_type(self, create_entity_type_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + + my_featurestore = aiplatform.Featurestore( + featurestore_name=_TEST_FEATURESTORE_NAME + ) + my_entity_type = my_featurestore.create_entity_type( + entity_type_id=_TEST_ENTITY_TYPE_ID, + description=_TEST_DESCRIPTION, + labels=_TEST_LABELS, + sync=sync, + ) + + if not sync: + my_entity_type.wait() + + expected_entity_type = gca_entity_type.EntityType( + labels=_TEST_LABELS, description=_TEST_DESCRIPTION, + ) + create_entity_type_mock.assert_called_once_with( + parent=_TEST_FEATURESTORE_NAME, + entity_type=expected_entity_type, + entity_type_id=_TEST_ENTITY_TYPE_ID, + metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures("get_featurestore_mock") + @pytest.mark.parametrize("sync", [True, False]) + def 
test_create_featurestore(self, create_featurestore_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + + my_featurestore = aiplatform.Featurestore.create( + featurestore_id=_TEST_FEATURESTORE_ID, + online_store_fixed_node_count=_TEST_ONLINE_SERVING_CONFIG, + labels=_TEST_LABELS, + encryption_spec_key_name=_TEST_ENCRYPTION_KEY_NAME, + ) + + if not sync: + my_featurestore.wait() + + expected_featurestore = gca_featurestore.Featurestore( + labels=_TEST_LABELS, + online_serving_config=gca_featurestore.Featurestore.OnlineServingConfig( + fixed_node_count=_TEST_ONLINE_SERVING_CONFIG + ), + encryption_spec=_TEST_ENCRYPTION_SPEC, + ) + create_featurestore_mock.assert_called_once_with( + parent=_TEST_PARENT, + featurestore=expected_featurestore, + featurestore_id=_TEST_FEATURESTORE_ID, + metadata=_TEST_REQUEST_METADATA, + ) + class TestEntityType: def setup_method(self): @@ -543,6 +832,228 @@ def test_delete_features(self, delete_feature_mock, sync): any_order=True, ) + @pytest.mark.usefixtures("get_entity_type_mock", "get_feature_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_create_feature(self, create_feature_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + my_feature = my_entity_type.create_feature( + feature_id=_TEST_FEATURE_ID, + value_type=_TEST_FEATURE_VALUE_TYPE, + description=_TEST_DESCRIPTION, + labels=_TEST_LABELS, + ) + + if not sync: + my_feature.wait() + + expected_feature = gca_feature.Feature( + value_type=_TEST_FEATURE_VALUE_TYPE_ENUM, + labels=_TEST_LABELS, + description=_TEST_DESCRIPTION, + ) + expected_request = gca_featurestore_service.CreateFeatureRequest( + parent=_TEST_ENTITY_TYPE_NAME, + feature=expected_feature, + feature_id=_TEST_FEATURE_ID, + ) + + create_feature_mock.assert_called_once_with( + request=expected_request, metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures("get_entity_type_mock") + 
@pytest.mark.parametrize("sync", [True, False]) + def test_create_entity_type(self, create_entity_type_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType.create( + entity_type_id=_TEST_ENTITY_TYPE_ID, + featurestore_name=_TEST_FEATURESTORE_NAME, + description=_TEST_DESCRIPTION, + labels=_TEST_LABELS, + ) + + if not sync: + my_entity_type.wait() + + expected_entity_type = gca_entity_type.EntityType( + description=_TEST_DESCRIPTION, labels=_TEST_LABELS, + ) + create_entity_type_mock.assert_called_once_with( + parent=_TEST_FEATURESTORE_NAME, + entity_type=expected_entity_type, + entity_type_id=_TEST_ENTITY_TYPE_ID, + metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures("get_entity_type_mock") + def test_validate_and_get_create_feature_requests(self): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + create_feature_requests = my_entity_type._validate_and_get_create_feature_requests( + feature_configs=_TEST_FEATURE_CONFIGS + ) + + expected_requests = [ + gca_featurestore_service.CreateFeatureRequest( + feature=gca_feature.Feature(value_type=_TEST_FEATURE_VALUE_TYPE_ENUM), + feature_id="my_feature_id_1", + ), + ] + assert create_feature_requests == expected_requests + + @pytest.mark.usefixtures("get_entity_type_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_batch_create_features(self, batch_create_features_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + my_entity_type.batch_create_features(feature_configs=_TEST_FEATURE_CONFIGS) + + if not sync: + my_entity_type.wait() + + expected_requests = [ + gca_featurestore_service.CreateFeatureRequest( + feature=gca_feature.Feature(value_type=_TEST_FEATURE_VALUE_TYPE_ENUM), + feature_id="my_feature_id_1", + ), + ] + + batch_create_features_mock.assert_called_once_with( + 
parent=my_entity_type.resource_name, + requests=expected_requests, + metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures("get_entity_type_mock") + def test_validate_and_get_import_feature_values_request_with_source_fields(self): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + bigquery_source=_TEST_BQ_SOURCE, + feature_time_field=_TEST_FEATURE_TIME_FIELD, + entity_type=_TEST_ENTITY_TYPE_NAME, + feature_specs=[ + gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( + id="my_feature_id_1", source_field="my_feature_id_1_source_field" + ), + ], + ) + assert ( + true_import_feature_values_request + == my_entity_type._validate_and_get_import_feature_values_request( + feature_ids=_TEST_IMPORTING_FEATURE_IDS, + feature_time=_TEST_FEATURE_TIME_FIELD, + data_source=_TEST_BQ_SOURCE, + feature_source_fields=_TEST_IMPORTING_FEATURE_SOURCE_FIELDS, + ) + ) + + @pytest.mark.usefixtures("get_entity_type_mock") + def test_validate_and_get_import_feature_values_request_without_source_fields(self): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + + true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + entity_type=_TEST_ENTITY_TYPE_NAME, + feature_specs=[ + gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( + id="my_feature_id_1" + ), + ], + csv_source=_TEST_CSV_SOURCE, + feature_time=utils.get_timestamp_proto(_TEST_FEATURE_TIME), + ) + assert ( + true_import_feature_values_request + == my_entity_type._validate_and_get_import_feature_values_request( + feature_ids=_TEST_IMPORTING_FEATURE_IDS, + feature_time=_TEST_FEATURE_TIME, + data_source=_TEST_CSV_SOURCE, + ) + ) + + @pytest.mark.usefixtures("get_entity_type_mock") + @pytest.mark.parametrize("sync", [True, False]) 
+ def test_ingest_from_bq(self, import_feature_values_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + my_entity_type.ingest_from_bq( + feature_ids=_TEST_IMPORTING_FEATURE_IDS, + feature_time=_TEST_FEATURE_TIME_FIELD, + bq_source_uri=_TEST_BQ_SOURCE_URI, + feature_source_fields=_TEST_IMPORTING_FEATURE_SOURCE_FIELDS, + sync=sync, + ) + + if not sync: + my_entity_type.wait() + + true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + entity_type=_TEST_ENTITY_TYPE_NAME, + feature_specs=[ + gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( + id="my_feature_id_1", source_field="my_feature_id_1_source_field" + ), + ], + bigquery_source=_TEST_BQ_SOURCE, + feature_time_field=_TEST_FEATURE_TIME_FIELD, + ) + import_feature_values_mock.assert_called_once_with( + request=true_import_feature_values_request, metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures("get_entity_type_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_ingest_from_gcs(self, import_feature_values_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + my_entity_type.ingest_from_gcs( + feature_ids=_TEST_IMPORTING_FEATURE_IDS, + feature_time=_TEST_FEATURE_TIME, + gcs_source_uris=_TEST_GCS_AVRO_SOURCE_URIS, + gcs_source_type=_TEST_GCS_SOURCE_TYPE_AVRO, + sync=sync, + ) + + if not sync: + my_entity_type.wait() + + true_import_feature_values_request = gca_featurestore_service.ImportFeatureValuesRequest( + entity_type=_TEST_ENTITY_TYPE_NAME, + feature_specs=[ + gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec( + id="my_feature_id_1" + ), + ], + avro_source=_TEST_AVRO_SOURCE, + feature_time=utils.get_timestamp_proto(_TEST_FEATURE_TIME), + ) + import_feature_values_mock.assert_called_once_with( + request=true_import_feature_values_request, 
metadata=_TEST_REQUEST_METADATA, + ) + + @pytest.mark.usefixtures("get_entity_type_mock") + def test_ingest_from_gcs_with_invalid_gcs_source_type(self): + aiplatform.init(project=_TEST_PROJECT) + + my_entity_type = aiplatform.EntityType(entity_type_name=_TEST_ENTITY_TYPE_NAME) + with pytest.raises(ValueError): + my_entity_type.ingest_from_gcs( + feature_ids=_TEST_IMPORTING_FEATURE_IDS, + feature_time=_TEST_FEATURE_TIME_FIELD, + gcs_source_uris=_TEST_GCS_CSV_SOURCE_URIS, + gcs_source_type=_TEST_GCS_SOURCE_TYPE_INVALID, + ) + class TestFeature: def setup_method(self): @@ -661,3 +1172,34 @@ def test_search_features(self, search_features_mock): assert len(my_feature_list) == len(_TEST_FEATURE_LIST) for my_feature in my_feature_list: assert type(my_feature) == aiplatform.Feature + + @pytest.mark.usefixtures("get_feature_mock") + @pytest.mark.parametrize("sync", [True, False]) + def test_create_feature(self, create_feature_mock, sync): + aiplatform.init(project=_TEST_PROJECT) + + my_feature = aiplatform.Feature.create( + feature_id=_TEST_FEATURE_ID, + value_type=_TEST_FEATURE_VALUE_TYPE, + entity_type_name=_TEST_ENTITY_TYPE_ID, + featurestore_id=_TEST_FEATURESTORE_ID, + description=_TEST_DESCRIPTION, + labels=_TEST_LABELS, + ) + + if not sync: + my_feature.wait() + + expected_feature = gca_feature.Feature( + value_type=_TEST_FEATURE_VALUE_TYPE_ENUM, + labels=_TEST_LABELS, + description=_TEST_DESCRIPTION, + ) + create_feature_mock.assert_called_once_with( + request=gca_featurestore_service.CreateFeatureRequest( + parent=_TEST_ENTITY_TYPE_NAME, + feature=expected_feature, + feature_id=_TEST_FEATURE_ID, + ), + metadata=_TEST_REQUEST_METADATA, + ) diff --git a/tests/unit/aiplatform/test_utils.py b/tests/unit/aiplatform/test_utils.py index bd91fced28..d4840609b1 100644 --- a/tests/unit/aiplatform/test_utils.py +++ b/tests/unit/aiplatform/test_utils.py @@ -18,6 +18,10 @@ import pytest from typing import Callable, Dict, Optional +import datetime +from decimal import Decimal + 
+from google.protobuf import timestamp_pb2 from google.api_core import client_options from google.api_core import gapic_v1 @@ -319,6 +323,60 @@ def test_client_w_override_select_version(): ) +@pytest.mark.parametrize( + "year,month,day,hour,minute,second,microsecond,expected_seconds,expected_nanos", + [ + ( + 2021, + 12, + 23, + 23, + 59, + 59, + 999999, + 1640303999, + int(str(Decimal(1640303999.999999)).split(".")[1][:9]), + ), + ( + 2013, + 1, + 1, + 1, + 1, + 1, + 199999, + 1357002061, + int(str(Decimal(1357002061.199999)).split(".")[1][:9]), + ), + ], +) +def test_get_timestamp_proto( + year, + month, + day, + hour, + minute, + second, + microsecond, + expected_seconds, + expected_nanos, +): + time = datetime.datetime( + year=year, + month=month, + day=day, + hour=hour, + minute=minute, + second=second, + microsecond=microsecond, + tzinfo=datetime.timezone.utc, + ) + true_timestamp_proto = timestamp_pb2.Timestamp( + seconds=expected_seconds, nanos=expected_nanos + ) + assert true_timestamp_proto == utils.get_timestamp_proto(time) + + class TestPipelineUtils: SAMPLE_JOB_SPEC = { "pipelineSpec": {