/
batch_read_feature_values_sample.py
86 lines (77 loc) · 3.52 KB
/
batch_read_feature_values_sample.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Batch read feature values from a featurestore, as determined by your read
# instances list file, to export data.
# See https://cloud.google.com/vertex-ai/docs/featurestore/setup before running
# the code snippet
# [START aiplatform_batch_read_feature_values_sample]
from google.cloud import aiplatform_v1beta1 as aiplatform
def batch_read_feature_values_sample(
    project: str,
    featurestore_id: str,
    input_csv_file: str,
    destination_table_uri: str,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    """Batch-serve feature values from a featurestore into a BigQuery table.

    Reads the entity instances listed in the CSV read-instances file and
    exports the selected features to the given BigQuery table.

    Args:
        project: GCP project ID that owns the featurestore.
        featurestore_id: ID of the featurestore to read from.
        input_csv_file: Cloud Storage URI of the CSV read-instances file.
        destination_table_uri: URI of the (pre-created) BigQuery output table.
        location: Region of the featurestore.
        api_endpoint: Regional API endpoint; must be in the same region as
            (or a multi-region overlap with) the featurestore location.
        timeout: Seconds to wait for the long-running operation to complete.
    """
    # The AI Platform services require regional API endpoints. A single
    # client can be created once and reused across many requests.
    featurestore_client = aiplatform.FeaturestoreServiceClient(
        client_options={"api_endpoint": api_endpoint}
    )

    featurestore_name = (
        f"projects/{project}/locations/{location}/featurestores/{featurestore_id}"
    )

    # Entity instances to serve are listed in a CSV file on Cloud Storage.
    read_instances = aiplatform.CsvSource(
        gcs_source=aiplatform.GcsSource(uris=[input_csv_file])
    )

    # Feature values are exported to the BigQuery table created earlier.
    export_destination = aiplatform.FeatureValueDestination(
        bigquery_destination=aiplatform.BigQueryDestination(
            output_uri=destination_table_uri
        )
    )

    # From the 'perm_users' entity, read only 'age', 'gender', 'liked_genres'.
    users_spec = aiplatform.BatchReadFeatureValuesRequest.EntityTypeSpec(
        entity_type_id="perm_users",
        feature_selector=aiplatform.FeatureSelector(
            id_matcher=aiplatform.IdMatcher(ids=["age", "gender", "liked_genres"])
        ),
    )

    # From the 'perm_movies' entity, read every feature (wildcard selector).
    movies_spec = aiplatform.BatchReadFeatureValuesRequest.EntityTypeSpec(
        entity_type_id="perm_movies",
        feature_selector=aiplatform.FeatureSelector(
            id_matcher=aiplatform.IdMatcher(ids=["*"])
        ),
    )

    # Assemble the batch serving request driven by the CSV read instances.
    request = aiplatform.BatchReadFeatureValuesRequest(
        featurestore=featurestore_name,
        csv_read_instances=read_instances,
        destination=export_destination,
        entity_type_specs=[users_spec, movies_spec],
    )

    lro = featurestore_client.batch_read_feature_values(request=request)
    print("Long running operation:", lro.operation.name)
    response = lro.result(timeout=timeout)
    print("batch_read_feature_values_response:", response)
# [END aiplatform_batch_read_feature_values_sample]