Skip to content

Commit

Permalink
feat: enable force on resource deletion
Browse files Browse the repository at this point in the history
  • Loading branch information
morgandu committed Dec 2, 2021
1 parent 8e83d09 commit 8af8486
Show file tree
Hide file tree
Showing 7 changed files with 232 additions and 139 deletions.
91 changes: 73 additions & 18 deletions google/cloud/aiplatform/featurestore/entity_type.py
Expand Up @@ -16,7 +16,7 @@
#

import datetime
from typing import Dict, List, Optional, Sequence, Tuple, Union
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

from google.auth import credentials as auth_credentials
from google.protobuf import field_mask_pb2
Expand Down Expand Up @@ -366,27 +366,54 @@ def list_features(
)

@base.optional_sync()
def delete_features(self, feature_ids: List[str], sync: bool = True,) -> None:
"""Deletes feature resources in this EntityType given their feature IDs.
def delete_features(self, feature_ids: List[str] = None, sync: bool = True) -> None:
"""Deletes feature resources in this EntityType.
WARNING: This deletion is permanent.
Args:
feature_ids (List[str]):
Required. The list of feature IDs to be deleted.
Optional. The list of feature IDs to be deleted. If feature_ids is not set,
all features in this EntityType will be deleted.
sync (bool):
Optional. Whether to execute this deletion synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
"""
features = []
for feature_id in feature_ids:
feature = self.get_feature(feature_id=feature_id)
if not feature_ids:
features = self.list_features()
elif feature_ids and isinstance(feature_ids, list):
features = [
self.get_feature(feature_id=feature_id) for feature_id in feature_ids
]

for feature in features:
feature.delete(sync=False)
features.append(feature)

for feature in features:
feature.wait()

def delete(self, force: bool = False, sync: bool = True) -> None:
"""Deletes this EntityType resource. If force is set to True,
all features in this EntityType will be deleted prior to entityType deletion.
WARNING: This deletion is permanent.
Args:
force (bool):
Required. If force is set to True, all features in this EntityType will be
deleted prior to entityType deletion.
sync (bool):
Whether to execute this deletion synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
Raises:
FailedPrecondition: If features are created in this EntityType and force = False.
"""
if force:
self.delete_features()

super().delete(sync=sync)

@classmethod
@base.optional_sync()
def create(
Expand All @@ -399,7 +426,7 @@ def create(
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync: Optional[bool] = True,
sync: bool = True,
) -> "EntityType":
"""Creates an EntityType resource in a Featurestore.
Expand Down Expand Up @@ -524,7 +551,7 @@ def create_feature(
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync: Optional[bool] = True,
sync: bool = True,
) -> "featurestore.Feature":
"""Creates a Feature resource in this EntityType.
Expand Down Expand Up @@ -588,12 +615,40 @@ def create_feature(
sync=sync,
)

def _validate_and_get_batch_create_features_requests(
self,
feature_configs: Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]],
) -> List[Dict[str, Any]]:
""" Validates feature_configs and get batch_create_features_requests
Args:
feature_configs (Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]):
Required. A user defined Dict containing configurations for feature creation.
Returns:
List[Dict[str, Any]] - list of feature creation request
"""

batch_create_features_requests = [
featurestore_utils._FeatureConfig(
feature_id=feature_id,
value_type=feature_config.get(
"value_type", featurestore_utils._FEATURE_VALUE_TYPE_UNSPECIFIED
),
description=feature_config.get("description", None),
labels=feature_config.get("labels", {}),
).request_dict
for feature_id, feature_config in feature_configs.items()
]

return batch_create_features_requests

@base.optional_sync(return_input_arg="self")
def batch_create_features(
self,
feature_configs: Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]],
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync: Optional[bool] = True,
sync: bool = True,
) -> "EntityType":
"""Batch creates Feature resources in this EntityType.
Expand Down Expand Up @@ -655,7 +710,7 @@ def batch_create_features(
Returns:
EntityType - entity_type resource object
"""
batch_create_feature_requests = featurestore_utils.validate_and_get_batch_create_features_requests(
batch_create_feature_requests = self._validate_and_get_batch_create_features_requests(
feature_configs=feature_configs
)

Expand Down Expand Up @@ -696,7 +751,7 @@ def _validate_and_get_import_feature_values_request(
entity_id_field: Optional[str] = None,
disable_online_serving: Optional[bool] = None,
worker_count: Optional[int] = None,
):
) -> Dict[str, Any]:
"""Validates and get import feature values request.
Args:
feature_ids (Sequence[str]):
Expand Down Expand Up @@ -765,7 +820,7 @@ def _validate_and_get_import_feature_values_request(
count ensures minimal impact on online serving
performance.
Returns:
dict - import feature values request
Dict[str, Any] - import feature values request
Raises:
ValueError if no source or more than one source is provided
ValueError if no feature_time_source or more than one feature_time_source is provided
Expand Down Expand Up @@ -830,8 +885,8 @@ def _import_feature_values(
self,
import_feature_values_request: dict,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync: Optional[bool] = True,
):
sync: bool = True,
) -> "EntityType":
"""Imports Feature values into the Featurestore from a source storage.
Args:
Expand Down Expand Up @@ -881,7 +936,7 @@ def ingest_from_bq(
disable_online_serving: Optional[bool] = None,
worker_count: Optional[int] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync: Optional[bool] = True,
sync: bool = True,
) -> "EntityType":
"""Ingest feature values from BigQuery.
Expand Down Expand Up @@ -1031,7 +1086,7 @@ def ingest_from_gcs(
disable_online_serving: Optional[bool] = None,
worker_count: Optional[int] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync: Optional[bool] = True,
sync: bool = True,
) -> "EntityType":
"""Ingest feature values from GCS.
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/aiplatform/featurestore/feature.py
Expand Up @@ -476,7 +476,7 @@ def create(
location: Optional[str] = None,
credentials: Optional[auth_credentials.Credentials] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync: Optional[bool] = True,
sync: bool = True,
) -> "Feature":
"""Creates a Feature resource in an EntityType.
Expand Down
53 changes: 43 additions & 10 deletions google/cloud/aiplatform/featurestore/featurestore.py
Expand Up @@ -302,28 +302,61 @@ def list_entity_types(

@base.optional_sync()
def delete_entity_types(
self, entity_type_ids: List[str], sync: bool = True,
self, entity_type_ids: List[str] = None, force: bool = False, sync: bool = True,
) -> None:
"""Deletes entity_type resources in this Featurestore given their entity_type IDs.
"""Deletes entity_type resources in this Featurestore.
WARNING: This deletion is permanent.
Args:
entity_type_ids (List[str]):
Required. The list of entity_type IDs to be deleted.
Optional. The list of entity_type IDs to be deleted. If entity_type_ids is not set,
all entityTypes in this Featurestore will be deleted.
force (bool):
Optional. If force is set to True, all features in each entityType
will be deleted prior to entityType deletion. Default is False.
sync (bool):
Optional. Whether to execute this deletion synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
"""
entity_types = []
for entity_type_id in entity_type_ids:
entity_type = self.get_entity_type(entity_type_id=entity_type_id)
entity_type.delete(sync=False)
entity_types.append(entity_type)
if not entity_type_ids:
entity_types = self.list_entity_types()
elif entity_type_ids and isinstance(entity_type_ids, list):
entity_types = [
self.get_entity_type(entity_type_id=entity_type_id)
for entity_type_id in entity_type_ids
]

for entity_type in entity_types:
entity_type.delete(force=force, sync=False)

for entity_type in entity_types:
entity_type.wait()

def delete(self, force: bool = False, sync: bool = True) -> None:
"""Deletes this Featurestore resource. If force is set to True,
all entityTypes in this Featurestore will be deleted prior to featurestore deletion,
and all features in each entityType will be deleted prior to each entityType deletion.
WARNING: This deletion is permanent.
Args:
force (bool):
Required. If force is set to True, all entityTypes in this Featurestore will be
deleted prior to featurestore deletion, and all features in each entityType will
be deleted prior to each entityType deletion.
sync (bool):
Required. Whether to execute this deletion synchronously. If False, this method
will be executed in concurrent Future and any downstream object will
be immediately returned and synced when the Future has completed.
Raises:
FailedPrecondition: If entityTypes are created in this Featurestore and force = False.
"""
if force:
self.delete_entity_types(force=force)

super().delete(sync=sync)

@classmethod
@base.optional_sync()
def create(
Expand All @@ -336,7 +369,7 @@ def create(
credentials: Optional[auth_credentials.Credentials] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
encryption_spec_key_name: Optional[str] = None,
sync: Optional[bool] = True,
sync: bool = True,
) -> "Featurestore":
"""Creates a Featurestore resource.
Expand Down Expand Up @@ -453,7 +486,7 @@ def create_entity_type(
description: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
sync: Optional[bool] = True,
sync: bool = True,
) -> "featurestore.EntityType":
"""Creates an EntityType resource in this Featurestore.
Expand Down
30 changes: 1 addition & 29 deletions google/cloud/aiplatform/utils/featurestore_utils.py
Expand Up @@ -17,7 +17,7 @@

import datetime
import re
from typing import Dict, List, NamedTuple, Optional, Tuple, Union
from typing import Dict, NamedTuple, Optional, Tuple, Union

from google.protobuf import timestamp_pb2

Expand Down Expand Up @@ -149,34 +149,6 @@ def validate_value_type(value_type: str) -> bool:
return True


def validate_and_get_batch_create_features_requests(
feature_configs: Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]
) -> List[Dict[str, Union[int, str, Dict[str, str]]]]:
""" Validates feature_configs and get batch_create_features_requests
Args:
feature_configs (Dict[str, Dict[str, Union[bool, int, Dict[str, str], str]]]):
Required. A user defined Dict containing configurations for feature creation.
Returns:
List[Dict[str, Union[int, str, Dict[str, str]]]] - list of feature creation request
"""

batch_create_features_requests = [
_FeatureConfig(
feature_id=feature_id,
value_type=feature_config.get(
"value_type", _FEATURE_VALUE_TYPE_UNSPECIFIED
),
description=feature_config.get("description", None),
labels=feature_config.get("labels", {}),
).request_dict
for feature_id, feature_config in feature_configs.items()
]

return batch_create_features_requests


class _FeatureConfig(NamedTuple):
"""Configuration for feature creation.
Expand Down
4 changes: 3 additions & 1 deletion tests/system/aiplatform/e2e_base.py
Expand Up @@ -104,7 +104,9 @@ def teardown(self, shared_state: Dict[str, Any]):

for resource in shared_state["resources"]:
try:
if isinstance(resource, aiplatform.Endpoint):
if isinstance(resource, aiplatform.Endpoint) or isinstance(
resource, aiplatform.Featurestore
):
resource.delete(force=True) # Undeploy model then delete endpoint
else:
resource.delete()
Expand Down

0 comments on commit 8af8486

Please sign in to comment.