From 3314dfbed62488503dc41b11e403a672fcf71048 Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 24 Nov 2021 16:12:10 +0100 Subject: [PATCH] fix: apply timeout to all resumable upload requests (#1070) * fix: apply timeout to all resumable upload requests * Fix stub in test case * Improve timeout type and other type annotations * Annnotate return type of _do_resumable_upload() --- google/cloud/bigquery/client.py | 186 +++++++++++++++++++------------- tests/unit/test_client.py | 18 +++- 2 files changed, 128 insertions(+), 76 deletions(-) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index 3e641e195..a5f3d5419 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -31,9 +31,10 @@ import typing from typing import ( Any, - BinaryIO, Dict, + IO, Iterable, + Mapping, List, Optional, Sequence, @@ -112,10 +113,15 @@ pyarrow = _helpers.PYARROW_VERSIONS.try_import() TimeoutType = Union[float, None] +ResumableTimeoutType = Union[ + None, float, Tuple[float, float] +] # for resumable media methods if typing.TYPE_CHECKING: # pragma: NO COVER # os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition. PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]] + import pandas # type: ignore + import requests # required by api-core _DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB _MAX_MULTIPART_SIZE = 5 * 1024 * 1024 @@ -2348,7 +2354,7 @@ def load_table_from_uri( def load_table_from_file( self, - file_obj: BinaryIO, + file_obj: IO[bytes], destination: Union[Table, TableReference, TableListItem, str], rewind: bool = False, size: int = None, @@ -2358,7 +2364,7 @@ def load_table_from_file( location: str = None, project: str = None, job_config: LoadJobConfig = None, - timeout: TimeoutType = DEFAULT_TIMEOUT, + timeout: ResumableTimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of this table from a file-like object. @@ -2366,42 +2372,42 @@ def load_table_from_file( returns a :class:`~google.cloud.bigquery.job.LoadJob`. Args: - file_obj (file): A file handle opened in binary mode for reading. - destination (Union[ \ - google.cloud.bigquery.table.Table, \ - google.cloud.bigquery.table.TableReference, \ - google.cloud.bigquery.table.TableListItem, \ - str, \ - ]): + file_obj: + A file handle opened in binary mode for reading. + destination: Table into which data is to be loaded. If a string is passed in, this method attempts to create a table reference from a string using :func:`google.cloud.bigquery.table.TableReference.from_string`. Keyword Arguments: - rewind (Optional[bool]): + rewind: If True, seek to the beginning of the file handle before reading the file. - size (Optional[int]): + size: The number of bytes to read from the file handle. If size is ``None`` or large, resumable upload will be used. Otherwise, multipart upload will be used. - num_retries (Optional[int]): Number of upload retries. Defaults to 6. - job_id (Optional[str]): Name of the job. - job_id_prefix (Optional[str]): + num_retries: Number of upload retries. Defaults to 6. + job_id: Name of the job. + job_id_prefix: The user-provided prefix for a randomly generated job ID. This parameter will be ignored if a ``job_id`` is also given. - location (Optional[str]): + location: Location where to run the job. Must match the location of the destination table. - project (Optional[str]): + project: Project ID of the project of where to run the job. Defaults to the client's project. 
- job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + job_config: Extra configuration options for the job. - timeout (Optional[float]): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request + may be repeated several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -2453,7 +2459,7 @@ def load_table_from_file( def load_table_from_dataframe( self, - dataframe, + dataframe: "pandas.DataFrame", destination: Union[Table, TableReference, str], num_retries: int = _DEFAULT_NUM_RETRIES, job_id: str = None, @@ -2462,7 +2468,7 @@ def load_table_from_dataframe( project: str = None, job_config: LoadJobConfig = None, parquet_compression: str = "snappy", - timeout: TimeoutType = DEFAULT_TIMEOUT, + timeout: ResumableTimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a pandas DataFrame. @@ -2481,9 +2487,9 @@ def load_table_from_dataframe( https://github.com/googleapis/python-bigquery/issues/19 Args: - dataframe (pandas.DataFrame): + dataframe: A :class:`~pandas.DataFrame` containing the data to load. - destination (google.cloud.bigquery.table.TableReference): + destination: The destination table to use for loading the data. If it is an existing table, the schema of the :class:`~pandas.DataFrame` must match the schema of the destination table. If the table @@ -2495,19 +2501,19 @@ def load_table_from_dataframe( :func:`google.cloud.bigquery.table.TableReference.from_string`. Keyword Arguments: - num_retries (Optional[int]): Number of upload retries. - job_id (Optional[str]): Name of the job. - job_id_prefix (Optional[str]): + num_retries: Number of upload retries. + job_id: Name of the job. + job_id_prefix: The user-provided prefix for a randomly generated job ID. This parameter will be ignored if a ``job_id`` is also given. - location (Optional[str]): + location: Location where to run the job. Must match the location of the destination table. - project (Optional[str]): + project: Project ID of the project of where to run the job. Defaults to the client's project. - job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + job_config: Extra configuration options for the job. To override the default pandas data type conversions, supply @@ -2524,7 +2530,7 @@ def load_table_from_dataframe( :attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and :attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are supported. - parquet_compression (Optional[str]): + parquet_compression: [Beta] The compression method to use if intermittently serializing ``dataframe`` to a parquet file. @@ -2537,9 +2543,13 @@ def load_table_from_dataframe( passed as the ``compression`` argument to the underlying ``DataFrame.to_parquet()`` method. https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet - timeout (Optional[float]): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. 
Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -2717,7 +2727,7 @@ def load_table_from_json( location: str = None, project: str = None, job_config: LoadJobConfig = None, - timeout: TimeoutType = DEFAULT_TIMEOUT, + timeout: ResumableTimeoutType = DEFAULT_TIMEOUT, ) -> job.LoadJob: """Upload the contents of a table from a JSON string or dict. @@ -2741,36 +2751,35 @@ def load_table_from_json( client = bigquery.Client() client.load_table_from_file(data_as_file, ...) - destination (Union[ \ - google.cloud.bigquery.table.Table, \ - google.cloud.bigquery.table.TableReference, \ - google.cloud.bigquery.table.TableListItem, \ - str, \ - ]): + destination: Table into which data is to be loaded. If a string is passed in, this method attempts to create a table reference from a string using :func:`google.cloud.bigquery.table.TableReference.from_string`. Keyword Arguments: - num_retries (Optional[int]): Number of upload retries. - job_id (Optional[str]): Name of the job. - job_id_prefix (Optional[str]): + num_retries: Number of upload retries. + job_id: Name of the job. + job_id_prefix: The user-provided prefix for a randomly generated job ID. This parameter will be ignored if a ``job_id`` is also given. - location (Optional[str]): + location: Location where to run the job. Must match the location of the destination table. - project (Optional[str]): + project: Project ID of the project of where to run the job. Defaults to the client's project. - job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]): + job_config: Extra configuration options for the job. The ``source_format`` setting is always set to :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`. - timeout (Optional[float]): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: google.cloud.bigquery.job.LoadJob: A new load job. @@ -2819,60 +2828,77 @@ def load_table_from_json( ) def _do_resumable_upload( - self, stream, metadata, num_retries, timeout, project=None - ): + self, + stream: IO[bytes], + metadata: Mapping[str, str], + num_retries: int, + timeout: Optional[ResumableTimeoutType], + project: Optional[str] = None, + ) -> "requests.Response": """Perform a resumable upload. Args: - stream (IO[bytes]): A bytes IO object open for reading. + stream: A bytes IO object open for reading. - metadata (Dict): The metadata associated with the upload. + metadata: The metadata associated with the upload. - num_retries (int): + num_retries: Number of upload retries. (Deprecated: This argument will be removed in a future release.) - timeout (float): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. - project (Optional[str]): + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. + + project: Project ID of the project of where to run the upload. Defaults to the client's project. Returns: - requests.Response: - The "200 OK" response object returned after the final chunk - is uploaded. 
+ The "200 OK" response object returned after the final chunk + is uploaded. """ upload, transport = self._initiate_resumable_upload( stream, metadata, num_retries, timeout, project=project ) while not upload.finished: - response = upload.transmit_next_chunk(transport) + response = upload.transmit_next_chunk(transport, timeout=timeout) return response def _initiate_resumable_upload( - self, stream, metadata, num_retries, timeout, project=None + self, + stream: IO[bytes], + metadata: Mapping[str, str], + num_retries: int, + timeout: Optional[ResumableTimeoutType], + project: Optional[str] = None, ): """Initiate a resumable upload. Args: - stream (IO[bytes]): A bytes IO object open for reading. + stream: A bytes IO object open for reading. - metadata (Dict): The metadata associated with the upload. + metadata: The metadata associated with the upload. - num_retries (int): + num_retries: Number of upload retries. (Deprecated: This argument will be removed in a future release.) - timeout (float): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. - project (Optional[str]): + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. + + project: Project ID of the project of where to run the upload. Defaults to the client's project. @@ -2921,29 +2947,39 @@ def _initiate_resumable_upload( return upload, transport def _do_multipart_upload( - self, stream, metadata, size, num_retries, timeout, project=None + self, + stream: IO[bytes], + metadata: Mapping[str, str], + size: int, + num_retries: int, + timeout: Optional[ResumableTimeoutType], + project: Optional[str] = None, ): """Perform a multipart upload. Args: - stream (IO[bytes]): A bytes IO object open for reading. + stream: A bytes IO object open for reading. - metadata (Dict): The metadata associated with the upload. + metadata: The metadata associated with the upload. - size (int): + size: The number of bytes to be uploaded (which will be read from ``stream``). If not provided, the upload will be concluded once ``stream`` is exhausted (or :data:`None`). - num_retries (int): + num_retries: Number of upload retries. (Deprecated: This argument will be removed in a future release.) - timeout (float): + timeout: The number of seconds to wait for the underlying HTTP transport - before using ``retry``. + before using ``retry``. Depending on the retry strategy, a request may + be repeated several times using the same timeout each time. - project (Optional[str]): + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. + + project: Project ID of the project of where to run the upload. Defaults to the client's project. 
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index 97aa2eedb..9c93765e8 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -8235,6 +8235,22 @@ def test__do_resumable_upload_custom_project(self): assert initiation_url is not None assert "projects/custom-project" in initiation_url + def test__do_resumable_upload_custom_timeout(self): + file_obj = self._make_file_obj() + file_obj_len = len(file_obj.getvalue()) + transport = self._make_transport( + self._make_resumable_upload_responses(file_obj_len) + ) + client = self._make_client(transport) + + client._do_resumable_upload( + file_obj, self.EXPECTED_CONFIGURATION, num_retries=0, timeout=3.14 + ) + + # The timeout should be applied to all underlying calls. + for call_args in transport.request.call_args_list: + assert call_args.kwargs.get("timeout") == 3.14 + def test__do_multipart_upload(self): transport = self._make_transport([self._make_response(http.client.OK)]) client = self._make_client(transport) @@ -8442,7 +8458,7 @@ def test_upload_chunksize(client): upload.finished = False - def transmit_next_chunk(transport): + def transmit_next_chunk(transport, *args, **kwargs): upload.finished = True result = mock.MagicMock() result.json.return_value = {}
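Example usage (a minimal sketch under assumed names, not taken from the patch): with this
fix the ``timeout`` passed to ``load_table_from_file()`` is applied to every chunk request
of a resumable upload, not only the initial request, and it may be a single float or a
``(connect_timeout, read_timeout)`` tuple as the updated docstrings describe. The file
path, table ID, and timeout values below are illustrative assumptions.

    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.CSV)

    # "data.csv" and the destination table ID are hypothetical placeholders.
    with open("data.csv", "rb") as source_file:
        job = client.load_table_from_file(
            source_file,
            "my-project.my_dataset.my_table",
            job_config=job_config,
            # Either a float or a (connect_timeout, read_timeout) tuple; the same
            # value is reused for each transmit_next_chunk() call during the upload.
            timeout=(5.0, 60.0),
        )

    job.result()  # wait for the load job to complete

The tuple form is forwarded to ``requests.Session.request``, so it bounds connection setup
and each read separately rather than the total duration of the upload.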