/
cursor.py
479 lines (381 loc) · 16.5 KB
/
cursor.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
# Copyright 2017 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Cursor for the Google BigQuery DB-API."""
import collections
import copy
try:
from collections import abc as collections_abc
except ImportError: # Python 2.7
import collections as collections_abc
import logging
import six
from google.cloud.bigquery import job
from google.cloud.bigquery import schema
from google.cloud.bigquery import table
from google.cloud.bigquery.dbapi import _helpers
from google.cloud.bigquery.dbapi import exceptions
import google.cloud.exceptions
_LOGGER = logging.getLogger(__name__)
# Per PEP 249: A 7-item sequence containing information describing one result
# column. The first two items (name and type_code) are mandatory, the other
# five are optional and are set to None if no meaningful values can be
# provided.
Column = collections.namedtuple(
"Column",
[
"name",
"type_code",
"display_size",
"internal_size",
"precision",
"scale",
"null_ok",
],
)
class Cursor(object):
"""DB-API Cursor to Google BigQuery.
Args:
connection (google.cloud.bigquery.dbapi.Connection):
A DB-API connection to Google BigQuery.
"""
def __init__(self, connection):
self.connection = connection
self.description = None
# Per PEP 249: The attribute is -1 in case no .execute*() has been
# performed on the cursor or the rowcount of the last operation
# cannot be determined by the interface.
self.rowcount = -1
# Per PEP 249: The arraysize attribute defaults to 1, meaning to fetch
# a single row at a time. However, we deviate from that, and set the
# default to None, allowing the backend to automatically determine the
# most appropriate size.
self.arraysize = None
self._query_data = None
self._query_job = None
def close(self):
"""No-op."""
def _set_description(self, schema):
"""Set description from schema.
Args:
schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
A description of fields in the schema.
"""
if schema is None:
self.description = None
return
self.description = tuple(
Column(
name=field.name,
type_code=field.field_type,
display_size=None,
internal_size=None,
precision=None,
scale=None,
null_ok=field.is_nullable,
)
for field in schema
)
def _set_rowcount(self, query_results):
"""Set the rowcount from query results.
Normally, this sets rowcount to the number of rows returned by the
query, but if it was a DML statement, it sets rowcount to the number
of modified rows.
Args:
query_results (google.cloud.bigquery.query._QueryResults):
Results of a query.
"""
total_rows = 0
num_dml_affected_rows = query_results.num_dml_affected_rows
if query_results.total_rows is not None and query_results.total_rows > 0:
total_rows = query_results.total_rows
if num_dml_affected_rows is not None and num_dml_affected_rows > 0:
total_rows = num_dml_affected_rows
self.rowcount = total_rows
def execute(self, operation, parameters=None, job_id=None, job_config=None):
"""Prepare and execute a database operation.
.. note::
When setting query parameters, values which are "text"
(``unicode`` in Python2, ``str`` in Python3) will use
the 'STRING' BigQuery type. Values which are "bytes" (``str`` in
Python2, ``bytes`` in Python3), will use using the 'BYTES' type.
A `~datetime.datetime` parameter without timezone information uses
the 'DATETIME' BigQuery type (example: Global Pi Day Celebration
March 14, 2017 at 1:59pm). A `~datetime.datetime` parameter with
timezone information uses the 'TIMESTAMP' BigQuery type (example:
a wedding on April 29, 2011 at 11am, British Summer Time).
For more information about BigQuery data types, see:
https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types
``STRUCT``/``RECORD`` and ``REPEATED`` query parameters are not
yet supported. See:
https://github.com/GoogleCloudPlatform/google-cloud-python/issues/3524
Args:
operation (str): A Google BigQuery query string.
parameters (Union[Mapping[str, Any], Sequence[Any]]):
(Optional) dictionary or sequence of parameter values.
job_id (str):
(Optional) The job_id to use. If not set, a job ID
is generated at random.
job_config (google.cloud.bigquery.job.QueryJobConfig):
(Optional) Extra configuration options for the query job.
"""
self._query_data = None
self._query_job = None
client = self.connection._client
# The DB-API uses the pyformat formatting, since the way BigQuery does
# query parameters was not one of the standard options. Convert both
# the query and the parameters to the format expected by the client
# libraries.
formatted_operation = _format_operation(operation, parameters=parameters)
query_parameters = _helpers.to_query_parameters(parameters)
if client._default_query_job_config:
if job_config:
config = job_config._fill_from_default(client._default_query_job_config)
else:
config = copy.deepcopy(client._default_query_job_config)
else:
config = job_config or job.QueryJobConfig(use_legacy_sql=False)
config.query_parameters = query_parameters
self._query_job = client.query(
formatted_operation, job_config=config, job_id=job_id
)
if self._query_job.dry_run:
schema_field = schema.SchemaField(
name="estimated_bytes", field_type="INTEGER", mode="REQUIRED",
)
self._set_description(schema=[schema_field])
self.rowcount = 1
return
# Wait for the query to finish.
try:
self._query_job.result()
except google.cloud.exceptions.GoogleCloudError as exc:
raise exceptions.DatabaseError(exc)
query_results = self._query_job._query_results
self._set_rowcount(query_results)
self._set_description(query_results.schema)
def executemany(self, operation, seq_of_parameters):
"""Prepare and execute a database operation multiple times.
Args:
operation (str): A Google BigQuery query string.
seq_of_parameters (Union[Sequence[Mapping[str, Any], Sequence[Any]]]):
Sequence of many sets of parameter values.
"""
for parameters in seq_of_parameters:
self.execute(operation, parameters)
def _try_fetch(self, size=None):
"""Try to start fetching data, if not yet started.
Mutates self to indicate that iteration has started.
"""
if self._query_job is None:
raise exceptions.InterfaceError(
"No query results: execute() must be called before fetch."
)
if self._query_job.dry_run:
estimated_bytes = self._query_job.total_bytes_processed
row = table.Row((estimated_bytes,), {"estimated_bytes": 0})
self._query_data = iter([row])
return
is_dml = (
self._query_job.statement_type
and self._query_job.statement_type.upper() != "SELECT"
)
if is_dml:
self._query_data = iter([])
return
if self._query_data is None:
client = self.connection._client
bqstorage_client = self.connection._bqstorage_client
if bqstorage_client is not None:
try:
rows_iterable = self._bqstorage_fetch(bqstorage_client)
self._query_data = _helpers.to_bq_table_rows(rows_iterable)
return
except google.api_core.exceptions.GoogleAPICallError as exc:
# NOTE: Forbidden is a subclass of GoogleAPICallError
if isinstance(exc, google.api_core.exceptions.Forbidden):
# Don't hide errors such as insufficient permissions to create
# a read session, or the API is not enabled. Both of those are
# clearly problems if the developer has explicitly asked for
# BigQuery Storage API support.
raise
# There is an issue with reading from small anonymous
# query results tables. If such an error occurs, we silence
# it in order to try again with the tabledata.list API.
_LOGGER.debug(
"Error fetching data with BigQuery Storage API, "
"falling back to tabledata.list API."
)
rows_iter = client.list_rows(
self._query_job.destination,
selected_fields=self._query_job._query_results.schema,
page_size=self.arraysize,
)
self._query_data = iter(rows_iter)
def _bqstorage_fetch(self, bqstorage_client):
"""Start fetching data with the BigQuery Storage API.
The method assumes that the data about the relevant query job already
exists internally.
Args:
bqstorage_client(\
google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
):
A client tha know how to talk to the BigQuery Storage API.
Returns:
Iterable[Mapping]:
A sequence of rows, represented as dictionaries.
"""
# NOTE: Given that BQ storage client instance is passed in, it means
# that bigquery_storage_v1beta1 library is available (no ImportError).
from google.cloud import bigquery_storage_v1beta1
table_reference = self._query_job.destination
read_session = bqstorage_client.create_read_session(
table_reference.to_bqstorage(),
"projects/{}".format(table_reference.project),
# a single stream only, as DB API is not well-suited for multithreading
requested_streams=1,
)
if not read_session.streams:
return iter([]) # empty table, nothing to read
read_position = bigquery_storage_v1beta1.types.StreamPosition(
stream=read_session.streams[0],
)
read_rows_stream = bqstorage_client.read_rows(read_position)
rows_iterable = read_rows_stream.rows(read_session)
return rows_iterable
def fetchone(self):
"""Fetch a single row from the results of the last ``execute*()`` call.
.. note::
If a dry run query was executed, a row with a single value is
returned representing the estimated number of bytes that would be
processed by the query.
Returns:
Tuple:
A tuple representing a row or ``None`` if no more data is
available.
Raises:
google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
"""
self._try_fetch()
try:
return six.next(self._query_data)
except StopIteration:
return None
def fetchmany(self, size=None):
"""Fetch multiple results from the last ``execute*()`` call.
.. note::
If a dry run query was executed, a row with a single value is
returned representing the estimated number of bytes that would be
processed by the query.
.. note::
The size parameter is not used for the request/response size.
Set the ``arraysize`` attribute before calling ``execute()`` to
set the batch size.
Args:
size (int):
(Optional) Maximum number of rows to return. Defaults to the
``arraysize`` property value. If ``arraysize`` is not set, it
defaults to ``1``.
Returns:
List[Tuple]: A list of rows.
Raises:
google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
"""
if size is None:
# Since self.arraysize can be None (a deviation from PEP 249),
# use an actual PEP 249 default of 1 in such case (*some* number
# is needed here).
size = self.arraysize if self.arraysize else 1
self._try_fetch(size=size)
rows = []
for row in self._query_data:
rows.append(row)
if len(rows) >= size:
break
return rows
def fetchall(self):
"""Fetch all remaining results from the last ``execute*()`` call.
.. note::
If a dry run query was executed, a row with a single value is
returned representing the estimated number of bytes that would be
processed by the query.
Returns:
List[Tuple]: A list of all the rows in the results.
Raises:
google.cloud.bigquery.dbapi.InterfaceError: if called before ``execute()``.
"""
self._try_fetch()
return list(self._query_data)
def setinputsizes(self, sizes):
"""No-op."""
def setoutputsize(self, size, column=None):
"""No-op."""
def _format_operation_list(operation, parameters):
"""Formats parameters in operation in the way BigQuery expects.
The input operation will be a query like ``SELECT %s`` and the output
will be a query like ``SELECT ?``.
Args:
operation (str): A Google BigQuery query string.
parameters (Sequence[Any]): Sequence of parameter values.
Returns:
str: A formatted query string.
Raises:
google.cloud.bigquery.dbapi.ProgrammingError:
if a parameter used in the operation is not found in the
``parameters`` argument.
"""
formatted_params = ["?" for _ in parameters]
try:
return operation % tuple(formatted_params)
except TypeError as exc:
raise exceptions.ProgrammingError(exc)
def _format_operation_dict(operation, parameters):
"""Formats parameters in operation in the way BigQuery expects.
The input operation will be a query like ``SELECT %(namedparam)s`` and
the output will be a query like ``SELECT @namedparam``.
Args:
operation (str): A Google BigQuery query string.
parameters (Mapping[str, Any]): Dictionary of parameter values.
Returns:
str: A formatted query string.
Raises:
google.cloud.bigquery.dbapi.ProgrammingError:
if a parameter used in the operation is not found in the
``parameters`` argument.
"""
formatted_params = {}
for name in parameters:
escaped_name = name.replace("`", r"\`")
formatted_params[name] = "@`{}`".format(escaped_name)
try:
return operation % formatted_params
except KeyError as exc:
raise exceptions.ProgrammingError(exc)
def _format_operation(operation, parameters=None):
"""Formats parameters in operation in way BigQuery expects.
Args:
operation (str): A Google BigQuery query string.
parameters (Union[Mapping[str, Any], Sequence[Any]]):
Optional parameter values.
Returns:
str: A formatted query string.
Raises:
google.cloud.bigquery.dbapi.ProgrammingError:
if a parameter used in the operation is not found in the
``parameters`` argument.
"""
if parameters is None:
return operation
if isinstance(parameters, collections_abc.Mapping):
return _format_operation_dict(operation, parameters)
return _format_operation_list(operation, parameters)