From ba9b2f87e36320d80f6f6460b77e6daddb0fa214 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Tue, 25 Feb 2020 18:47:00 +0000
Subject: [PATCH] feat: add BigQuery storage client support to DB API (#36)

* feat: add BigQueryStorageClient support to DB API

* Use BigQuery Storage client in Cursor if available

* Skip BQ storage unit tests in Python 3.8

* Add system tests for Cursor w/ BQ storage client

* Add test for Connection ctor w/o BQ storage client

* Refactor exception handling in Cursor._try_fetch()

* Add explicit check against None

Co-Authored-By: Tres Seaver

* Remove redundant word in a comment in cursor.py

Co-authored-by: Tres Seaver
---
 google/cloud/bigquery/dbapi/_helpers.py   |  20 +++
 google/cloud/bigquery/dbapi/connection.py |  39 ++++-
 google/cloud/bigquery/dbapi/cursor.py     |  68 ++++++++
 noxfile.py                                |   5 +-
 tests/system.py                           | 100 ++++++++++++
 tests/unit/test_dbapi__helpers.py         |  34 ++++
 tests/unit/test_dbapi_connection.py       |  49 +++++-
 tests/unit/test_dbapi_cursor.py           | 182 ++++++++++++++++++++++
 8 files changed, 487 insertions(+), 10 deletions(-)

diff --git a/google/cloud/bigquery/dbapi/_helpers.py b/google/cloud/bigquery/dbapi/_helpers.py
index 651880fea..6558177d7 100644
--- a/google/cloud/bigquery/dbapi/_helpers.py
+++ b/google/cloud/bigquery/dbapi/_helpers.py
@@ -24,6 +24,7 @@
 import six
 
 from google.cloud import bigquery
+from google.cloud.bigquery import table
 from google.cloud.bigquery.dbapi import exceptions
 
 
@@ -218,3 +219,22 @@ def array_like(value):
     return isinstance(value, collections_abc.Sequence) and not isinstance(
         value, (six.text_type, six.binary_type, bytearray)
     )
+
+
+def to_bq_table_rows(rows_iterable):
+    """Convert table rows to BigQuery table Row instances.
+
+    Args:
+        rows_iterable (Iterable[Mapping]):
+            An iterable of row data items to convert to ``Row`` instances.
+
+    Returns:
+        Iterable[google.cloud.bigquery.table.Row]
+    """
+
+    def to_table_row(row):
+        values = tuple(row.values())
+        keys_to_index = {key: i for i, key in enumerate(row.keys())}
+        return table.Row(values, keys_to_index)
+
+    return (to_table_row(row_data) for row_data in rows_iterable)
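
For orientation, a minimal sketch of what the new helper yields (the field names
and values here are illustrative, not part of the patch):

    from google.cloud.bigquery.dbapi import _helpers

    rows = _helpers.to_bq_table_rows([{"foo": 1.1, "bar": 1.2}])
    row = next(iter(rows))
    row["foo"]    # 1.1 -- lookup by field name
    row[0]        # 1.1 -- lookup by position
    row.values()  # (1.1, 1.2)

Each mapping becomes a ``Row`` whose field order follows the mapping's key order,
which lets the DB API hand BigQuery Storage results to callers in the same shape
as ``tabledata.list`` results.
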
diff --git a/google/cloud/bigquery/dbapi/connection.py b/google/cloud/bigquery/dbapi/connection.py
index ee7d0dc3c..b8eaf2f9b 100644
--- a/google/cloud/bigquery/dbapi/connection.py
+++ b/google/cloud/bigquery/dbapi/connection.py
@@ -23,10 +23,24 @@ class Connection(object):
 
     Args:
         client (google.cloud.bigquery.Client): A client used to connect to BigQuery.
+        bqstorage_client(\
+            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+        ):
+            [Beta] An alternative client that uses the faster BigQuery Storage
+            API to fetch rows from BigQuery. If both clients are given,
+            ``bqstorage_client`` is used first to fetch query results,
+            with a fallback on ``client``, if necessary.
+
+            .. note::
+                There is a known issue with the BigQuery Storage API with small
+                anonymous result sets, which results in such a fallback.
+
+                https://github.com/googleapis/python-bigquery-storage/issues/2
     """
 
-    def __init__(self, client):
+    def __init__(self, client, bqstorage_client=None):
         self._client = client
+        self._bqstorage_client = bqstorage_client
 
     def close(self):
         """No-op."""
@@ -43,17 +57,30 @@ def cursor(self):
         return cursor.Cursor(self)
 
 
-def connect(client=None):
+def connect(client=None, bqstorage_client=None):
     """Construct a DB-API connection to Google BigQuery.
 
     Args:
-        client (google.cloud.bigquery.Client):
-            (Optional) A client used to connect to BigQuery. If not passed, a
-            client is created using default options inferred from the environment.
+        client (Optional[google.cloud.bigquery.Client]):
+            A client used to connect to BigQuery. If not passed, a client is
+            created using default options inferred from the environment.
+        bqstorage_client(\
+            Optional[google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient] \
+        ):
+            [Beta] An alternative client that uses the faster BigQuery Storage
+            API to fetch rows from BigQuery. If both clients are given,
+            ``bqstorage_client`` is used first to fetch query results,
+            with a fallback on ``client``, if necessary.
+
+            .. note::
+                There is a known issue with the BigQuery Storage API with small
+                anonymous result sets, which results in such a fallback.
+
+                https://github.com/googleapis/python-bigquery-storage/issues/2
 
     Returns:
         google.cloud.bigquery.dbapi.Connection: A new DB-API connection to BigQuery.
    """
     if client is None:
         client = bigquery.Client()
-    return Connection(client)
+    return Connection(client, bqstorage_client)
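
A minimal usage sketch for the new ``bqstorage_client`` argument; it assumes
application default credentials are available, and reuses the REST client's
credentials for the storage client, mirroring the system tests further down:

    from google.cloud import bigquery
    from google.cloud import bigquery_storage_v1beta1
    from google.cloud.bigquery import dbapi

    client = bigquery.Client()
    bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
        credentials=client._credentials
    )
    connection = dbapi.connect(client, bqstorage_client)
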
diff --git a/google/cloud/bigquery/dbapi/cursor.py b/google/cloud/bigquery/dbapi/cursor.py
index a3e6ea5be..eb73b3d56 100644
--- a/google/cloud/bigquery/dbapi/cursor.py
+++ b/google/cloud/bigquery/dbapi/cursor.py
@@ -21,6 +21,8 @@
 except ImportError:  # Python 2.7
     import collections as collections_abc
 
+import logging
+
 import six
 
 from google.cloud.bigquery import job
@@ -28,6 +30,9 @@
 from google.cloud.bigquery.dbapi import exceptions
 import google.cloud.exceptions
 
+
+_LOGGER = logging.getLogger(__name__)
+
 # Per PEP 249: A 7-item sequence containing information describing one result
 # column. The first two items (name and type_code) are mandatory, the other
 # five are optional and are set to None if no meaningful values can be
@@ -212,6 +217,30 @@ def _try_fetch(self, size=None):
 
         if self._query_data is None:
             client = self.connection._client
+            bqstorage_client = self.connection._bqstorage_client
+
+            if bqstorage_client is not None:
+                try:
+                    rows_iterable = self._bqstorage_fetch(bqstorage_client)
+                    self._query_data = _helpers.to_bq_table_rows(rows_iterable)
+                    return
+                except google.api_core.exceptions.GoogleAPICallError as exc:
+                    # NOTE: Forbidden is a subclass of GoogleAPICallError
+                    if isinstance(exc, google.api_core.exceptions.Forbidden):
+                        # Don't hide errors such as insufficient permissions to
+                        # create a read session, or the API not being enabled.
+                        # Both of those are clearly problems if the developer has
+                        # explicitly asked for BigQuery Storage API support.
+                        raise
+
+                    # There is an issue with reading from small anonymous
+                    # query results tables. If such an error occurs, we silence
+                    # it in order to try again with the tabledata.list API.
+                    _LOGGER.debug(
+                        "Error fetching data with BigQuery Storage API, "
+                        "falling back to tabledata.list API."
+                    )
+
             rows_iter = client.list_rows(
                 self._query_job.destination,
                 selected_fields=self._query_job._query_results.schema,
@@ -219,6 +248,45 @@ def _try_fetch(self, size=None):
             )
             self._query_data = iter(rows_iter)
 
+    def _bqstorage_fetch(self, bqstorage_client):
+        """Start fetching data with the BigQuery Storage API.
+
+        The method assumes that data about the relevant query job is already
+        available internally.
+
+        Args:
+            bqstorage_client(\
+                google.cloud.bigquery_storage_v1beta1.BigQueryStorageClient \
+            ):
+                A client that knows how to talk to the BigQuery Storage API.
+
+        Returns:
+            Iterable[Mapping]:
+                A sequence of rows, represented as dictionaries.
+        """
+        # NOTE: Given that a BQ storage client instance was passed in, the
+        # bigquery_storage_v1beta1 library is available (no ImportError).
+        from google.cloud import bigquery_storage_v1beta1
+
+        table_reference = self._query_job.destination
+
+        read_session = bqstorage_client.create_read_session(
+            table_reference.to_bqstorage(),
+            "projects/{}".format(table_reference.project),
+            # a single stream only, as the DB API is not well-suited for multithreading
+            requested_streams=1,
+        )
+
+        if not read_session.streams:
+            return iter([])  # empty table, nothing to read
+
+        read_position = bigquery_storage_v1beta1.types.StreamPosition(
+            stream=read_session.streams[0],
+        )
+        read_rows_stream = bqstorage_client.read_rows(read_position)
+        rows_iterable = read_rows_stream.rows(read_session)
+        return rows_iterable
+
     def fetchone(self):
         """Fetch a single row from the results of the last ``execute*()`` call.
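
With such a connection, a cursor now tries the BigQuery Storage API first and
quietly falls back to the ``tabledata.list`` API on storage-side read errors;
only ``Forbidden`` errors (insufficient permissions, API not enabled) are
re-raised. A short sketch, with an illustrative public-data query:

    cursor = connection.cursor()
    cursor.execute(
        "SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` LIMIT 5"
    )
    for row in cursor.fetchall():
        print(row["name"])
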
diff --git a/noxfile.py b/noxfile.py
index f7e59e560..32782d0a0 100644
--- a/noxfile.py
+++ b/noxfile.py
@@ -48,7 +48,7 @@ def default(session):
         # Since many tests are skipped due to missing dependencies, test
         # coverage is much lower in Python 3.8. Remove once we can test with
         # pyarrow.
-        coverage_fail_under = "--cov-fail-under=92"
+        coverage_fail_under = "--cov-fail-under=91"
         dev_install = ".[pandas,tqdm]"
 
     session.install("-e", dev_install)
@@ -70,7 +70,7 @@ def default(session):
         "--cov-report=",
         coverage_fail_under,
         os.path.join("tests", "unit"),
-        *session.posargs
+        *session.posargs,
     )
 
 
@@ -94,6 +94,7 @@ def system(session):
     # Install all test dependencies, then install local packages in place.
     session.install("mock", "pytest", "psutil")
     session.install("google-cloud-storage")
+    session.install("fastavro")
     session.install("-e", "test_utils")
     session.install("-e", ".[all]")
diff --git a/tests/system.py b/tests/system.py
index 4a1c03271..c611d8e7e 100644
--- a/tests/system.py
+++ b/tests/system.py
@@ -36,6 +36,12 @@
     from google.cloud import bigquery_storage_v1beta1
 except ImportError:  # pragma: NO COVER
     bigquery_storage_v1beta1 = None
+
+try:
+    import fastavro  # to parse BQ storage client results
+except ImportError:  # pragma: NO COVER
+    fastavro = None
+
 try:
     import pandas
 except ImportError:  # pragma: NO COVER
@@ -1543,6 +1549,100 @@ def test_dbapi_fetchall(self):
         row_tuples = [r.values() for r in rows]
         self.assertEqual(row_tuples, [(1, 2), (3, 4), (5, 6)])
 
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_dbapi_fetch_w_bqstorage_client_small_result_set(self):
+        bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
+            credentials=Config.CLIENT._credentials
+        )
+        cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()
+
+        # Reading small result sets causes an issue with the BQ storage client,
+        # and the DB API should transparently fall back to the default client.
+        cursor.execute(
+            """
+            SELECT id, `by`, time_ts
+            FROM `bigquery-public-data.hacker_news.comments`
+            ORDER BY `id` ASC
+            LIMIT 10
+            """
+        )
+
+        result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]
+
+        field_name = operator.itemgetter(0)
+        fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]
+
+        expected_data = [
+            [
+                ("by", "sama"),
+                ("id", 15),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 17),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 22),
+                ("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
+            ],
+        ]
+        self.assertEqual(fetched_data, expected_data)
+
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    @unittest.skipIf(fastavro is None, "Requires `fastavro`")
+    def test_dbapi_fetch_w_bqstorage_client_large_result_set(self):
+        bqstorage_client = bigquery_storage_v1beta1.BigQueryStorageClient(
+            credentials=Config.CLIENT._credentials
+        )
+        cursor = dbapi.connect(Config.CLIENT, bqstorage_client).cursor()
+
+        # Pick a large enough LIMIT value to ensure that the fallback to the
+        # default client is not needed due to the result set being too small
+        # (a known issue that causes problems when reading such result sets
+        # with the BQ storage client).
+        cursor.execute(
+            """
+            SELECT id, `by`, time_ts
+            FROM `bigquery-public-data.hacker_news.comments`
+            ORDER BY `id` ASC
+            LIMIT 100000
+            """
+        )
+
+        result_rows = [cursor.fetchone(), cursor.fetchone(), cursor.fetchone()]
+
+        field_name = operator.itemgetter(0)
+        fetched_data = [sorted(row.items(), key=field_name) for row in result_rows]
+
+        # Since the DB API is not thread safe, only a single result stream is
+        # requested from the BQ storage client, meaning that results should
+        # arrive in sorted order.
+        expected_data = [
+            [
+                ("by", "sama"),
+                ("id", 15),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 51, 1, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 17),
+                ("time_ts", datetime.datetime(2006, 10, 9, 19, 52, 45, tzinfo=UTC)),
+            ],
+            [
+                ("by", "pg"),
+                ("id", 22),
+                ("time_ts", datetime.datetime(2006, 10, 10, 2, 18, 22, tzinfo=UTC)),
+            ],
+        ]
+        self.assertEqual(fetched_data, expected_data)
+
     def _load_table_for_dml(self, rows, dataset_id, table_id):
         from google.cloud._testing import _NamedTemporaryFile
         from google.cloud.bigquery.job import CreateDisposition
diff --git a/tests/unit/test_dbapi__helpers.py b/tests/unit/test_dbapi__helpers.py
index 45c690ede..8f98d0c53 100644
--- a/tests/unit/test_dbapi__helpers.py
+++ b/tests/unit/test_dbapi__helpers.py
@@ -15,9 +15,11 @@
 import datetime
 import decimal
 import math
+import operator as op
 import unittest
 
 import google.cloud._helpers
+from google.cloud.bigquery import table
 from google.cloud.bigquery.dbapi import _helpers
 from google.cloud.bigquery.dbapi import exceptions
 
@@ -185,3 +187,35 @@ def test_to_query_parameters_w_list_dict_param(self):
     def test_to_query_parameters_none_argument(self):
         query_parameters = _helpers.to_query_parameters(None)
         self.assertEqual(query_parameters, [])
+
+
+class TestToBqTableRows(unittest.TestCase):
+    def test_empty_iterable(self):
+        rows_iterable = iter([])
+        result = _helpers.to_bq_table_rows(rows_iterable)
+        self.assertEqual(list(result), [])
+
+    def test_non_empty_iterable(self):
+        rows_iterable = [
+            dict(one=1.1, four=1.4, two=1.2, three=1.3),
+            dict(one=2.1, four=2.4, two=2.2, three=2.3),
+        ]
+
+        result = _helpers.to_bq_table_rows(rows_iterable)
+
+        rows = list(result)
+        self.assertEqual(len(rows), 2)
+
+        row_1, row_2 = rows
+        self.assertIsInstance(row_1, table.Row)
+        self.assertIsInstance(row_2, table.Row)
+
+        field_value = op.itemgetter(1)
+
+        items = sorted(row_1.items(), key=field_value)
+        expected_items = [("one", 1.1), ("two", 1.2), ("three", 1.3), ("four", 1.4)]
+        self.assertEqual(items, expected_items)
+
+        items = sorted(row_2.items(), key=field_value)
+        expected_items = [("one", 2.1), ("two", 2.2), ("three", 2.3), ("four", 2.4)]
+        self.assertEqual(items, expected_items)
diff --git a/tests/unit/test_dbapi_connection.py b/tests/unit/test_dbapi_connection.py
index 19acec05b..595afd0fe 100644
--- a/tests/unit/test_dbapi_connection.py
+++ b/tests/unit/test_dbapi_connection.py
@@ -16,6 +16,11 @@
 
 import mock
 
+try:
+    from google.cloud import bigquery_storage_v1beta1
+except ImportError:  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
+
 
 class TestConnection(unittest.TestCase):
     @staticmethod
@@ -27,19 +32,41 @@ def _get_target_class():
     def _make_one(self, *args, **kw):
         return self._get_target_class()(*args, **kw)
 
-    def _mock_client(self, rows=None, schema=None):
+    def _mock_client(self):
         from google.cloud.bigquery import client
 
         mock_client = mock.create_autospec(client.Client)
         return mock_client
 
-    def test_ctor(self):
+    def _mock_bqstorage_client(self):
+        from google.cloud.bigquery_storage_v1beta1 import client
+
+        mock_client = mock.create_autospec(client.BigQueryStorageClient)
+        return mock_client
+
+    def test_ctor_wo_bqstorage_client(self):
         from google.cloud.bigquery.dbapi import Connection
 
         mock_client = self._mock_client()
         connection = self._make_one(client=mock_client)
         self.assertIsInstance(connection, Connection)
         self.assertIs(connection._client, mock_client)
+        self.assertIsNone(connection._bqstorage_client)
+
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_ctor_w_bqstorage_client(self):
+        from google.cloud.bigquery.dbapi import Connection
+
+        mock_client = self._mock_client()
+        mock_bqstorage_client = self._mock_bqstorage_client()
+        connection = self._make_one(
+            client=mock_client, bqstorage_client=mock_bqstorage_client,
+        )
+        self.assertIsInstance(connection, Connection)
+        self.assertIs(connection._client, mock_client)
+        self.assertIs(connection._bqstorage_client, mock_bqstorage_client)
 
     @mock.patch("google.cloud.bigquery.Client", autospec=True)
     def test_connect_wo_client(self, mock_client):
@@ -49,6 +76,7 @@ def test_connect_wo_client(self, mock_client):
         connection = connect()
         self.assertIsInstance(connection, Connection)
         self.assertIsNotNone(connection._client)
+        self.assertIsNone(connection._bqstorage_client)
 
     def test_connect_w_client(self):
         from google.cloud.bigquery.dbapi import connect
@@ -58,6 +86,23 @@ def test_connect_w_client(self):
         connection = connect(client=mock_client)
         self.assertIsInstance(connection, Connection)
         self.assertIs(connection._client, mock_client)
+        self.assertIsNone(connection._bqstorage_client)
+
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_connect_w_both_clients(self):
+        from google.cloud.bigquery.dbapi import connect
+        from google.cloud.bigquery.dbapi import Connection
+
+        mock_client = self._mock_client()
+        mock_bqstorage_client = self._mock_bqstorage_client()
+        connection = connect(
+            client=mock_client, bqstorage_client=mock_bqstorage_client,
+        )
+        self.assertIsInstance(connection, Connection)
+        self.assertIs(connection._client, mock_client)
+        self.assertIs(connection._bqstorage_client, mock_bqstorage_client)
 
     def test_close(self):
         connection = self._make_one(client=self._mock_client())
diff --git a/tests/unit/test_dbapi_cursor.py b/tests/unit/test_dbapi_cursor.py
index 4ccd5e71a..e53cc158a 100644
--- a/tests/unit/test_dbapi_cursor.py
+++ b/tests/unit/test_dbapi_cursor.py
@@ -12,9 +12,18 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import operator as op
 import unittest
 
 import mock
+import six
+
+from google.api_core import exceptions
+
+try:
+    from google.cloud import bigquery_storage_v1beta1
+except ImportError:  # pragma: NO COVER
+    bigquery_storage_v1beta1 = None
 
 
 class TestCursor(unittest.TestCase):
@@ -44,6 +53,29 @@ def _mock_client(self, rows=None, schema=None, num_dml_affected_rows=None):
         mock_client.list_rows.return_value = rows
         return mock_client
 
+    def _mock_bqstorage_client(self, rows=None, stream_count=0):
+        from google.cloud.bigquery_storage_v1beta1 import client
+        from google.cloud.bigquery_storage_v1beta1 import types
+
+        if rows is None:
+            rows = []
+
+        mock_client = mock.create_autospec(client.BigQueryStorageClient)
+
+        mock_read_session = mock.MagicMock(
+            streams=[
+                types.Stream(name="streams/stream_{}".format(i))
+                for i in range(stream_count)
+            ]
+        )
+        mock_client.create_read_session.return_value = mock_read_session
+
+        mock_rows_stream = mock.MagicMock()
+        mock_rows_stream.rows.return_value = iter(rows)
+        mock_client.read_rows.return_value = mock_rows_stream
+
+        return mock_client
+
     def _mock_job(self, total_rows=0, schema=None, num_dml_affected_rows=None):
         from google.cloud.bigquery import job
 
@@ -180,6 +212,156 @@ def test_fetchall_w_row(self):
         self.assertEqual(len(rows), 1)
         self.assertEqual(rows[0], (1,))
 
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_fetchall_w_bqstorage_client_fetch_success(self):
+        from google.cloud.bigquery import dbapi
+        from google.cloud.bigquery import table
+
+        # use unordered data to also test any non-deterministic key order in dicts
+        row_data = [
+            table.Row([1.4, 1.1, 1.3, 1.2], {"bar": 3, "baz": 2, "foo": 1, "quux": 0}),
+            table.Row([2.4, 2.1, 2.3, 2.2], {"bar": 3, "baz": 2, "foo": 1, "quux": 0}),
+        ]
+        bqstorage_streamed_rows = [
+            {"bar": 1.2, "foo": 1.1, "quux": 1.4, "baz": 1.3},
+            {"bar": 2.2, "foo": 2.1, "quux": 2.4, "baz": 2.3},
+        ]
+
+        mock_client = self._mock_client(rows=row_data)
+        mock_bqstorage_client = self._mock_bqstorage_client(
+            stream_count=1, rows=bqstorage_streamed_rows,
+        )
+
+        connection = dbapi.connect(
+            client=mock_client, bqstorage_client=mock_bqstorage_client,
+        )
+        cursor = connection.cursor()
+        cursor.execute("SELECT foo, bar FROM some_table")
+
+        rows = cursor.fetchall()
+
+        # the default client was not used
+        mock_client.list_rows.assert_not_called()
+
+        # check the data returned
+        field_value = op.itemgetter(1)
+        sorted_row_data = [sorted(row.items(), key=field_value) for row in rows]
+        expected_row_data = [
+            [("foo", 1.1), ("bar", 1.2), ("baz", 1.3), ("quux", 1.4)],
+            [("foo", 2.1), ("bar", 2.2), ("baz", 2.3), ("quux", 2.4)],
+        ]
+
+        self.assertEqual(sorted_row_data, expected_row_data)
+
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_fetchall_w_bqstorage_client_fetch_no_rows(self):
+        from google.cloud.bigquery import dbapi
+
+        mock_client = self._mock_client(rows=[])
+        mock_bqstorage_client = self._mock_bqstorage_client(stream_count=0)
+
+        connection = dbapi.connect(
+            client=mock_client, bqstorage_client=mock_bqstorage_client,
+        )
+        cursor = connection.cursor()
+        cursor.execute("SELECT foo, bar FROM some_table")
+
+        rows = cursor.fetchall()
+
+        # the default client was not used
+        mock_client.list_rows.assert_not_called()
+
+        # check the data returned
+        self.assertEqual(rows, [])
+
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_fetchall_w_bqstorage_client_fetch_error_no_fallback(self):
+        from google.cloud.bigquery import dbapi
+        from google.cloud.bigquery import table
+
+        row_data = [table.Row([1.1, 1.2], {"foo": 0, "bar": 1})]
+
+        mock_client = self._mock_client(rows=row_data)
+        mock_bqstorage_client = self._mock_bqstorage_client(
+            stream_count=1, rows=row_data,
+        )
+        no_access_error = exceptions.Forbidden("invalid credentials")
+        mock_bqstorage_client.create_read_session.side_effect = no_access_error
+
+        connection = dbapi.connect(
+            client=mock_client, bqstorage_client=mock_bqstorage_client,
+        )
+        cursor = connection.cursor()
+        cursor.execute("SELECT foo, bar FROM some_table")
+
+        with six.assertRaisesRegex(self, exceptions.Forbidden, "invalid credentials"):
+            cursor.fetchall()
+
+        # the default client was not used
+        mock_client.list_rows.assert_not_called()
+
+    @unittest.skipIf(
+        bigquery_storage_v1beta1 is None, "Requires `google-cloud-bigquery-storage`"
+    )
+    def test_fetchall_w_bqstorage_client_fetch_error_fallback_on_client(self):
+        from google.cloud.bigquery import dbapi
+        from google.cloud.bigquery import table
+
+        # use unordered data to also test any non-deterministic key order in dicts
+        row_data = [
+            table.Row([1.4, 1.1, 1.3, 1.2], {"bar": 3, "baz": 2, "foo": 1, "quux": 0}),
+            table.Row([2.4, 2.1, 2.3, 2.2], {"bar": 3, "baz": 2, "foo": 1, "quux": 0}),
+        ]
+        bqstorage_streamed_rows = [
+            {"bar": 1.2, "foo": 1.1, "quux": 1.4, "baz": 1.3},
+            {"bar": 2.2, "foo": 2.1, "quux": 2.4, "baz": 2.3},
+        ]
+
+        mock_client = self._mock_client(rows=row_data)
+        mock_bqstorage_client = self._mock_bqstorage_client(
+            stream_count=1, rows=bqstorage_streamed_rows,
+        )
+        request_error = exceptions.BadRequest("BQ storage what??")
+        mock_bqstorage_client.create_read_session.side_effect = request_error
+
+        connection = dbapi.connect(
+            client=mock_client, bqstorage_client=mock_bqstorage_client,
+        )
+        cursor = connection.cursor()
+        cursor.execute("SELECT foo, bar FROM some_table")
+
+        logger_patcher = mock.patch("google.cloud.bigquery.dbapi.cursor._LOGGER")
+        with logger_patcher as mock_logger:
+            rows = cursor.fetchall()
+
+        # both clients were used
+        mock_bqstorage_client.create_read_session.assert_called()
+        mock_client.list_rows.assert_called()
+
+        # the fallback to the default API should have been logged
+        relevant_calls = [
+            call
+            for call in mock_logger.debug.call_args_list
+            if call.args and "tabledata.list API" in call.args[0]
+        ]
+        self.assertTrue(relevant_calls)
+
+        # check the data returned
+        field_value = op.itemgetter(1)
+        sorted_row_data = [sorted(row.items(), key=field_value) for row in rows]
+        expected_row_data = [
+            [("foo", 1.1), ("bar", 1.2), ("baz", 1.3), ("quux", 1.4)],
+            [("foo", 2.1), ("bar", 2.2), ("baz", 2.3), ("quux", 2.4)],
+        ]
+
+        self.assertEqual(sorted_row_data, expected_row_data)
+
     def test_execute_custom_job_id(self):
         from google.cloud.bigquery.dbapi import connect