Skip to content

Commit

Permalink
fix: change gcs to str and list[str]; batch_create_request error by changing feature_config.request_dict to get_create_feature_request() with parent option
Browse files Browse the repository at this point in the history
  • Loading branch information
morgandu committed Dec 2, 2021
1 parent 8af8486 commit 56d0db5
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 157 deletions.
63 changes: 38 additions & 25 deletions google/cloud/aiplatform/featurestore/entity_type.py
Expand Up @@ -392,27 +392,34 @@ def delete_features(self, feature_ids: List[str] = None, sync: bool = True) -> N
for feature in features:
feature.wait()

def delete(self, force: bool = False, sync: bool = True) -> None:
def delete(self, force: bool = None, sync: bool = True) -> None:
"""Deletes this EntityType resource. If force is set to True,
all features in this EntityType will be deleted prior to entityType deletion.
WARNING: This deletion is permanent.
Args:
force (bool):
Required. If force is set to True, all features in this EntityType will be
deleted prior to entityType deletion.
If set to true, any Features for this
EntityType will also be deleted.
(Otherwise, the request will only work
if the EntityType has no Features.)
sync (bool):
Whether to execute this deletion synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
Raises:
FailedPrecondition: If features are created in this EntityType and force = False.
"""
if force:
self.delete_features()

super().delete(sync=sync)
_LOGGER.log_action_start_against_resource("Deleting", "", self)
lro = getattr(self.api_client, self._delete_method)(
name=self.resource_name, force=force
)
_LOGGER.log_action_started_against_resource_with_lro(
"Delete", "", self.__class__, lro
)
lro.result()
_LOGGER.log_action_completed_against_resource("deleted.", "", self)

@classmethod
@base.optional_sync()
Expand Down Expand Up @@ -629,19 +636,23 @@ def _validate_and_get_batch_create_features_requests(
List[Dict[str, Any]] - list of feature creation request
"""

batch_create_features_requests = [
featurestore_utils._FeatureConfig(
requests = []
for feature_id, feature_config in feature_configs.items():
feature_config = featurestore_utils._FeatureConfig(
feature_id=feature_id,
value_type=feature_config.get(
"value_type", featurestore_utils._FEATURE_VALUE_TYPE_UNSPECIFIED
),
description=feature_config.get("description", None),
labels=feature_config.get("labels", {}),
).request_dict
for feature_id, feature_config in feature_configs.items()
]
)
create_feature_request = feature_config.get_create_feature_request()
requests.append(create_feature_request)

return batch_create_features_requests
batch_create_features_request = gca_featurestore_service.BatchCreateFeaturesRequest(
parent=self.resource_name, requests=requests
)
return batch_create_features_request

@base.optional_sync(return_input_arg="self")
def batch_create_features(
Expand Down Expand Up @@ -710,7 +721,7 @@ def batch_create_features(
Returns:
EntityType - entity_type resource object
"""
batch_create_feature_requests = self._validate_and_get_batch_create_features_requests(
batch_create_features_request = self._validate_and_get_batch_create_features_requests(
feature_configs=feature_configs
)

Expand All @@ -719,9 +730,7 @@ def batch_create_features(
)

batch_created_features_lro = self.api_client.batch_create_features(
parent=self.resource_name,
requests=batch_create_feature_requests,
metadata=request_metadata,
request=batch_create_features_request, metadata=request_metadata,
)

_LOGGER.log_action_started_against_resource_with_lro(
Expand All @@ -741,7 +750,7 @@ def batch_create_features(

def _validate_and_get_import_feature_values_request(
self,
feature_ids: Sequence[str],
feature_ids: List[str],
feature_source_fields: Optional[Dict[str, str]] = {},
avro_source: Optional[gca_io.AvroSource] = None,
bigquery_source: Optional[gca_io.BigQuerySource] = None,
Expand All @@ -754,7 +763,7 @@ def _validate_and_get_import_feature_values_request(
) -> Dict[str, Any]:
"""Validates and get import feature values request.
Args:
feature_ids (Sequence[str]):
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
Expand Down Expand Up @@ -925,7 +934,7 @@ def _import_feature_values(
def ingest_from_bq(
self,
bq_source_uri: str,
feature_ids: Sequence[str],
feature_ids: List[str],
batch_create_feature_configs: Optional[
Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]
] = None,
Expand All @@ -945,7 +954,7 @@ def ingest_from_bq(
Required. BigQuery URI to the input table.
Example:
'bq://project.dataset.table_name'
feature_ids (Sequence[str]):
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
Expand Down Expand Up @@ -1075,7 +1084,7 @@ def ingest_from_gcs(
self,
gcs_source_uris: Union[str, List[str]],
gcs_source_type: str,
feature_ids: Sequence[str],
feature_ids: List[str],
batch_create_feature_configs: Optional[
Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]
] = None,
Expand All @@ -1091,19 +1100,19 @@ def ingest_from_gcs(
"""Ingest feature values from GCS.
Args:
gcs_source_uris (Sequence[str]):
gcs_source_uris (Union[str, List[str]]):
Required. Google Cloud Storage URI(-s) to the
input file(s). May contain wildcards. For more
information on wildcards, see
https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames.
Example:
["gs://my_bucket/my_file_1.csv", "gs://my_bucket/my_file_2.csv"]
or
["gs://my_bucket/my_file.avro"]
"gs://my_bucket/my_file.avro"
gcs_source_type (str):
Required. The type of the input file(s) provided by `gcs_source_uris`,
the value of gcs_source_type can only be either `csv`, or `avro`.
feature_ids (Sequence[str]):
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
Expand Down Expand Up @@ -1220,7 +1229,11 @@ def ingest_from_gcs(
gcs_source_type,
)
)

if isinstance(gcs_source_uris, str):
gcs_source_uris = [gcs_source_uris]
gcs_source = gca_io.GcsSource(uris=gcs_source_uris)

csv_source, avro_source = None, None
if gcs_source_type == "csv":
csv_source = gca_io.CsvSource(gcs_source=gcs_source)
Expand Down
9 changes: 5 additions & 4 deletions google/cloud/aiplatform/featurestore/feature.py
Expand Up @@ -569,14 +569,15 @@ def create(
location=location,
)

create_feature_request = featurestore_utils._FeatureConfig(
feature_config = featurestore_utils._FeatureConfig(
feature_id=feature_id,
value_type=value_type,
description=description,
labels=labels,
).request_dict

create_feature_request["parent"] = entity_type_name
)
create_feature_request = feature_config.get_create_feature_request(
parent=entity_type_name
)

api_client = cls._instantiate_client(location=location, credentials=credentials)

Expand Down
26 changes: 17 additions & 9 deletions google/cloud/aiplatform/featurestore/featurestore.py
Expand Up @@ -333,7 +333,8 @@ def delete_entity_types(
for entity_type in entity_types:
entity_type.wait()

def delete(self, force: bool = False, sync: bool = True) -> None:
@base.optional_sync()
def delete(self, force: bool = None, sync: bool = True) -> None:
"""Deletes this Featurestore resource. If force is set to True,
all entityTypes in this Featurestore will be deleted prior to featurestore deletion,
and all features in each entityType will be deleted prior to each entityType deletion.
Expand All @@ -342,20 +343,27 @@ def delete(self, force: bool = False, sync: bool = True) -> None:
Args:
force (bool):
Required. If force is set to True, all entityTypes in this Featurestore will be
deleted prior to featurestore deletion, and all features in each entityType will
be deleted prior to each entityType deletion.
If set to true, any EntityTypes and
Features for this Featurestore will also
be deleted. (Otherwise, the request will
only work if the Featurestore has no
EntityTypes.)
sync (bool):
Required. Whether to execute this deletion synchronously. If False, this method
Whether to execute this deletion synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
Raises:
FailedPrecondition: If entityTypes are created in this Featurestore and force = False.
"""
if force:
self.delete_entity_types(force=force)

super().delete(sync=sync)
_LOGGER.log_action_start_against_resource("Deleting", "", self)
lro = getattr(self.api_client, self._delete_method)(
name=self.resource_name, force=force
)
_LOGGER.log_action_started_against_resource_with_lro(
"Delete", "", self.__class__, lro
)
lro.result()
_LOGGER.log_action_completed_against_resource("deleted.", "", self)

@classmethod
@base.optional_sync()
Expand Down
26 changes: 17 additions & 9 deletions google/cloud/aiplatform/utils/featurestore_utils.py
Expand Up @@ -17,12 +17,15 @@

import datetime
import re
from typing import Dict, NamedTuple, Optional, Tuple, Union
from typing import Dict, NamedTuple, Optional, Tuple

from google.protobuf import timestamp_pb2

from google.cloud.aiplatform.compat.services import featurestore_service_client
from google.cloud.aiplatform.compat.types import feature as gca_feature
from google.cloud.aiplatform.compat.types import (
feature as gca_feature,
featurestore_service as gca_featurestore_service,
)
from google.cloud.aiplatform import utils

CompatFeaturestoreServiceClient = featurestore_service_client.FeaturestoreServiceClient
Expand Down Expand Up @@ -197,9 +200,10 @@ def _get_value_type_enum(self) -> int:

return value_type_enum

@property
def request_dict(self) -> Dict[str, Union[int, str, Dict[str, str]]]:
"""Return request as a Dict."""
def get_create_feature_request(
self, parent: Optional[str] = None
) -> gca_featurestore_service.CreateFeatureRequest:
"""Return create feature request."""

if self.labels:
utils.validate_labels(self.labels)
Expand All @@ -210,10 +214,14 @@ def request_dict(self) -> Dict[str, Union[int, str, Dict[str, str]]]:
labels=self.labels,
)

create_feature_request = {
"feature": gapic_feature,
"feature_id": self._get_feature_id(),
}
if parent:
create_feature_request = gca_featurestore_service.CreateFeatureRequest(
parent=parent, feature=gapic_feature, feature_id=self._get_feature_id()
)
else:
create_feature_request = gca_featurestore_service.CreateFeatureRequest(
feature=gapic_feature, feature_id=self._get_feature_id()
)

return create_feature_request

Expand Down
25 changes: 19 additions & 6 deletions tests/system/aiplatform/test_featurestore.py
Expand Up @@ -15,13 +15,15 @@
# limitations under the License.
#

import logging

from google.cloud import aiplatform
from tests.system.aiplatform import e2e_base

_USERS_ENTITY_TYPE_SRC = (
_TEST_USERS_ENTITY_TYPE_GCS_SRC = (
"gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/users.avro"
)
_MOVIES_ENTITY_TYPE_SRC = (
_TEST_MOVIES_ENTITY_TYPE_GCS_SRC = (
"gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movies.avro"
)

Expand All @@ -42,7 +44,9 @@ class TestFeaturestore(e2e_base.TestEndToEnd):

_temp_prefix = "temp_vertex_sdk_e2e_featurestore_test"

def test_end_to_end(self, shared_state):
def test_end_to_end(self, shared_state, caplog):

caplog.set_level(logging.INFO)

aiplatform.init(
project=e2e_base._PROJECT, location=e2e_base._LOCATION,
Expand Down Expand Up @@ -133,14 +137,18 @@ def test_end_to_end(self, shared_state):
assert len(list_user_features) == 3

user_entity_type.ingest_from_gcs(
gcs_source_uris=_USERS_ENTITY_TYPE_SRC,
gcs_source_uris=_TEST_USERS_ENTITY_TYPE_GCS_SRC,
gcs_source_type="avro",
feature_ids=[
user_age_feature_id,
user_gender_feature_id,
user_liked_genres_feature_id,
],
entity_id_field="user_id",
feature_time_field="update_time",
worker_count=2,
)
assert "EntityType feature values imported." in caplog.text

movie_title_feature_id = _TEST_MOVIE_TITLE_FEATURE_ID
movie_genres_feature_id = _TEST_MOVIE_GENRES_FEATURE_ID
Expand All @@ -156,18 +164,23 @@ def test_end_to_end(self, shared_state):
assert len(list_movie_features) == 0

movie_entity_type.ingest_from_gcs(
gcs_source_uris=_MOVIES_ENTITY_TYPE_SRC,
gcs_source_uris=_TEST_MOVIES_ENTITY_TYPE_GCS_SRC,
gcs_source_type="avro",
feature_ids=[
movie_title_feature_id,
movie_genres_feature_id,
movie_average_rating_id,
],
feature_configs=movie_feature_configs,
entity_id_field="movie_id",
feature_time_field="update_time",
worker_count=2,
batch_create_feature_configs=movie_feature_configs,
)

list_movie_features = movie_entity_type.list_features()
assert len(list_movie_features) == 3

list_searched_features = aiplatform.Feature.search()
assert len(list_searched_features) - base_list_searched_features == 6

caplog.clear()

0 comments on commit 56d0db5

Please sign in to comment.