Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add dataplane code snippets for feature store service #713

Merged
merged 9 commits into from Sep 24, 2021
70 changes: 69 additions & 1 deletion samples/snippets/conftest.py
Expand Up @@ -15,7 +15,8 @@
import os
from uuid import uuid4

from google.cloud import aiplatform
from google.cloud import aiplatform, aiplatform_v1beta1
nicain marked this conversation as resolved.
Show resolved Hide resolved
from google.cloud import bigquery
from google.cloud import storage
import pytest

Expand Down Expand Up @@ -83,6 +84,22 @@ def dataset_client():
yield dataset_client


@pytest.fixture
def featurestore_client():
featurestore_client = aiplatform_v1beta1.FeaturestoreServiceClient(
client_options={"api_endpoint": "us-central1-aiplatform.googleapis.com"}
)
yield featurestore_client


@pytest.fixture
def bigquery_client():
bigquery_client = bigquery.Client(
project=os.getenv("BUILD_SPECIFIC_GCLOUD_PROJECT")
)
yield bigquery_client


# Shared setup/teardown.
@pytest.fixture()
def teardown_batch_prediction_job(shared_state, job_client):
Expand Down Expand Up @@ -201,6 +218,57 @@ def teardown_dataset(shared_state, dataset_client):
dataset_client.delete_dataset(name=shared_state["dataset_name"])


@pytest.fixture()
def teardown_featurestore(shared_state, featurestore_client):
yield

# Force delete the created featurestore
force_delete_featurestore_request = {
"name": shared_state["featurestore_name"],
"force": True,
}
featurestore_client.delete_featurestore(request=force_delete_featurestore_request)


@pytest.fixture()
def teardown_entity_type(shared_state, featurestore_client):
yield

# Force delete the created entity type
force_delete_entity_type_request = {
"name": shared_state["entity_type_name"],
"force": True,
}
featurestore_client.delete_entity_type(request=force_delete_entity_type_request)


@pytest.fixture()
def teardown_feature(shared_state, featurestore_client):
yield

# Delete the created feature
featurestore_client.delete_feature(name=shared_state["feature_name"])


@pytest.fixture()
def teardown_features(shared_state, featurestore_client):
yield

# Delete the created features
for feature_name in shared_state["feature_names"]:
featurestore_client.delete_feature(name=feature_name)


@pytest.fixture()
def teardown_batch_read_feature_values(shared_state, bigquery_client):
yield

# Delete the created dataset
bigquery_client.delete_dataset(
shared_state["destination_data_set"], delete_contents=True, not_found_ok=True
)


@pytest.fixture()
def create_endpoint(shared_state, endpoint_client):
def create(project, location, test_name="temp_deploy_model_test"):
Expand Down
@@ -0,0 +1,74 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

nicain marked this conversation as resolved.
Show resolved Hide resolved
# Create features in bulk for an existing type.
# See https://cloud.google.com/vertex-ai/docs/featurestore/setup before running
# the code snippet

# [START aiplatform_batch_create_features_sample]
from google.cloud import aiplatform_v1beta1 as aiplatform


def batch_create_features_sample(
project: str,
featurestore_id: str,
entity_type_id: str,
location: str = "us-central1",
api_endpoint: str = "us-central1-aiplatform.googleapis.com",
timeout: int = 300,
):
# The AI Platform services require regional API endpoints, which need to be
# in the same region or multi-region overlap with the Feature Store location.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.FeaturestoreServiceClient(client_options=client_options)
parent = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
age_feature = aiplatform.Feature(
value_type=aiplatform.Feature.ValueType.INT64, description="User age",
)
age_feature_request = aiplatform.CreateFeatureRequest(
feature=age_feature, feature_id="age"
)

gender_feature = aiplatform.Feature(
value_type=aiplatform.Feature.ValueType.STRING, description="User gender"
)
gender_feature_request = aiplatform.CreateFeatureRequest(
feature=gender_feature, feature_id="gender"
)

liked_genres_feature = aiplatform.Feature(
value_type=aiplatform.Feature.ValueType.STRING_ARRAY,
description="An array of genres that this user liked",
)
liked_genres_feature_request = aiplatform.CreateFeatureRequest(
feature=liked_genres_feature, feature_id="liked_genres"
)

requests = [
nicain marked this conversation as resolved.
Show resolved Hide resolved
age_feature_request,
gender_feature_request,
liked_genres_feature_request,
]
batch_create_features_request = aiplatform.BatchCreateFeaturesRequest(
parent=parent, requests=requests
)
lro_response = client.batch_create_features(request=batch_create_features_request)
lclc19 marked this conversation as resolved.
Show resolved Hide resolved
print("Long running operation:", lro_response.operation.name)
lclc19 marked this conversation as resolved.
Show resolved Hide resolved
batch_create_features_response = lro_response.result(timeout=timeout)
print("batch_create_features_response:", batch_create_features_response)


# [END aiplatform_batch_create_features_sample]
@@ -0,0 +1,58 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
from uuid import uuid4

import batch_create_features_sample
import create_entity_type_sample

import pytest

import helpers

PROJECT_ID = os.getenv("BUILD_SPECIFIC_GCLOUD_PROJECT")


@pytest.fixture(scope="function", autouse=True)
def teardown(teardown_entity_type):
yield


def setup_temp_entity_type(featurestore_id, entity_type_id, capsys):
create_entity_type_sample.create_entity_type_sample(
project=PROJECT_ID,
featurestore_id=featurestore_id,
entity_type_id=entity_type_id,
)
out, _ = capsys.readouterr()
assert "create_entity_type_response" in out
return helpers.get_featurestore_resource_name(out)


def test_ucaip_generated_batch_create_features_sample_vision(capsys, shared_state):
featurestore_id = "perm_sample_featurestore"
entity_type_id = f"users_{uuid4()}".replace("-", "_")[:60]
entity_type_name = setup_temp_entity_type(featurestore_id, entity_type_id, capsys)
location = "us-central1"
batch_create_features_sample.batch_create_features_sample(
project=PROJECT_ID,
featurestore_id=featurestore_id,
entity_type_id=entity_type_id,
location=location,
)
out, _ = capsys.readouterr()
assert "batch_create_features_response" in out

shared_state["entity_type_name"] = entity_type_name
@@ -0,0 +1,86 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Batch read feature values from a featurestore, as determined by your read
# instances list file, to export data.
# See https://cloud.google.com/vertex-ai/docs/featurestore/setup before running
# the code snippet

# [START aiplatform_batch_read_feature_values_sample]
nicain marked this conversation as resolved.
Show resolved Hide resolved
from google.cloud import aiplatform_v1beta1 as aiplatform


def batch_read_feature_values_sample(
project: str,
featurestore_id: str,
input_csv_file: str,
destination_table_uri: str,
location: str = "us-central1",
api_endpoint: str = "us-central1-aiplatform.googleapis.com",
timeout: int = 300,
):
# The AI Platform services require regional API endpoints, which need to be
# in the same region or multi-region overlap with the Feature Store location.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.FeaturestoreServiceClient(client_options=client_options)
featurestore = (
f"projects/{project}/locations/{location}/featurestores/{featurestore_id}"
)
lclc19 marked this conversation as resolved.
Show resolved Hide resolved
csv_read_instances = aiplatform.CsvSource(
gcs_source=aiplatform.GcsSource(uris=[input_csv_file])
)
destination = aiplatform.FeatureValueDestination(
bigquery_destination=aiplatform.BigQueryDestination(
# Output to BigQuery table created earlier
output_uri=destination_table_uri
)
)

users_feature_selector = aiplatform.FeatureSelector(
id_matcher=aiplatform.IdMatcher(ids=["age", "gender", "liked_genres"])
)
users_entity_type_spec = aiplatform.BatchReadFeatureValuesRequest.EntityTypeSpec(
# Read the 'age', 'gender' and 'liked_genres' features from the 'perm_users' entity
entity_type_id="perm_users",
feature_selector=users_feature_selector,
)

movies_feature_selector = aiplatform.FeatureSelector(
id_matcher=aiplatform.IdMatcher(ids=["*"])
)
movies_entity_type_spec = aiplatform.BatchReadFeatureValuesRequest.EntityTypeSpec(
# Read the all features from the 'perm_movies' entity
entity_type_id="perm_movies",
feature_selector=movies_feature_selector,
)

entity_type_specs = [users_entity_type_spec, movies_entity_type_spec]
# Batch serving request from CSV
batch_read_feature_values_request = aiplatform.BatchReadFeatureValuesRequest(
featurestore=featurestore,
csv_read_instances=csv_read_instances,
destination=destination,
entity_type_specs=entity_type_specs,
)
lro_response = client.batch_read_feature_values(
request=batch_read_feature_values_request
)
print("Long running operation:", lro_response.operation.name)
batch_read_feature_values_response = lro_response.result(timeout=timeout)
print("batch_read_feature_values_response:", batch_read_feature_values_response)


# [END aiplatform_batch_read_feature_values_sample]
@@ -0,0 +1,70 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from datetime import datetime
import os

import batch_read_feature_values_sample
from google.cloud import bigquery

import pytest

PROJECT_ID = os.getenv("BUILD_SPECIFIC_GCLOUD_PROJECT")
LOCATION = "us-central1"
INPUT_CSV_FILE = "gs://cloud-samples-data-us-central1/vertex-ai/feature-store/datasets/movie_prediction_perm.csv"


@pytest.fixture(scope="function", autouse=True)
def teardown(teardown_batch_read_feature_values):
yield


def setup_test():
# Output dataset
destination_data_set = "movie_predictions_" + datetime.now().strftime(
"%Y%m%d%H%M%S"
)
# Output table. Make sure that the table does NOT already exist, the
# BatchReadFeatureValues API cannot overwrite an existing table.
destination_table_name = "training_data"
DESTINATION_PATTERN = "bq://{project}.{dataset}.{table}"
destination_table_uri = DESTINATION_PATTERN.format(
project=PROJECT_ID, dataset=destination_data_set, table=destination_table_name
)
# Create dataset
bq_client = bigquery.Client(project=PROJECT_ID)
dataset_id = "{}.{}".format(bq_client.project, destination_data_set)
dataset = bigquery.Dataset(dataset_id)
dataset.location = LOCATION
dataset = bq_client.create_dataset(dataset)
print("Created dataset {}.{}".format(bq_client.project, dataset.dataset_id))
return destination_data_set, destination_table_uri


def test_ucaip_generated_batch_read_feature_values_sample_vision(capsys, shared_state):
destination_data_set, destination_table_uri = setup_test()
featurestore_id = "perm_sample_featurestore"

batch_read_feature_values_sample.batch_read_feature_values_sample(
project=PROJECT_ID,
featurestore_id=featurestore_id,
input_csv_file=INPUT_CSV_FILE,
destination_table_uri=destination_table_uri,
)
out, _ = capsys.readouterr()
assert "batch_read_feature_values_response" in out
with capsys.disabled():
print(out)

shared_state["destination_data_set"] = destination_data_set