Skip to content

Commit

Permalink
feat: Support passing struct data to the DB API (#718)
Browse files Browse the repository at this point in the history
  • Loading branch information
jimfulton committed Jul 1, 2021
1 parent e99abbb commit 38b3ef9
Show file tree
Hide file tree
Showing 8 changed files with 597 additions and 51 deletions.
11 changes: 9 additions & 2 deletions docs/dbapi.rst
Expand Up @@ -25,7 +25,7 @@ and using named parameters::
Providing explicit type information
-----------------------------------

BigQuery requires type information for parameters. The The BigQuery
BigQuery requires type information for parameters. The BigQuery
DB-API can usually determine parameter types for parameters based on
provided values. Sometimes, however, types can't be determined (for
example when `None` is passed) or are determined incorrectly (for
Expand All @@ -37,7 +37,14 @@ colon, as in::

insert into people (name, income) values (%(name:string)s, %(income:numeric)s)

For unnamed parameters, use the named syntax with a type, but now
For unnamed parameters, use the named syntax with a type, but no
name, as in::

insert into people (name, income) values (%(:string)s, %(:numeric)s)

Providing type information is the *only* way to pass `struct` data::

cursor.execute(
"insert into points (point) values (%(:struct<x float64, y float64>)s)",
[{"x": 10, "y": 20}],
)
252 changes: 216 additions & 36 deletions google/cloud/bigquery/dbapi/_helpers.py
Expand Up @@ -18,18 +18,34 @@
import decimal
import functools
import numbers
import re
import typing

from google.cloud import bigquery
from google.cloud.bigquery import table, enums
from google.cloud.bigquery import table, enums, query
from google.cloud.bigquery.dbapi import exceptions


_NUMERIC_SERVER_MIN = decimal.Decimal("-9.9999999999999999999999999999999999999E+28")
_NUMERIC_SERVER_MAX = decimal.Decimal("9.9999999999999999999999999999999999999E+28")

type_parameters_re = re.compile(
r"""
\(
\s*[0-9]+\s*
(,
\s*[0-9]+\s*
)*
\)
""",
re.VERBOSE,
)


def _parameter_type(name, value, query_parameter_type=None, value_doc=""):
if query_parameter_type:
# Strip type parameters
query_parameter_type = type_parameters_re.sub("", query_parameter_type)
try:
parameter_type = getattr(
enums.SqlParameterScalarTypes, query_parameter_type.upper()
Expand Down Expand Up @@ -113,6 +129,197 @@ def array_to_query_parameter(value, name=None, query_parameter_type=None):
return bigquery.ArrayQueryParameter(name, array_type, value)


def _parse_struct_fields(
fields,
base,
parse_struct_field=re.compile(
r"""
(?:(\w+)\s+) # field name
([A-Z0-9<> ,()]+) # Field type
$""",
re.VERBOSE | re.IGNORECASE,
).match,
):
# Split a string of struct fields. They're defined by commas, but
# we have to avoid splitting on commas internal to fields. For
# example:
# name string, children array<struct<name string, bdate date>>
#
# only has 2 top-level fields.
fields = fields.split(",")
fields = list(reversed(fields)) # in the off chance that there are very many
while fields:
field = fields.pop()
while fields and field.count("<") != field.count(">"):
field += "," + fields.pop()

m = parse_struct_field(field.strip())
if not m:
raise exceptions.ProgrammingError(
f"Invalid struct field, {field}, in {base}"
)
yield m.group(1, 2)


SCALAR, ARRAY, STRUCT = "sar"


def _parse_type(
type_,
name,
base,
complex_query_parameter_parse=re.compile(
r"""
\s*
(ARRAY|STRUCT|RECORD) # Type
\s*
<([A-Z0-9<> ,()]+)> # Subtype(s)
\s*$
""",
re.IGNORECASE | re.VERBOSE,
).match,
):
if "<" not in type_:
# Scalar

# Strip type parameters
type_ = type_parameters_re.sub("", type_).strip()
try:
type_ = getattr(enums.SqlParameterScalarTypes, type_.upper())
except AttributeError:
raise exceptions.ProgrammingError(
f"The given parameter type, {type_},"
f"{' for ' + name if name else ''}"
f" is not a valid BigQuery scalar type, in {base}."
)
if name:
type_ = type_.with_name(name)
return SCALAR, type_

m = complex_query_parameter_parse(type_)
if not m:
raise exceptions.ProgrammingError(f"Invalid parameter type, {type_}")
tname, sub = m.group(1, 2)
if tname.upper() == "ARRAY":
sub_type = complex_query_parameter_type(None, sub, base)
if isinstance(sub_type, query.ArrayQueryParameterType):
raise exceptions.ProgrammingError(f"Array can't contain an array in {base}")
sub_type._complex__src = sub
return ARRAY, sub_type
else:
return STRUCT, _parse_struct_fields(sub, base)


def complex_query_parameter_type(name: typing.Optional[str], type_: str, base: str):
"""Construct a parameter type (`StructQueryParameterType`) for a complex type
or a non-complex type that's part of a complex type.
Examples:
array<struct<x float64, y float64>>
struct<name string, children array<struct<name string, bdate date>>>
This is used for computing array types.
"""

type_type, sub_type = _parse_type(type_, name, base)
if type_type == SCALAR:
type_ = sub_type
elif type_type == ARRAY:
type_ = query.ArrayQueryParameterType(sub_type, name=name)
elif type_type == STRUCT:
fields = [
complex_query_parameter_type(field_name, field_type, base)
for field_name, field_type in sub_type
]
type_ = query.StructQueryParameterType(*fields, name=name)
else: # pragma: NO COVER
raise AssertionError("Bad type_type", type_type) # Can't happen :)

return type_


def complex_query_parameter(
name: typing.Optional[str], value, type_: str, base: typing.Optional[str] = None
):
"""
Construct a query parameter for a complex type (array or struct record)
or for a subtype, which may not be complex
Examples:
array<struct<x float64, y float64>>
struct<name string, children array<struct<name string, bdate date>>>
"""
base = base or type_

type_type, sub_type = _parse_type(type_, name, base)

if type_type == SCALAR:
param = query.ScalarQueryParameter(name, sub_type._type, value)
elif type_type == ARRAY:
if not array_like(value):
raise exceptions.ProgrammingError(
f"Array type with non-array-like value"
f" with type {type(value).__name__}"
)
param = query.ArrayQueryParameter(
name,
sub_type,
value
if isinstance(sub_type, query.ScalarQueryParameterType)
else [
complex_query_parameter(None, v, sub_type._complex__src, base)
for v in value
],
)
elif type_type == STRUCT:
if not isinstance(value, collections_abc.Mapping):
raise exceptions.ProgrammingError(f"Non-mapping value for type {type_}")
value_keys = set(value)
fields = []
for field_name, field_type in sub_type:
if field_name not in value:
raise exceptions.ProgrammingError(
f"No field value for {field_name} in {type_}"
)
value_keys.remove(field_name)
fields.append(
complex_query_parameter(field_name, value[field_name], field_type, base)
)
if value_keys:
raise exceptions.ProgrammingError(f"Extra data keys for {type_}")

param = query.StructQueryParameter(name, *fields)
else: # pragma: NO COVER
raise AssertionError("Bad type_type", type_type) # Can't happen :)

return param


def _dispatch_parameter(type_, value, name=None):
if type_ is not None and "<" in type_:
param = complex_query_parameter(name, value, type_)
elif isinstance(value, collections_abc.Mapping):
raise NotImplementedError(
f"STRUCT-like parameter values are not supported"
f"{' (parameter ' + name + ')' if name else ''},"
f" unless an explicit type is give in the parameter placeholder"
f" (e.g. '%({name if name else ''}:struct<...>)s')."
)
elif array_like(value):
param = array_to_query_parameter(value, name, type_)
else:
param = scalar_to_query_parameter(value, name, type_)

return param


def to_query_parameters_list(parameters, parameter_types):
"""Converts a sequence of parameter values into query parameters.
Expand All @@ -126,19 +333,10 @@ def to_query_parameters_list(parameters, parameter_types):
List[google.cloud.bigquery.query._AbstractQueryParameter]:
A list of query parameters.
"""
result = []

for value, type_ in zip(parameters, parameter_types):
if isinstance(value, collections_abc.Mapping):
raise NotImplementedError("STRUCT-like parameter values are not supported.")
elif array_like(value):
param = array_to_query_parameter(value, None, type_)
else:
param = scalar_to_query_parameter(value, None, type_)

result.append(param)

return result
return [
_dispatch_parameter(type_, value)
for value, type_ in zip(parameters, parameter_types)
]


def to_query_parameters_dict(parameters, query_parameter_types):
Expand All @@ -154,28 +352,10 @@ def to_query_parameters_dict(parameters, query_parameter_types):
List[google.cloud.bigquery.query._AbstractQueryParameter]:
A list of named query parameters.
"""
result = []

for name, value in parameters.items():
if isinstance(value, collections_abc.Mapping):
raise NotImplementedError(
"STRUCT-like parameter values are not supported "
"(parameter {}).".format(name)
)
else:
query_parameter_type = query_parameter_types.get(name)
if array_like(value):
param = array_to_query_parameter(
value, name=name, query_parameter_type=query_parameter_type
)
else:
param = scalar_to_query_parameter(
value, name=name, query_parameter_type=query_parameter_type,
)

result.append(param)

return result
return [
_dispatch_parameter(query_parameter_types.get(name), value, name)
for name, value in parameters.items()
]


def to_query_parameters(parameters, parameter_types):
Expand Down
28 changes: 27 additions & 1 deletion google/cloud/bigquery/dbapi/cursor.py
Expand Up @@ -483,7 +483,33 @@ def _format_operation(operation, parameters):


def _extract_types(
operation, extra_type_sub=re.compile(r"(%*)%(?:\(([^:)]*)(?::(\w+))?\))?s").sub
operation,
extra_type_sub=re.compile(
r"""
(%*) # Extra %s. We'll deal with these in the replacement code
% # Beginning of replacement, %s, %(...)s
(?:\( # Begin of optional name and/or type
([^:)]*) # name
(?:: # ':' introduces type
( # start of type group
[a-zA-Z0-9<>, ]+ # First part, no parens
(?: # start sets of parens + non-paren text
\([0-9 ,]+\) # comma-separated groups of digits in parens
# (e.g. string(10))
(?=[, >)]) # Must be followed by ,>) or space
[a-zA-Z0-9<>, ]* # Optional non-paren chars
)* # Can be zero or more of parens and following text
) # end of type group
)? # close type clause ":type"
\))? # End of optional name and/or type
s # End of replacement
""",
re.VERBOSE,
).sub,
):
"""Remove type information from parameter placeholders.
Expand Down
7 changes: 6 additions & 1 deletion tests/system/conftest.py
Expand Up @@ -31,9 +31,14 @@ def bqstorage_client(bigquery_client):
return bigquery_storage.BigQueryReadClient(credentials=bigquery_client._credentials)


@pytest.fixture
@pytest.fixture(scope="session")
def dataset_id(bigquery_client):
dataset_id = f"bqsystem_{helpers.temp_suffix()}"
bigquery_client.create_dataset(dataset_id)
yield dataset_id
bigquery_client.delete_dataset(dataset_id, delete_contents=True)


@pytest.fixture
def table_id(dataset_id):
return f"{dataset_id}.table_{helpers.temp_suffix()}"

0 comments on commit 38b3ef9

Please sign in to comment.