Skip to content

Commit

Permalink
feat: Add data plane code snippets for feature store service (#713)
Browse files Browse the repository at this point in the history
* feat: Add data plane code snippets for feature store service
  • Loading branch information
lclc19 committed Sep 24, 2021
1 parent 8e06ced commit e3ea683
Show file tree
Hide file tree
Showing 13 changed files with 583 additions and 8 deletions.
44 changes: 40 additions & 4 deletions samples/snippets/conftest.py
Expand Up @@ -16,6 +16,7 @@
from uuid import uuid4

from google.cloud import aiplatform, aiplatform_v1beta1
from google.cloud import bigquery
from google.cloud import storage
import pytest

Expand Down Expand Up @@ -91,6 +92,14 @@ def featurestore_client():
yield featurestore_client


@pytest.fixture
def bigquery_client():
    """Yield a BigQuery client bound to the build-specific GCP project."""
    project_id = os.getenv("BUILD_SPECIFIC_GCLOUD_PROJECT")
    client = bigquery.Client(project=project_id)
    yield client


# Shared setup/teardown.
@pytest.fixture()
def teardown_batch_prediction_job(shared_state, job_client):
Expand Down Expand Up @@ -213,16 +222,24 @@ def teardown_dataset(shared_state, dataset_client):
def teardown_featurestore(shared_state, featurestore_client):
    yield

    # Clean up: force-delete the featurestore created by the test so any
    # entity types / features nested under it are removed as well.
    featurestore_client.delete_featurestore(
        request={"name": shared_state["featurestore_name"], "force": True}
    )


@pytest.fixture()
def teardown_entity_type(shared_state, featurestore_client):
    yield

    # Clean up: force-delete the entity type so any features nested under
    # it are removed too.
    featurestore_client.delete_entity_type(
        request={"name": shared_state["entity_type_name"], "force": True}
    )


@pytest.fixture()
Expand All @@ -233,6 +250,25 @@ def teardown_feature(shared_state, featurestore_client):
featurestore_client.delete_feature(name=shared_state["feature_name"])


@pytest.fixture()
def teardown_features(shared_state, featurestore_client):
    yield

    # Clean up: delete every feature the test recorded in shared state.
    for name in shared_state["feature_names"]:
        featurestore_client.delete_feature(name=name)


@pytest.fixture()
def teardown_batch_read_feature_values(shared_state, bigquery_client):
    yield

    # Clean up the BigQuery dataset the test exported feature values into.
    # not_found_ok keeps teardown quiet if the test failed before creating it.
    dataset = shared_state["destination_data_set"]
    bigquery_client.delete_dataset(dataset, delete_contents=True, not_found_ok=True)


@pytest.fixture()
def create_endpoint(shared_state, endpoint_client):
def create(project, location, test_name="temp_deploy_model_test"):
Expand Down
@@ -0,0 +1,74 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Create features in bulk for an existing type.
# See https://cloud.google.com/vertex-ai/docs/featurestore/setup before running
# the code snippet

# [START aiplatform_batch_create_features_sample]
from google.cloud import aiplatform_v1beta1 as aiplatform


def batch_create_features_sample(
    project: str,
    featurestore_id: str,
    entity_type_id: str,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    """Create the age, gender and liked_genres features on an existing
    entity type with a single BatchCreateFeatures call, then block on the
    resulting long-running operation."""
    # The AI Platform services require regional API endpoints, which need to be
    # in the same region or multi-region overlap with the Feature Store location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.FeaturestoreServiceClient(client_options=client_options)
    parent = (
        f"projects/{project}/locations/{location}"
        f"/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
    )

    # (feature_id, value type, description) for each feature to create.
    feature_specs = [
        ("age", aiplatform.Feature.ValueType.INT64, "User age"),
        ("gender", aiplatform.Feature.ValueType.STRING, "User gender"),
        (
            "liked_genres",
            aiplatform.Feature.ValueType.STRING_ARRAY,
            "An array of genres that this user liked",
        ),
    ]
    requests = [
        aiplatform.CreateFeatureRequest(
            feature=aiplatform.Feature(
                value_type=value_type, description=description
            ),
            feature_id=feature_id,
        )
        for feature_id, value_type, description in feature_specs
    ]

    batch_create_features_request = aiplatform.BatchCreateFeaturesRequest(
        parent=parent, requests=requests
    )
    lro_response = client.batch_create_features(request=batch_create_features_request)
    print("Long running operation:", lro_response.operation.name)
    batch_create_features_response = lro_response.result(timeout=timeout)
    print("batch_create_features_response:", batch_create_features_response)


# [END aiplatform_batch_create_features_sample]
@@ -0,0 +1,58 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from uuid import uuid4

import batch_create_features_sample
import create_entity_type_sample

import pytest

import helpers

PROJECT_ID = os.getenv("BUILD_SPECIFIC_GCLOUD_PROJECT")


@pytest.fixture(scope="function", autouse=True)
def teardown(teardown_entity_type):
    # Autouse wrapper: depending on the shared ``teardown_entity_type``
    # fixture (defined in conftest.py) ensures the entity type created by
    # each test in this module is deleted after the test finishes.
    yield


def setup_temp_entity_type(featurestore_id, entity_type_id, capsys):
    """Create a temporary entity type via the sample and return its
    fully-qualified resource name (parsed from the sample's stdout)."""
    create_entity_type_sample.create_entity_type_sample(
        project=PROJECT_ID,
        featurestore_id=featurestore_id,
        entity_type_id=entity_type_id,
    )
    captured, _ = capsys.readouterr()
    assert "create_entity_type_response" in captured
    return helpers.get_featurestore_resource_name(captured)


def test_ucaip_generated_batch_create_features_sample_vision(capsys, shared_state):
    """Batch-create features on a temporary entity type and check output."""
    featurestore_id = "perm_sample_featurestore"
    # Unique entity type id: dashes are not allowed, length capped at 60.
    entity_type_id = f"users_{uuid4()}".replace("-", "_")[:60]
    entity_type_name = setup_temp_entity_type(featurestore_id, entity_type_id, capsys)

    batch_create_features_sample.batch_create_features_sample(
        project=PROJECT_ID,
        featurestore_id=featurestore_id,
        entity_type_id=entity_type_id,
        location="us-central1",
    )
    captured, _ = capsys.readouterr()
    assert "batch_create_features_response" in captured

    # Recorded so the autouse teardown fixture can delete the entity type.
    shared_state["entity_type_name"] = entity_type_name
@@ -0,0 +1,86 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Batch read feature values from a featurestore, as determined by your read
# instances list file, to export data.
# See https://cloud.google.com/vertex-ai/docs/featurestore/setup before running
# the code snippet

# [START aiplatform_batch_read_feature_values_sample]
from google.cloud import aiplatform_v1beta1 as aiplatform


def batch_read_feature_values_sample(
    project: str,
    featurestore_id: str,
    input_csv_file: str,
    destination_table_uri: str,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    """Batch-serve feature values for the read instances listed in a CSV
    file and export them to a BigQuery table, blocking on the resulting
    long-running operation."""
    # The AI Platform services require regional API endpoints, which need to be
    # in the same region or multi-region overlap with the Feature Store location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.FeaturestoreServiceClient(client_options=client_options)
    featurestore = (
        f"projects/{project}/locations/{location}/featurestores/{featurestore_id}"
    )

    # Read instances come from a CSV file in Cloud Storage.
    csv_read_instances = aiplatform.CsvSource(
        gcs_source=aiplatform.GcsSource(uris=[input_csv_file])
    )
    # Results are written to the BigQuery table created earlier.
    destination = aiplatform.FeatureValueDestination(
        bigquery_destination=aiplatform.BigQueryDestination(
            output_uri=destination_table_uri
        )
    )

    # Read the 'age', 'gender' and 'liked_genres' features from 'perm_users'.
    users_entity_type_spec = aiplatform.BatchReadFeatureValuesRequest.EntityTypeSpec(
        entity_type_id="perm_users",
        feature_selector=aiplatform.FeatureSelector(
            id_matcher=aiplatform.IdMatcher(ids=["age", "gender", "liked_genres"])
        ),
    )
    # Read all features ('*' wildcard) from the 'perm_movies' entity type.
    movies_entity_type_spec = aiplatform.BatchReadFeatureValuesRequest.EntityTypeSpec(
        entity_type_id="perm_movies",
        feature_selector=aiplatform.FeatureSelector(
            id_matcher=aiplatform.IdMatcher(ids=["*"])
        ),
    )

    # Batch serving request from CSV
    batch_read_feature_values_request = aiplatform.BatchReadFeatureValuesRequest(
        featurestore=featurestore,
        csv_read_instances=csv_read_instances,
        destination=destination,
        entity_type_specs=[users_entity_type_spec, movies_entity_type_spec],
    )
    lro_response = client.batch_read_feature_values(
        request=batch_read_feature_values_request
    )
    print("Long running operation:", lro_response.operation.name)
    batch_read_feature_values_response = lro_response.result(timeout=timeout)
    print("batch_read_feature_values_response:", batch_read_feature_values_response)


# [END aiplatform_batch_read_feature_values_sample]
@@ -0,0 +1,70 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime
import os

import batch_read_feature_values_sample
from google.cloud import bigquery

import pytest

PROJECT_ID = os.getenv("BUILD_SPECIFIC_GCLOUD_PROJECT")
LOCATION = "us-central1"
INPUT_CSV_FILE = "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction_perm.csv"


@pytest.fixture(scope="function", autouse=True)
def teardown(teardown_batch_read_feature_values):
    # Autouse wrapper: depending on the shared
    # ``teardown_batch_read_feature_values`` fixture (defined in conftest.py)
    # ensures the BigQuery output dataset is deleted after each test here.
    yield


def setup_test():
    """Create a fresh BigQuery output dataset.

    Returns a ``(dataset_name, destination_table_uri)`` tuple for the
    batch-read sample to export into.
    """
    # Timestamped dataset name so repeated test runs never collide.
    destination_data_set = "movie_predictions_" + datetime.now().strftime(
        "%Y%m%d%H%M%S"
    )
    # The output table must NOT already exist: the BatchReadFeatureValues
    # API cannot overwrite an existing table.
    destination_table_name = "training_data"
    destination_table_uri = "bq://{project}.{dataset}.{table}".format(
        project=PROJECT_ID,
        dataset=destination_data_set,
        table=destination_table_name,
    )

    # Create the (empty) dataset that will hold the output table.
    bq_client = bigquery.Client(project=PROJECT_ID)
    dataset = bigquery.Dataset(f"{bq_client.project}.{destination_data_set}")
    dataset.location = LOCATION
    dataset = bq_client.create_dataset(dataset)
    print("Created dataset {}.{}".format(bq_client.project, dataset.dataset_id))
    return destination_data_set, destination_table_uri


def test_ucaip_generated_batch_read_feature_values_sample_vision(capsys, shared_state):
    """Export feature values to BigQuery and verify the sample's output."""
    destination_data_set, destination_table_uri = setup_test()

    batch_read_feature_values_sample.batch_read_feature_values_sample(
        project=PROJECT_ID,
        featurestore_id="perm_sample_featurestore",
        input_csv_file=INPUT_CSV_FILE,
        destination_table_uri=destination_table_uri,
    )
    captured, _ = capsys.readouterr()
    assert "batch_read_feature_values_response" in captured
    with capsys.disabled():
        print(captured)

    # Recorded so the teardown fixture can delete the BigQuery dataset.
    shared_state["destination_data_set"] = destination_data_set
Expand Up @@ -12,6 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# Create an entity type so that you can create its related features.
# See https://cloud.google.com/vertex-ai/docs/featurestore/setup before running
# the code snippet

# [START aiplatform_create_entity_type_sample]
from google.cloud import aiplatform_v1beta1 as aiplatform

Expand All @@ -25,7 +29,8 @@ def create_entity_type_sample(
api_endpoint: str = "us-central1-aiplatform.googleapis.com",
timeout: int = 300,
):
# The AI Platform services require regional API endpoints.
# The AI Platform services require regional API endpoints, which need to be
# in the same region or multi-region overlap with the Feature Store location.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
Expand Down

0 comments on commit e3ea683

Please sign in to comment.