forked from googleapis/python-aiplatform
-
Notifications
You must be signed in to change notification settings - Fork 0
/
time_series_dataset.py
135 lines (116 loc) · 5.45 KB
/
time_series_dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
# -*- coding: utf-8 -*-
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from typing import Optional, Sequence, Tuple, Union
from google.auth import credentials as auth_credentials
from google.cloud.aiplatform import datasets
from google.cloud.aiplatform.datasets import _datasources
from google.cloud.aiplatform import initializer
from google.cloud.aiplatform import schema
from google.cloud.aiplatform import utils
# TODO: extend tabular dataset
class TimeSeriesDataset(datasets._Dataset):
    """Managed time series dataset resource for Vertex AI."""

    # Only the time series metadata schema is listed for this dataset type.
    _supported_metadata_schema_uris: Optional[Tuple[str]] = (
        schema.dataset.metadata.time_series,
    )

    @classmethod
    def create(
        cls,
        display_name: str,
        gcs_source: Optional[Union[str, Sequence[str]]] = None,
        bq_source: Optional[str] = None,
        project: Optional[str] = None,
        location: Optional[str] = None,
        credentials: Optional[auth_credentials.Credentials] = None,
        request_metadata: Optional[Sequence[Tuple[str, str]]] = (),
        encryption_spec_key_name: Optional[str] = None,
        sync: bool = True,
    ) -> "TimeSeriesDataset":
        """Creates a new time series dataset.

        The display name is validated, a datasource is built from the given
        GCS or BigQuery source using the time series metadata schema, and
        the work is then handed to ``cls._create_and_import``.

        Args:
            display_name (str):
                Required. The user-defined name of the Dataset.
                The name can be up to 128 characters long and can consist
                of any UTF-8 characters.
            gcs_source (Union[str, Sequence[str]]):
                Google Cloud Storage URI(s) of the input file(s). May
                contain wildcards. For more information on wildcards, see
                https://cloud.google.com/storage/docs/gsutil/addlhelp/WildcardNames.
                examples:
                    str: "gs://bucket/file.csv"
                    Sequence[str]: ["gs://bucket/file1.csv", "gs://bucket/file2.csv"]
            bq_source (str):
                BigQuery URI of the input table.
                example:
                    "bq://project.dataset.table_name"
            project (str):
                Project to create this dataset in. Overrides project set in
                aiplatform.init.
            location (str):
                Location to create this dataset in. Overrides location set
                in aiplatform.init.
            credentials (auth_credentials.Credentials):
                Custom credentials to use to create this dataset. Overrides
                credentials set in aiplatform.init.
            request_metadata (Sequence[Tuple[str, str]]):
                Strings which should be sent along with the request as
                metadata.
            encryption_spec_key_name (Optional[str]):
                Optional. The Cloud KMS resource identifier of the customer
                managed encryption key used to protect the dataset. Has the
                form:
                ``projects/my-project/locations/my-region/keyRings/my-kr/cryptoKeys/my-key``.
                The key needs to be in the same region as where the compute
                resource is created.

                If set, this Dataset and all sub-resources of this Dataset
                will be secured by this key.

                Overrides encryption_spec_key_name set in aiplatform.init.
            sync (bool):
                Whether to execute this method synchronously. If False, this
                method will be executed in a concurrent Future and any
                downstream object will be immediately returned and synced
                when the Future has completed.

        Returns:
            time_series_dataset (TimeSeriesDataset):
                Instantiated representation of the managed time series
                dataset resource.
        """
        utils.validate_display_name(display_name)

        client = cls._instantiate_client(location=location, credentials=credentials)

        schema_uri = schema.dataset.metadata.time_series
        source = _datasources.create_datasource(
            metadata_schema_uri=schema_uri,
            gcs_source=gcs_source,
            bq_source=bq_source,
        )

        # Explicit arguments win; anything left unset falls back to the
        # values held by the global SDK configuration.
        config = initializer.global_config
        parent_path = config.common_location_path(project=project, location=location)
        resolved_project = project or config.project
        resolved_location = location or config.location
        resolved_credentials = credentials or config.credentials
        encryption_spec = config.get_encryption_spec(
            encryption_spec_key_name=encryption_spec_key_name
        )

        return cls._create_and_import(
            api_client=client,
            parent=parent_path,
            display_name=display_name,
            metadata_schema_uri=schema_uri,
            datasource=source,
            project=resolved_project,
            location=resolved_location,
            credentials=resolved_credentials,
            request_metadata=request_metadata,
            encryption_spec=encryption_spec,
            sync=sync,
        )

    def import_data(self):
        """Reject the call: this dataset class does not support 'import_data'.

        Raises:
            NotImplementedError: Always.
        """
        message = f"{self.__class__.__name__} class does not support 'import_data'"
        raise NotImplementedError(message)