fix: apply timeout to all resumable upload requests (#1070)
* fix: apply timeout to all resumable upload requests

* Fix stub in test case

* Improve timeout type and other type annotations

* Annotate return type of _do_resumable_upload()
plamut committed Nov 24, 2021
1 parent 21cd710 commit 3314dfb
Showing 2 changed files with 128 additions and 76 deletions.
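For context, a minimal caller-side sketch of the behaviour this commit addresses: the timeout passed to Client.load_table_from_file() is now applied to every request of a resumable upload, and it may be a single float or a (connect_timeout, read_timeout) tuple. The project, dataset, table, and file names below are placeholders, not part of the commit.

from google.cloud import bigquery

client = bigquery.Client()

# Placeholder destination table and source file, for illustration only.
table_id = "my-project.my_dataset.my_table"

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    autodetect=True,
)

with open("rows.jsonl", "rb") as source_file:
    # A single float caps each HTTP request; a (connect, read) tuple is also
    # accepted and forwarded to requests.Session.request() for every chunk.
    load_job = client.load_table_from_file(
        source_file,
        table_id,
        job_config=job_config,
        timeout=(5.0, 60.0),
    )

load_job.result()  # Block until the load job completes.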
186 changes: 111 additions & 75 deletions google/cloud/bigquery/client.py
@@ -31,9 +31,10 @@
import typing
from typing import (
Any,
BinaryIO,
Dict,
IO,
Iterable,
Mapping,
List,
Optional,
Sequence,
@@ -112,10 +113,15 @@
pyarrow = _helpers.PYARROW_VERSIONS.try_import()

TimeoutType = Union[float, None]
ResumableTimeoutType = Union[
None, float, Tuple[float, float]
] # for resumable media methods

if typing.TYPE_CHECKING: # pragma: NO COVER
# os.PathLike is only subscriptable in Python 3.9+, thus shielding with a condition.
PathType = Union[str, bytes, os.PathLike[str], os.PathLike[bytes]]
import pandas # type: ignore
import requests # required by api-core

_DEFAULT_CHUNKSIZE = 100 * 1024 * 1024 # 100 MB
_MAX_MULTIPART_SIZE = 5 * 1024 * 1024
@@ -2348,7 +2354,7 @@ def load_table_from_uri(

def load_table_from_file(
self,
file_obj: BinaryIO,
file_obj: IO[bytes],
destination: Union[Table, TableReference, TableListItem, str],
rewind: bool = False,
size: int = None,
@@ -2358,50 +2364,50 @@ def load_table_from_file(
location: str = None,
project: str = None,
job_config: LoadJobConfig = None,
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
) -> job.LoadJob:
"""Upload the contents of this table from a file-like object.
Similar to :meth:`load_table_from_uri`, this method creates, starts and
returns a :class:`~google.cloud.bigquery.job.LoadJob`.
Args:
file_obj (file): A file handle opened in binary mode for reading.
destination (Union[ \
google.cloud.bigquery.table.Table, \
google.cloud.bigquery.table.TableReference, \
google.cloud.bigquery.table.TableListItem, \
str, \
]):
file_obj:
A file handle opened in binary mode for reading.
destination:
Table into which data is to be loaded. If a string is passed
in, this method attempts to create a table reference from a
string using
:func:`google.cloud.bigquery.table.TableReference.from_string`.
Keyword Arguments:
rewind (Optional[bool]):
rewind:
If True, seek to the beginning of the file handle before
reading the file.
size (Optional[int]):
size:
The number of bytes to read from the file handle. If size is
``None`` or large, resumable upload will be used. Otherwise,
multipart upload will be used.
num_retries (Optional[int]): Number of upload retries. Defaults to 6.
job_id (Optional[str]): Name of the job.
job_id_prefix (Optional[str]):
num_retries: Number of upload retries. Defaults to 6.
job_id: Name of the job.
job_id_prefix:
The user-provided prefix for a randomly generated job ID.
This parameter will be ignored if a ``job_id`` is also given.
location (Optional[str]):
location:
Location where to run the job. Must match the location of the
destination table.
project (Optional[str]):
project:
Project ID of the project of where to run the job. Defaults
to the client's project.
job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
job_config:
Extra configuration options for the job.
timeout (Optional[float]):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request
may be repeated several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2453,7 +2459,7 @@ def load_table_from_file(

def load_table_from_dataframe(
self,
dataframe,
dataframe: "pandas.DataFrame",
destination: Union[Table, TableReference, str],
num_retries: int = _DEFAULT_NUM_RETRIES,
job_id: str = None,
@@ -2462,7 +2468,7 @@ def load_table_from_dataframe(
project: str = None,
job_config: LoadJobConfig = None,
parquet_compression: str = "snappy",
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
) -> job.LoadJob:
"""Upload the contents of a table from a pandas DataFrame.
@@ -2481,9 +2487,9 @@ def load_table_from_dataframe(
https://github.com/googleapis/python-bigquery/issues/19
Args:
dataframe (pandas.DataFrame):
dataframe:
A :class:`~pandas.DataFrame` containing the data to load.
destination (google.cloud.bigquery.table.TableReference):
destination:
The destination table to use for loading the data. If it is an
existing table, the schema of the :class:`~pandas.DataFrame`
must match the schema of the destination table. If the table
@@ -2495,19 +2501,19 @@ def load_table_from_dataframe(
:func:`google.cloud.bigquery.table.TableReference.from_string`.
Keyword Arguments:
num_retries (Optional[int]): Number of upload retries.
job_id (Optional[str]): Name of the job.
job_id_prefix (Optional[str]):
num_retries: Number of upload retries.
job_id: Name of the job.
job_id_prefix:
The user-provided prefix for a randomly generated
job ID. This parameter will be ignored if a ``job_id`` is
also given.
location (Optional[str]):
location:
Location where to run the job. Must match the location of the
destination table.
project (Optional[str]):
project:
Project ID of the project of where to run the job. Defaults
to the client's project.
job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
job_config:
Extra configuration options for the job.
To override the default pandas data type conversions, supply
@@ -2524,7 +2530,7 @@ def load_table_from_dataframe(
:attr:`~google.cloud.bigquery.job.SourceFormat.CSV` and
:attr:`~google.cloud.bigquery.job.SourceFormat.PARQUET` are
supported.
parquet_compression (Optional[str]):
parquet_compression:
[Beta] The compression method to use if intermittently
serializing ``dataframe`` to a parquet file.
@@ -2537,9 +2543,13 @@ def load_table_from_dataframe(
passed as the ``compression`` argument to the underlying
``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
timeout (Optional[float]):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2717,7 +2727,7 @@ def load_table_from_json(
location: str = None,
project: str = None,
job_config: LoadJobConfig = None,
timeout: TimeoutType = DEFAULT_TIMEOUT,
timeout: ResumableTimeoutType = DEFAULT_TIMEOUT,
) -> job.LoadJob:
"""Upload the contents of a table from a JSON string or dict.
@@ -2741,36 +2751,35 @@ def load_table_from_json(
client = bigquery.Client()
client.load_table_from_file(data_as_file, ...)
destination (Union[ \
google.cloud.bigquery.table.Table, \
google.cloud.bigquery.table.TableReference, \
google.cloud.bigquery.table.TableListItem, \
str, \
]):
destination:
Table into which data is to be loaded. If a string is passed
in, this method attempts to create a table reference from a
string using
:func:`google.cloud.bigquery.table.TableReference.from_string`.
Keyword Arguments:
num_retries (Optional[int]): Number of upload retries.
job_id (Optional[str]): Name of the job.
job_id_prefix (Optional[str]):
num_retries: Number of upload retries.
job_id: Name of the job.
job_id_prefix:
The user-provided prefix for a randomly generated job ID.
This parameter will be ignored if a ``job_id`` is also given.
location (Optional[str]):
location:
Location where to run the job. Must match the location of the
destination table.
project (Optional[str]):
project:
Project ID of the project of where to run the job. Defaults
to the client's project.
job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
job_config:
Extra configuration options for the job. The ``source_format``
setting is always set to
:attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
timeout (Optional[float]):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2819,60 +2828,77 @@ def load_table_from_json(
)

def _do_resumable_upload(
self, stream, metadata, num_retries, timeout, project=None
):
self,
stream: IO[bytes],
metadata: Mapping[str, str],
num_retries: int,
timeout: Optional[ResumableTimeoutType],
project: Optional[str] = None,
) -> "requests.Response":
"""Perform a resumable upload.
Args:
stream (IO[bytes]): A bytes IO object open for reading.
stream: A bytes IO object open for reading.
metadata (Dict): The metadata associated with the upload.
metadata: The metadata associated with the upload.
num_retries (int):
num_retries:
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
timeout (float):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.
project (Optional[str]):
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
project:
Project ID of the project of where to run the upload. Defaults
to the client's project.
Returns:
requests.Response:
The "200 OK" response object returned after the final chunk
is uploaded.
The "200 OK" response object returned after the final chunk
is uploaded.
"""
upload, transport = self._initiate_resumable_upload(
stream, metadata, num_retries, timeout, project=project
)

while not upload.finished:
response = upload.transmit_next_chunk(transport)
response = upload.transmit_next_chunk(transport, timeout=timeout)

return response

def _initiate_resumable_upload(
self, stream, metadata, num_retries, timeout, project=None
self,
stream: IO[bytes],
metadata: Mapping[str, str],
num_retries: int,
timeout: Optional[ResumableTimeoutType],
project: Optional[str] = None,
):
"""Initiate a resumable upload.
Args:
stream (IO[bytes]): A bytes IO object open for reading.
stream: A bytes IO object open for reading.
metadata (Dict): The metadata associated with the upload.
metadata: The metadata associated with the upload.
num_retries (int):
num_retries:
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
timeout (float):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.
project (Optional[str]):
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
project:
Project ID of the project of where to run the upload. Defaults
to the client's project.
@@ -2921,29 +2947,39 @@ def _initiate_resumable_upload(
return upload, transport

def _do_multipart_upload(
self, stream, metadata, size, num_retries, timeout, project=None
self,
stream: IO[bytes],
metadata: Mapping[str, str],
size: int,
num_retries: int,
timeout: Optional[ResumableTimeoutType],
project: Optional[str] = None,
):
"""Perform a multipart upload.
Args:
stream (IO[bytes]): A bytes IO object open for reading.
stream: A bytes IO object open for reading.
metadata (Dict): The metadata associated with the upload.
metadata: The metadata associated with the upload.
size (int):
size:
The number of bytes to be uploaded (which will be read
from ``stream``). If not provided, the upload will be
concluded once ``stream`` is exhausted (or :data:`None`).
num_retries (int):
num_retries:
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
timeout (float):
timeout:
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
before using ``retry``. Depending on the retry strategy, a request may
be repeated several times using the same timeout each time.
project (Optional[str]):
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
project:
Project ID of the project of where to run the upload. Defaults
to the client's project.
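The same timeout semantics apply to the other load helpers whose signatures this commit widens to ResumableTimeoutType, for example load_table_from_json(). A short hedged usage sketch follows; the table ID and rows are placeholders.

from google.cloud import bigquery

client = bigquery.Client()

# Placeholder table ID and rows, for illustration only.
table_id = "my-project.my_dataset.events"
rows = [
    {"name": "alice", "score": 12},
    {"name": "bob", "score": 7},
]

job = client.load_table_from_json(
    rows,
    table_id,
    # Payloads above the multipart size threshold take the resumable-upload
    # path, and with this fix the timeout below caps every chunk request
    # rather than only the initial one.
    timeout=300.0,
)
job.result()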
