Skip to content

Commit

Permalink
fix: change gcs to str and list[str]; batch_create_request error by c…
Browse files Browse the repository at this point in the history
…hanging feature_config.request_dict to get_create_feature_request() with parent option
  • Loading branch information
morgandu committed Dec 4, 2021
1 parent 5fae3a3 commit 9b1b72a
Show file tree
Hide file tree
Showing 6 changed files with 263 additions and 260 deletions.
90 changes: 49 additions & 41 deletions google/cloud/aiplatform/featurestore/entity_type.py
Expand Up @@ -366,28 +366,23 @@ def list_features(
)

@base.optional_sync()
def delete_features(self, feature_ids: List[str], sync: bool = True) -> None:
    """Deletes feature resources in this EntityType given their feature IDs.

    WARNING: This deletion is permanent.

    Args:
        feature_ids (List[str]):
            Required. The list of feature IDs to be deleted.
        sync (bool):
            Optional. Whether to execute this deletion synchronously. If False, this method
            will be executed in concurrent Future and any downstream object will
            be immediately returned and synced when the Future has completed.
            NOTE(review): `sync` is not read in this body; presumably it is
            consumed by the decorator applied to this method - confirm.
    """
    features = []
    for feature_id in feature_ids:
        feature = self.get_feature(feature_id=feature_id)
        # Start each deletion with sync=False so that all of them are issued
        # before this method starts waiting on any single one.
        feature.delete(sync=False)
        features.append(feature)

    # Wait on every deletion that was started above.
    for feature in features:
        feature.wait()
Expand All @@ -400,19 +395,26 @@ def delete(self, force: bool = False, sync: bool = True) -> None:
Args:
force (bool):
Required. If force is set to True, all features in this EntityType will be
deleted prior to entityType deletion.
If set to true, any Features for this
EntityType will also be deleted.
(Otherwise, the request will only work
if the EntityType has no Features.)
sync (bool):
Whether to execute this deletion synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
Raises:
FailedPrecondition: If features are created in this EntityType and force = False.
"""
if force:
self.delete_features()

super().delete(sync=sync)
_LOGGER.log_action_start_against_resource("Deleting", "", self)
lro = getattr(self.api_client, self._delete_method)(
name=self.resource_name, force=force
)
_LOGGER.log_action_started_against_resource_with_lro(
"Delete", "", self.__class__, lro
)
lro.result()
_LOGGER.log_action_completed_against_resource("deleted.", "", self)

@classmethod
@base.optional_sync()
Expand Down Expand Up @@ -615,33 +617,37 @@ def create_feature(
sync=sync,
)

def _validate_and_get_batch_create_features_request(
    self,
    feature_configs: Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]],
) -> gca_featurestore_service.BatchCreateFeaturesRequest:
    """Validates feature_configs and gets the batch_create_features_request.

    Args:
        feature_configs (Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]):
            Required. A user defined Dict containing configurations for feature creation,
            keyed by feature ID. The keys read from each per-feature config are
            "value_type", "description" and "labels"; each is optional.

    Returns:
        gca_featurestore_service.BatchCreateFeaturesRequest - batch feature creation request
    """
    requests = [
        featurestore_utils._FeatureConfig(
            feature_id=feature_id,
            value_type=feature_config.get(
                "value_type", featurestore_utils._FEATURE_VALUE_TYPE_UNSPECIFIED
            ),
            description=feature_config.get("description", None),
            labels=feature_config.get("labels", {}),
        ).get_create_feature_request()
        # No parent is passed for the individual requests; the parent is set
        # once on the enclosing batch request below.
        for feature_id, feature_config in feature_configs.items()
    ]

    return gca_featurestore_service.BatchCreateFeaturesRequest(
        parent=self.resource_name, requests=requests
    )

@base.optional_sync(return_input_arg="self")
def batch_create_features(
Expand Down Expand Up @@ -710,7 +716,7 @@ def batch_create_features(
Returns:
EntityType - entity_type resource object
"""
batch_create_feature_requests = self._validate_and_get_batch_create_features_requests(
batch_create_features_request = self._validate_and_get_batch_create_features_request(
feature_configs=feature_configs
)

Expand All @@ -719,9 +725,7 @@ def batch_create_features(
)

batch_created_features_lro = self.api_client.batch_create_features(
parent=self.resource_name,
requests=batch_create_feature_requests,
metadata=request_metadata,
request=batch_create_features_request, metadata=request_metadata,
)

_LOGGER.log_action_started_against_resource_with_lro(
Expand All @@ -741,7 +745,7 @@ def batch_create_features(

def _validate_and_get_import_feature_values_request(
self,
feature_ids: Sequence[str],
feature_ids: List[str],
feature_source_fields: Optional[Dict[str, str]] = {},
avro_source: Optional[gca_io.AvroSource] = None,
bigquery_source: Optional[gca_io.BigQuerySource] = None,
Expand All @@ -754,7 +758,7 @@ def _validate_and_get_import_feature_values_request(
) -> Dict[str, Any]:
"""Validates and get import feature values request.
Args:
feature_ids (Sequence[str]):
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
Expand Down Expand Up @@ -826,7 +830,7 @@ def _validate_and_get_import_feature_values_request(
ValueError if no feature_time_source or more than one feature_time_source is provided
"""
feature_specs = []
for feature_id in feature_ids:
for feature_id in set(feature_ids):
feature_source_field = feature_source_fields.get(feature_id, None)
if feature_source_field:
feature_spec = gca_featurestore_service.ImportFeatureValuesRequest.FeatureSpec(
Expand Down Expand Up @@ -925,7 +929,7 @@ def _import_feature_values(
def ingest_from_bq(
self,
bq_source_uri: str,
feature_ids: Sequence[str],
feature_ids: List[str],
batch_create_feature_configs: Optional[
Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]
] = None,
Expand All @@ -945,7 +949,7 @@ def ingest_from_bq(
Required. BigQuery URI to the input table.
Example:
'bq://project.dataset.table_name'
feature_ids (Sequence[str]):
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
Expand Down Expand Up @@ -1075,7 +1079,7 @@ def ingest_from_gcs(
self,
gcs_source_uris: Union[str, List[str]],
gcs_source_type: str,
feature_ids: Sequence[str],
feature_ids: List[str],
batch_create_feature_configs: Optional[
Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]
] = None,
Expand All @@ -1091,19 +1095,19 @@ def ingest_from_gcs(
"""Ingest feature values from GCS.
Args:
gcs_source_uris (Sequence[str]):
gcs_source_uris (Union[str, List[str]]):
Required. Google Cloud Storage URI(-s) to the
input file(s). May contain wildcards. For more
information on wildcards, see
https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames.
Example:
["gs://my_bucket/my_file_1.csv", "gs://my_bucket/my_file_2.csv"]
or
["gs://my_bucket/my_file.avro"]
"gs://my_bucket/my_file.avro"
gcs_source_type (str):
Required. The type of the input file(s) provided by `gcs_source_uris`,
the value of gcs_source_type can only be either `csv`, or `avro`.
feature_ids (Sequence[str]):
feature_ids (List[str]):
Required. IDs of the Feature to import values
of. The Features must exist in the target
EntityType, or the request will fail.
Expand Down Expand Up @@ -1220,7 +1224,11 @@ def ingest_from_gcs(
gcs_source_type,
)
)

if isinstance(gcs_source_uris, str):
gcs_source_uris = [gcs_source_uris]
gcs_source = gca_io.GcsSource(uris=gcs_source_uris)

csv_source, avro_source = None, None
if gcs_source_type == "csv":
csv_source = gca_io.CsvSource(gcs_source=gcs_source)
Expand Down
9 changes: 5 additions & 4 deletions google/cloud/aiplatform/featurestore/feature.py
Expand Up @@ -569,14 +569,15 @@ def create(
location=location,
)

create_feature_request = featurestore_utils._FeatureConfig(
feature_config = featurestore_utils._FeatureConfig(
feature_id=feature_id,
value_type=value_type,
description=description,
labels=labels,
).request_dict

create_feature_request["parent"] = entity_type_name
)
create_feature_request = feature_config.get_create_feature_request(
parent=entity_type_name
)

api_client = cls._instantiate_client(location=location, credentials=credentials)

Expand Down
44 changes: 23 additions & 21 deletions google/cloud/aiplatform/featurestore/featurestore.py
Expand Up @@ -302,15 +302,14 @@ def list_entity_types(

@base.optional_sync()
def delete_entity_types(
    self, entity_type_ids: List[str], force: bool = False, sync: bool = True,
) -> None:
    """Deletes entity_type resources in this Featurestore given their entity_type IDs.

    WARNING: This deletion is permanent.

    Args:
        entity_type_ids (List[str]):
            Required. The list of entity_type IDs to be deleted.
        force (bool):
            Optional. Forwarded unchanged to each entity type's `delete` call.
            If force is set to True, all features in each entityType
            will be deleted prior to entityType deletion. Default is False.
        sync (bool):
            Optional. Whether to execute this deletion synchronously. If False, this method
            will be executed in concurrent Future and any downstream object will
            be immediately returned and synced when the Future has completed.
            NOTE(review): `sync` is not read in this body; presumably it is
            consumed by the decorator applied to this method - confirm.
    """
    entity_types = []
    for entity_type_id in entity_type_ids:
        entity_type = self.get_entity_type(entity_type_id=entity_type_id)
        # Start each deletion with sync=False so that all of them are issued
        # before this method starts waiting on any single one.
        entity_type.delete(force=force, sync=False)
        entity_types.append(entity_type)

    # Wait on every deletion that was started above.
    for entity_type in entity_types:
        entity_type.wait()

@base.optional_sync()
def delete(self, force: bool = False, sync: bool = True) -> None:
    """Deletes this Featurestore resource.

    The `force` flag is passed through to the delete API call, so any forced
    removal of child resources is left to the service rather than done by
    client-side calls issued from this method.

    Args:
        force (bool):
            If set to true, any EntityTypes and
            Features for this Featurestore will also
            be deleted. (Otherwise, the request will
            only work if the Featurestore has no
            EntityTypes.)
        sync (bool):
            Whether to execute this deletion synchronously. If False, this method
            will be executed in concurrent Future and any downstream object will
            be immediately returned and synced when the Future has completed.
            NOTE(review): `sync` is not read in this body; presumably it is
            consumed by the decorator applied to this method - confirm.

    Raises:
        FailedPrecondition: If entityTypes are created in this Featurestore and force = False.
    """
    _LOGGER.log_action_start_against_resource("Deleting", "", self)
    # The API client method to call is looked up by the name stored in
    # `self._delete_method`.
    lro = getattr(self.api_client, self._delete_method)(
        name=self.resource_name, force=force
    )
    _LOGGER.log_action_started_against_resource_with_lro(
        "Delete", "", self.__class__, lro
    )
    # Block until the long-running delete operation has finished.
    lro.result()
    _LOGGER.log_action_completed_against_resource("deleted.", "", self)

@classmethod
@base.optional_sync()
Expand Down
26 changes: 17 additions & 9 deletions google/cloud/aiplatform/utils/featurestore_utils.py
Expand Up @@ -17,12 +17,15 @@

import datetime
import re
from typing import Dict, NamedTuple, Optional, Tuple, Union
from typing import Dict, NamedTuple, Optional, Tuple

from google.protobuf import timestamp_pb2

from google.cloud.aiplatform.compat.services import featurestore_service_client
from google.cloud.aiplatform.compat.types import feature as gca_feature
from google.cloud.aiplatform.compat.types import (
feature as gca_feature,
featurestore_service as gca_featurestore_service,
)
from google.cloud.aiplatform import utils

CompatFeaturestoreServiceClient = featurestore_service_client.FeaturestoreServiceClient
Expand Down Expand Up @@ -197,9 +200,10 @@ def _get_value_type_enum(self) -> int:

return value_type_enum

@property
def request_dict(self) -> Dict[str, Union[int, str, Dict[str, str]]]:
"""Return request as a Dict."""
def get_create_feature_request(
self, parent: Optional[str] = None
) -> gca_featurestore_service.CreateFeatureRequest:
"""Return create feature request."""

if self.labels:
utils.validate_labels(self.labels)
Expand All @@ -210,10 +214,14 @@ def request_dict(self) -> Dict[str, Union[int, str, Dict[str, str]]]:
labels=self.labels,
)

create_feature_request = {
"feature": gapic_feature,
"feature_id": self._get_feature_id(),
}
if parent:
create_feature_request = gca_featurestore_service.CreateFeatureRequest(
parent=parent, feature=gapic_feature, feature_id=self._get_feature_id()
)
else:
create_feature_request = gca_featurestore_service.CreateFeatureRequest(
feature=gapic_feature, feature_id=self._get_feature_id()
)

return create_feature_request

Expand Down

0 comments on commit 9b1b72a

Please sign in to comment.