From a17be5f01043f32d9fbfb2ddf456031ea9205c8f Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Mon, 9 Mar 2020 18:49:49 +0000 Subject: [PATCH] fix: distinguish server timeouts from transport timeouts (#43) * fix: distinguish transport and query timeouts A transport layer timeout is made independent of the query timeout, i.e. the maximum time to wait for the query to complete. The query timeout is used by the blocking poll so that the backend does not block for too long when polling for job completion, but the transport can have different timeout requirements, and we do not want it to be raising sometimes unnecessary timeout errors. * Apply timeout to each of the underlying requests As job methods do not split the timeout anymore between all requests a method might make, the Client methods are adjusted in the same way. --- google/cloud/bigquery/client.py | 37 +++++--------- google/cloud/bigquery/job.py | 62 +++++++---------------- tests/unit/test_client.py | 78 ----------------------------- tests/unit/test_job.py | 88 --------------------------------- 4 files changed, 32 insertions(+), 233 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 188fb19cb..343217ae6 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -22,7 +22,6 @@ except ImportError: # Python 2.7 import collections as collections_abc -import concurrent.futures import copy import functools import gzip @@ -48,7 +47,6 @@ import google.api_core.client_options import google.api_core.exceptions from google.api_core import page_iterator -from google.auth.transport.requests import TimeoutGuard import google.cloud._helpers from google.cloud import exceptions from google.cloud.client import ClientWithProject @@ -2598,27 +2596,22 @@ def list_partitions(self, table, retry=DEFAULT_RETRY, timeout=None): timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. - If multiple requests are made under the hood, ``timeout`` is - interpreted as the approximate total time of **all** requests. + If multiple requests are made under the hood, ``timeout`` + applies to each individual request. Returns: List[str]: A list of the partition ids present in the partitioned table """ table = _table_arg_to_table_ref(table, default_project=self.project) - - with TimeoutGuard( - timeout, timeout_error_type=concurrent.futures.TimeoutError - ) as guard: - meta_table = self.get_table( - TableReference( - DatasetReference(table.project, table.dataset_id), - "%s$__PARTITIONS_SUMMARY__" % table.table_id, - ), - retry=retry, - timeout=timeout, - ) - timeout = guard.remaining_timeout + meta_table = self.get_table( + TableReference( + DatasetReference(table.project, table.dataset_id), + "%s$__PARTITIONS_SUMMARY__" % table.table_id, + ), + retry=retry, + timeout=timeout, + ) subset = [col for col in meta_table.schema if col.name == "partition_id"] return [ @@ -2685,8 +2678,8 @@ def list_rows( timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. - If multiple requests are made under the hood, ``timeout`` is - interpreted as the approximate total time of **all** requests. + If multiple requests are made under the hood, ``timeout`` + applies to each individual request. Returns: google.cloud.bigquery.table.RowIterator: @@ -2711,11 +2704,7 @@ def list_rows( # No schema, but no selected_fields. Assume the developer wants all # columns, so get the table resource for them rather than failing. elif len(schema) == 0: - with TimeoutGuard( - timeout, timeout_error_type=concurrent.futures.TimeoutError - ) as guard: - table = self.get_table(table.reference, retry=retry, timeout=timeout) - timeout = guard.remaining_timeout + table = self.get_table(table.reference, retry=retry, timeout=timeout) schema = table.schema params = {} diff --git a/google/cloud/bigquery/job.py b/google/cloud/bigquery/job.py index 5861febe8..ab2eaede5 100644 --- a/google/cloud/bigquery/job.py +++ b/google/cloud/bigquery/job.py @@ -26,7 +26,6 @@ from six.moves import http_client import google.api_core.future.polling -from google.auth.transport.requests import TimeoutGuard from google.cloud import exceptions from google.cloud.exceptions import NotFound from google.cloud.bigquery.dataset import Dataset @@ -55,7 +54,6 @@ _DONE_STATE = "DONE" _STOPPED_REASON = "stopped" _TIMEOUT_BUFFER_SECS = 0.1 -_SERVER_TIMEOUT_MARGIN_SECS = 1.0 _CONTAINS_ORDER_BY = re.compile(r"ORDER\s+BY", re.IGNORECASE) _ERROR_REASON_TO_EXCEPTION = { @@ -796,8 +794,8 @@ def result(self, retry=DEFAULT_RETRY, timeout=None): timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. - If multiple requests are made under the hood, ``timeout`` is - interpreted as the approximate total time of **all** requests. + If multiple requests are made under the hood, ``timeout`` + applies to each individual request. Returns: _AsyncJob: This instance. @@ -809,11 +807,7 @@ def result(self, retry=DEFAULT_RETRY, timeout=None): if the job did not complete in the given timeout. """ if self.state is None: - with TimeoutGuard( - timeout, timeout_error_type=concurrent.futures.TimeoutError - ) as guard: - self._begin(retry=retry, timeout=timeout) - timeout = guard.remaining_timeout + self._begin(retry=retry, timeout=timeout) # TODO: modify PollingFuture so it can pass a retry argument to done(). return super(_AsyncJob, self).result(timeout=timeout) @@ -2602,6 +2596,7 @@ def __init__(self, job_id, query, client, job_config=None): self._configuration = job_config self._query_results = None self._done_timeout = None + self._transport_timeout = None @property def allow_large_results(self): @@ -3059,19 +3054,9 @@ def done(self, retry=DEFAULT_RETRY, timeout=None): self._done_timeout = max(0, self._done_timeout) timeout_ms = int(api_timeout * 1000) - # If the server-side processing timeout (timeout_ms) is specified and - # would be picked as the total request timeout, we want to add a small - # margin to it - we don't want to timeout the connection just as the - # server-side processing might have completed, but instead slightly - # after the server-side deadline. - # However, if `timeout` is specified, and is shorter than the adjusted - # server timeout, the former prevails. - if timeout_ms is not None and timeout_ms > 0: - server_timeout_with_margin = timeout_ms / 1000 + _SERVER_TIMEOUT_MARGIN_SECS - if timeout is not None: - timeout = min(server_timeout_with_margin, timeout) - else: - timeout = server_timeout_with_margin + # If an explicit timeout is not given, fall back to the transport timeout + # stored in _blocking_poll() in the process of polling for job completion. + transport_timeout = timeout if timeout is not None else self._transport_timeout # Do not refresh if the state is already done, as the job will not # change once complete. @@ -3082,19 +3067,20 @@ def done(self, retry=DEFAULT_RETRY, timeout=None): project=self.project, timeout_ms=timeout_ms, location=self.location, - timeout=timeout, + timeout=transport_timeout, ) # Only reload the job once we know the query is complete. # This will ensure that fields such as the destination table are # correctly populated. if self._query_results.complete: - self.reload(retry=retry, timeout=timeout) + self.reload(retry=retry, timeout=transport_timeout) return self.state == _DONE_STATE def _blocking_poll(self, timeout=None): self._done_timeout = timeout + self._transport_timeout = timeout super(QueryJob, self)._blocking_poll(timeout=timeout) @staticmethod @@ -3170,8 +3156,8 @@ def result( timeout (Optional[float]): The number of seconds to wait for the underlying HTTP transport before using ``retry``. - If multiple requests are made under the hood, ``timeout`` is - interpreted as the approximate total time of **all** requests. + If multiple requests are made under the hood, ``timeout`` + applies to each individual request. Returns: google.cloud.bigquery.table.RowIterator: @@ -3189,27 +3175,17 @@ def result( If the job did not complete in the given timeout. """ try: - guard = TimeoutGuard( - timeout, timeout_error_type=concurrent.futures.TimeoutError - ) - with guard: - super(QueryJob, self).result(retry=retry, timeout=timeout) - timeout = guard.remaining_timeout + super(QueryJob, self).result(retry=retry, timeout=timeout) # Return an iterator instead of returning the job. if not self._query_results: - guard = TimeoutGuard( - timeout, timeout_error_type=concurrent.futures.TimeoutError + self._query_results = self._client._get_query_results( + self.job_id, + retry, + project=self.project, + location=self.location, + timeout=timeout, ) - with guard: - self._query_results = self._client._get_query_results( - self.job_id, - retry, - project=self.project, - location=self.location, - timeout=timeout, - ) - timeout = guard.remaining_timeout except exceptions.GoogleCloudError as exc: exc.message += self._format_for_exception(self.query, self.job_id) exc.query_job = self diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index a82445876..b8dfbbad1 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -24,7 +24,6 @@ import unittest import warnings -import freezegun import mock import requests import six @@ -5496,43 +5495,6 @@ def test_list_partitions_with_string_id(self): self.assertEqual(len(partition_list), 0) - def test_list_partitions_splitting_timout_between_requests(self): - from google.cloud.bigquery.table import Table - - row_count = 2 - meta_info = _make_list_partitons_meta_info( - self.PROJECT, self.DS_ID, self.TABLE_ID, row_count - ) - - data = { - "totalRows": str(row_count), - "rows": [{"f": [{"v": "20180101"}]}, {"f": [{"v": "20180102"}]}], - } - creds = _make_credentials() - http = object() - client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) - client._connection = make_connection(meta_info, data) - table = Table(self.TABLE_REF) - - with freezegun.freeze_time("2019-01-01 00:00:00", tick=False) as frozen_time: - - def delayed_get_table(*args, **kwargs): - frozen_time.tick(delta=1.4) - return orig_get_table(*args, **kwargs) - - orig_get_table = client.get_table - client.get_table = mock.Mock(side_effect=delayed_get_table) - - client.list_partitions(table, timeout=5.0) - - client.get_table.assert_called_once() - _, kwargs = client.get_table.call_args - self.assertEqual(kwargs.get("timeout"), 5.0) - - client._connection.api_request.assert_called() - _, kwargs = client._connection.api_request.call_args - self.assertAlmostEqual(kwargs.get("timeout"), 3.6, places=5) - def test_list_rows(self): import datetime from google.cloud._helpers import UTC @@ -5918,46 +5880,6 @@ def test_list_rows_with_missing_schema(self): self.assertEqual(rows[1].age, 31, msg=repr(table)) self.assertIsNone(rows[2].age, msg=repr(table)) - def test_list_rows_splitting_timout_between_requests(self): - from google.cloud.bigquery.schema import SchemaField - from google.cloud.bigquery.table import Table - - response = {"totalRows": "0", "rows": []} - creds = _make_credentials() - http = object() - client = self._make_one(project=self.PROJECT, credentials=creds, _http=http) - client._connection = make_connection(response, response) - - table = Table( - self.TABLE_REF, schema=[SchemaField("field_x", "INTEGER", mode="NULLABLE")] - ) - - with freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time: - - def delayed_get_table(*args, **kwargs): - frozen_time.tick(delta=1.4) - return table - - client.get_table = mock.Mock(side_effect=delayed_get_table) - - rows_iter = client.list_rows( - "{}.{}.{}".format( - self.TABLE_REF.project, - self.TABLE_REF.dataset_id, - self.TABLE_REF.table_id, - ), - timeout=5.0, - ) - six.next(rows_iter.pages) - - client.get_table.assert_called_once() - _, kwargs = client.get_table.call_args - self.assertEqual(kwargs.get("timeout"), 5.0) - - client._connection.api_request.assert_called_once() - _, kwargs = client._connection.api_request.call_args - self.assertAlmostEqual(kwargs.get("timeout"), 3.6) - def test_list_rows_error(self): creds = _make_credentials() http = object() diff --git a/tests/unit/test_job.py b/tests/unit/test_job.py index 6b0d4b8fb..3e642142d 100644 --- a/tests/unit/test_job.py +++ b/tests/unit/test_job.py @@ -994,24 +994,6 @@ def test_result_explicit_w_state(self, result): begin.assert_not_called() result.assert_called_once_with(timeout=timeout) - @mock.patch("google.api_core.future.polling.PollingFuture.result") - def test_result_splitting_timout_between_requests(self, result): - client = _make_client(project=self.PROJECT) - job = self._make_one(self.JOB_ID, client) - begin = job._begin = mock.Mock() - retry = mock.Mock() - - with freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time: - - def delayed_begin(*args, **kwargs): - frozen_time.tick(delta=0.3) - - begin.side_effect = delayed_begin - job.result(retry=retry, timeout=1.0) - - begin.assert_called_once_with(retry=retry, timeout=1.0) - result.assert_called_once_with(timeout=0.7) - def test_cancelled_wo_error_result(self): client = _make_client(project=self.PROJECT) job = self._make_one(self.JOB_ID, client) @@ -4011,33 +3993,6 @@ def test_done_w_timeout(self): call_args = fake_reload.call_args self.assertEqual(call_args.kwargs.get("timeout"), 42) - def test_done_w_timeout_and_shorter_internal_api_timeout(self): - from google.cloud.bigquery.job import _TIMEOUT_BUFFER_SECS - from google.cloud.bigquery.job import _SERVER_TIMEOUT_MARGIN_SECS - - client = _make_client(project=self.PROJECT) - resource = self._make_resource(ended=False) - job = self._get_target_class().from_api_repr(resource, client) - job._done_timeout = 8.8 - - with mock.patch.object( - client, "_get_query_results" - ) as fake_get_results, mock.patch.object(job, "reload") as fake_reload: - job.done(timeout=42) - - # The expected timeout used is the job's own done_timeout minus a - # fixed amount (bigquery.job._TIMEOUT_BUFFER_SECS) increased by the - # safety margin on top of server-side processing timeout - that's - # because that final number is smaller than the given timeout (42 seconds). - expected_timeout = 8.8 - _TIMEOUT_BUFFER_SECS + _SERVER_TIMEOUT_MARGIN_SECS - - fake_get_results.assert_called_once() - call_args = fake_get_results.call_args - self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) - - call_args = fake_reload.call_args - self.assertAlmostEqual(call_args.kwargs.get("timeout"), expected_timeout) - def test_done_w_timeout_and_longer_internal_api_timeout(self): client = _make_client(project=self.PROJECT) resource = self._make_resource(ended=False) @@ -4623,49 +4578,6 @@ def test_result_w_timeout(self): self.assertEqual(query_request[1]["query_params"]["timeoutMs"], 900) self.assertEqual(reload_request[1]["method"], "GET") - @mock.patch("google.api_core.future.polling.PollingFuture.result") - def test_result_splitting_timout_between_requests(self, polling_result): - begun_resource = self._make_resource() - query_resource = { - "jobComplete": True, - "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, - "schema": {"fields": [{"name": "col1", "type": "STRING"}]}, - "totalRows": "5", - } - done_resource = copy.deepcopy(begun_resource) - done_resource["status"] = {"state": "DONE"} - - connection = _make_connection(begun_resource, query_resource, done_resource) - client = _make_client(project=self.PROJECT, connection=connection) - job = self._make_one(self.JOB_ID, self.QUERY, client) - - client.list_rows = mock.Mock() - - with freezegun.freeze_time("1970-01-01 00:00:00", tick=False) as frozen_time: - - def delayed_result(*args, **kwargs): - frozen_time.tick(delta=0.8) - - polling_result.side_effect = delayed_result - - def delayed_get_results(*args, **kwargs): - frozen_time.tick(delta=0.5) - return orig_get_results(*args, **kwargs) - - orig_get_results = client._get_query_results - client._get_query_results = mock.Mock(side_effect=delayed_get_results) - job.result(timeout=2.0) - - polling_result.assert_called_once_with(timeout=2.0) - - client._get_query_results.assert_called_once() - _, kwargs = client._get_query_results.call_args - self.assertAlmostEqual(kwargs.get("timeout"), 1.2) - - client.list_rows.assert_called_once() - _, kwargs = client.list_rows.call_args - self.assertAlmostEqual(kwargs.get("timeout"), 0.7) - def test_result_w_page_size(self): # Arrange query_results_resource = {