/
schema.py
486 lines (389 loc) · 16.5 KB
/
schema.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
# Copyright 2015 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Schemas for BigQuery tables / queries."""
import collections
from typing import Optional
from google.cloud.bigquery_v2 import types
_DEFAULT_VALUE = object()
_STRUCT_TYPES = ("RECORD", "STRUCT")
# SQL types reference:
# https://cloud.google.com/bigquery/data-types#legacy_sql_data_types
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
LEGACY_TO_STANDARD_TYPES = {
"STRING": types.StandardSqlDataType.TypeKind.STRING,
"BYTES": types.StandardSqlDataType.TypeKind.BYTES,
"INTEGER": types.StandardSqlDataType.TypeKind.INT64,
"INT64": types.StandardSqlDataType.TypeKind.INT64,
"FLOAT": types.StandardSqlDataType.TypeKind.FLOAT64,
"FLOAT64": types.StandardSqlDataType.TypeKind.FLOAT64,
"NUMERIC": types.StandardSqlDataType.TypeKind.NUMERIC,
"BIGNUMERIC": types.StandardSqlDataType.TypeKind.BIGNUMERIC,
"BOOLEAN": types.StandardSqlDataType.TypeKind.BOOL,
"BOOL": types.StandardSqlDataType.TypeKind.BOOL,
"GEOGRAPHY": types.StandardSqlDataType.TypeKind.GEOGRAPHY,
"RECORD": types.StandardSqlDataType.TypeKind.STRUCT,
"STRUCT": types.StandardSqlDataType.TypeKind.STRUCT,
"TIMESTAMP": types.StandardSqlDataType.TypeKind.TIMESTAMP,
"DATE": types.StandardSqlDataType.TypeKind.DATE,
"TIME": types.StandardSqlDataType.TypeKind.TIME,
"DATETIME": types.StandardSqlDataType.TypeKind.DATETIME,
# no direct conversion from ARRAY, the latter is represented by mode="REPEATED"
}
"""String names of the legacy SQL types to integer codes of Standard SQL types."""
class SchemaField(object):
"""Describe a single field within a table schema.
Args:
name (str): The name of the field.
field_type (str): The type of the field. See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.type
mode (Optional[str]): The mode of the field. See
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.mode
description (Optional[str]): Description for the field.
fields (Optional[Tuple[google.cloud.bigquery.schema.SchemaField]]):
Subfields (requires ``field_type`` of 'RECORD').
policy_tags (Optional[PolicyTagList]): The policy tag list for the field.
precision (Optional[int]):
Precison (number of digits) of fields with NUMERIC or BIGNUMERIC type.
scale (Optional[int]):
Scale (digits after decimal) of fields with NUMERIC or BIGNUMERIC type.
max_length (Optional[int]):
Maximim length of fields with STRING or BYTES type.
"""
def __init__(
self,
name,
field_type,
mode="NULLABLE",
description=_DEFAULT_VALUE,
fields=(),
policy_tags=None,
precision=_DEFAULT_VALUE,
scale=_DEFAULT_VALUE,
max_length=_DEFAULT_VALUE,
):
self._properties = {
"name": name,
"type": field_type,
}
if mode is not None:
self._properties["mode"] = mode.upper()
if description is not _DEFAULT_VALUE:
self._properties["description"] = description
if precision is not _DEFAULT_VALUE:
self._properties["precision"] = precision
if scale is not _DEFAULT_VALUE:
self._properties["scale"] = scale
if max_length is not _DEFAULT_VALUE:
self._properties["maxLength"] = max_length
self._fields = tuple(fields)
self._policy_tags = self._determine_policy_tags(field_type, policy_tags)
@staticmethod
def _determine_policy_tags(
field_type: str, given_policy_tags: Optional["PolicyTagList"]
) -> Optional["PolicyTagList"]:
"""Return the given policy tags, or their suitable representation if `None`.
Args:
field_type: The type of the schema field.
given_policy_tags: The policy tags to maybe ajdust.
"""
if given_policy_tags is not None:
return given_policy_tags
if field_type is not None and field_type.upper() in _STRUCT_TYPES:
return None
return PolicyTagList()
@staticmethod
def __get_int(api_repr, name):
v = api_repr.get(name, _DEFAULT_VALUE)
if v is not _DEFAULT_VALUE:
v = int(v)
return v
@classmethod
def from_api_repr(cls, api_repr: dict) -> "SchemaField":
"""Return a ``SchemaField`` object deserialized from a dictionary.
Args:
api_repr (Mapping[str, str]): The serialized representation
of the SchemaField, such as what is output by
:meth:`to_api_repr`.
Returns:
google.cloud.biquery.schema.SchemaField: The ``SchemaField`` object.
"""
field_type = api_repr["type"].upper()
# Handle optional properties with default values
mode = api_repr.get("mode", "NULLABLE")
description = api_repr.get("description", _DEFAULT_VALUE)
fields = api_repr.get("fields", ())
policy_tags = cls._determine_policy_tags(
field_type, PolicyTagList.from_api_repr(api_repr.get("policyTags"))
)
return cls(
field_type=field_type,
fields=[cls.from_api_repr(f) for f in fields],
mode=mode.upper(),
description=description,
name=api_repr["name"],
policy_tags=policy_tags,
precision=cls.__get_int(api_repr, "precision"),
scale=cls.__get_int(api_repr, "scale"),
max_length=cls.__get_int(api_repr, "maxLength"),
)
@property
def name(self):
"""str: The name of the field."""
return self._properties["name"]
@property
def field_type(self):
"""str: The type of the field.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.type
"""
return self._properties["type"]
@property
def mode(self):
"""Optional[str]: The mode of the field.
See:
https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#TableFieldSchema.FIELDS.mode
"""
return self._properties.get("mode")
@property
def is_nullable(self):
"""bool: whether 'mode' is 'nullable'."""
return self.mode == "NULLABLE"
@property
def description(self):
"""Optional[str]: description for the field."""
return self._properties.get("description")
@property
def precision(self):
"""Optional[int]: Precision (number of digits) for the NUMERIC field."""
return self._properties.get("precision")
@property
def scale(self):
"""Optional[int]: Scale (digits after decimal) for the NUMERIC field."""
return self._properties.get("scale")
@property
def max_length(self):
"""Optional[int]: Maximum length for the STRING or BYTES field."""
return self._properties.get("maxLength")
@property
def fields(self):
"""Optional[tuple]: Subfields contained in this field.
Must be empty unset if ``field_type`` is not 'RECORD'.
"""
return self._fields
@property
def policy_tags(self):
"""Optional[google.cloud.bigquery.schema.PolicyTagList]: Policy tag list
definition for this field.
"""
return self._policy_tags
def to_api_repr(self) -> dict:
"""Return a dictionary representing this schema field.
Returns:
Dict: A dictionary representing the SchemaField in a serialized form.
"""
answer = self._properties.copy()
# If this is a RECORD type, then sub-fields are also included,
# add this to the serialized representation.
if self.field_type.upper() in _STRUCT_TYPES:
answer["fields"] = [f.to_api_repr() for f in self.fields]
else:
# Explicitly include policy tag definition (we must not do it for RECORD
# fields, because those are not leaf fields).
answer["policyTags"] = self.policy_tags.to_api_repr()
# Done; return the serialized dictionary.
return answer
def _key(self):
"""A tuple key that uniquely describes this field.
Used to compute this instance's hashcode and evaluate equality.
Returns:
Tuple: The contents of this :class:`~google.cloud.bigquery.schema.SchemaField`.
"""
field_type = self.field_type.upper()
if field_type == "STRING" or field_type == "BYTES":
if self.max_length is not None:
field_type = f"{field_type}({self.max_length})"
elif field_type.endswith("NUMERIC"):
if self.precision is not None:
if self.scale is not None:
field_type = f"{field_type}({self.precision}, {self.scale})"
else:
field_type = f"{field_type}({self.precision})"
policy_tags = (
() if self._policy_tags is None else tuple(sorted(self._policy_tags.names))
)
return (
self.name,
field_type,
# Mode is always str, if not given it defaults to a str value
self.mode.upper(), # pytype: disable=attribute-error
self.description,
self._fields,
policy_tags,
)
def to_standard_sql(self) -> types.StandardSqlField:
"""Return the field as the standard SQL field representation object.
Returns:
An instance of :class:`~google.cloud.bigquery_v2.types.StandardSqlField`.
"""
sql_type = types.StandardSqlDataType()
if self.mode == "REPEATED":
sql_type.type_kind = types.StandardSqlDataType.TypeKind.ARRAY
else:
sql_type.type_kind = LEGACY_TO_STANDARD_TYPES.get(
self.field_type,
types.StandardSqlDataType.TypeKind.TYPE_KIND_UNSPECIFIED,
)
if sql_type.type_kind == types.StandardSqlDataType.TypeKind.ARRAY: # noqa: E721
array_element_type = LEGACY_TO_STANDARD_TYPES.get(
self.field_type,
types.StandardSqlDataType.TypeKind.TYPE_KIND_UNSPECIFIED,
)
sql_type.array_element_type.type_kind = array_element_type
# ARRAY cannot directly contain other arrays, only scalar types and STRUCTs
# https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#array-type
if (
array_element_type
== types.StandardSqlDataType.TypeKind.STRUCT # noqa: E721
):
sql_type.array_element_type.struct_type.fields.extend(
field.to_standard_sql() for field in self.fields
)
elif (
sql_type.type_kind
== types.StandardSqlDataType.TypeKind.STRUCT # noqa: E721
):
sql_type.struct_type.fields.extend(
field.to_standard_sql() for field in self.fields
)
return types.StandardSqlField(name=self.name, type=sql_type)
def __eq__(self, other):
if not isinstance(other, SchemaField):
return NotImplemented
return self._key() == other._key()
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash(self._key())
def __repr__(self):
return "SchemaField{}".format(self._key())
def _parse_schema_resource(info):
"""Parse a resource fragment into a schema field.
Args:
info: (Mapping[str, Dict]): should contain a "fields" key to be parsed
Returns:
Optional[Sequence[google.cloud.bigquery.schema.SchemaField`]:
A list of parsed fields, or ``None`` if no "fields" key found.
"""
return [SchemaField.from_api_repr(f) for f in info.get("fields", ())]
def _build_schema_resource(fields):
"""Generate a resource fragment for a schema.
Args:
fields (Sequence[google.cloud.bigquery.schema.SchemaField): schema to be dumped.
Returns:
Sequence[Dict]: Mappings describing the schema of the supplied fields.
"""
return [field.to_api_repr() for field in fields]
def _to_schema_fields(schema):
"""Coerce `schema` to a list of schema field instances.
Args:
schema(Sequence[Union[ \
:class:`~google.cloud.bigquery.schema.SchemaField`, \
Mapping[str, Any] \
]]):
Table schema to convert. If some items are passed as mappings,
their content must be compatible with
:meth:`~google.cloud.bigquery.schema.SchemaField.from_api_repr`.
Returns:
Sequence[:class:`~google.cloud.bigquery.schema.SchemaField`]
Raises:
Exception: If ``schema`` is not a sequence, or if any item in the
sequence is not a :class:`~google.cloud.bigquery.schema.SchemaField`
instance or a compatible mapping representation of the field.
"""
for field in schema:
if not isinstance(field, (SchemaField, collections.abc.Mapping)):
raise ValueError(
"Schema items must either be fields or compatible "
"mapping representations."
)
return [
field if isinstance(field, SchemaField) else SchemaField.from_api_repr(field)
for field in schema
]
class PolicyTagList(object):
"""Define Policy Tags for a column.
Args:
names (
Optional[Tuple[str]]): list of policy tags to associate with
the column. Policy tag identifiers are of the form
`projects/*/locations/*/taxonomies/*/policyTags/*`.
"""
def __init__(self, names=()):
self._properties = {}
self._properties["names"] = tuple(names)
@property
def names(self):
"""Tuple[str]: Policy tags associated with this definition.
"""
return self._properties.get("names", ())
def _key(self):
"""A tuple key that uniquely describes this PolicyTagList.
Used to compute this instance's hashcode and evaluate equality.
Returns:
Tuple: The contents of this :class:`~google.cloud.bigquery.schema.PolicyTagList`.
"""
return tuple(sorted(self._properties.items()))
def __eq__(self, other):
if not isinstance(other, PolicyTagList):
return NotImplemented
return self._key() == other._key()
def __ne__(self, other):
return not self == other
def __hash__(self):
return hash(self._key())
def __repr__(self):
return "PolicyTagList{}".format(self._key())
@classmethod
def from_api_repr(cls, api_repr: dict) -> "PolicyTagList":
"""Return a :class:`PolicyTagList` object deserialized from a dict.
This method creates a new ``PolicyTagList`` instance that points to
the ``api_repr`` parameter as its internal properties dict. This means
that when a ``PolicyTagList`` instance is stored as a property of
another object, any changes made at the higher level will also appear
here.
Args:
api_repr (Mapping[str, str]):
The serialized representation of the PolicyTagList, such as
what is output by :meth:`to_api_repr`.
Returns:
Optional[google.cloud.bigquery.schema.PolicyTagList]:
The ``PolicyTagList`` object or None.
"""
if api_repr is None:
return None
names = api_repr.get("names", ())
return cls(names=names)
def to_api_repr(self) -> dict:
"""Return a dictionary representing this object.
This method returns the properties dict of the ``PolicyTagList``
instance rather than making a copy. This means that when a
``PolicyTagList`` instance is stored as a property of another
object, any changes made at the higher level will also appear here.
Returns:
dict:
A dictionary representing the PolicyTagList object in
serialized form.
"""
answer = {"names": [name for name in self.names]}
return answer