Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add timeout paramter to load_table_from_file and it dependent methods #327

Merged
merged 3 commits into from Oct 19, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
71 changes: 59 additions & 12 deletions google/cloud/bigquery/client.py
Expand Up @@ -1591,14 +1591,17 @@ def job_from_resource(self, resource):
return job.QueryJob.from_api_repr(resource, self)
return job.UnknownJob.from_api_repr(resource, self)

def create_job(self, job_config, retry=DEFAULT_RETRY):
def create_job(self, job_config, retry=DEFAULT_RETRY, timeout=None):
"""Create a new job.
Args:
job_config (dict): configuration job representation returned from the API.

Keyword Arguments:
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.

Returns:
Union[ \
Expand All @@ -1617,7 +1620,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
destination = _get_sub_prop(job_config, ["load", "destinationTable"])
source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
return self.load_table_from_uri(
source_uris, destination, job_config=load_job_config, retry=retry
source_uris,
destination,
job_config=load_job_config,
retry=retry,
timeout=timeout,
)
elif "copy" in job_config:
copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
Expand All @@ -1633,7 +1640,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
table_ref = TableReference.from_api_repr(source_config)
sources.append(table_ref)
return self.copy_table(
sources, destination, job_config=copy_job_config, retry=retry
sources,
destination,
job_config=copy_job_config,
retry=retry,
timeout=timeout,
)
elif "extract" in job_config:
extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
Expand All @@ -1650,6 +1661,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
destination_uris,
job_config=extract_job_config,
retry=retry,
timeout=timeout,
source_type=source_type,
)
elif "query" in job_config:
Expand All @@ -1659,7 +1671,9 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
copy_config
)
query = _get_sub_prop(copy_config, ["query", "query"])
return self.query(query, job_config=query_job_config, retry=retry)
return self.query(
query, job_config=query_job_config, retry=retry, timeout=timeout
)
else:
raise TypeError("Invalid job configuration received.")

Expand Down Expand Up @@ -1981,6 +1995,7 @@ def load_table_from_file(
location=None,
project=None,
job_config=None,
timeout=None,
):
"""Upload the contents of this table from a file-like object.

Expand Down Expand Up @@ -2020,6 +2035,9 @@ def load_table_from_file(
to the client's project.
job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
Extra configuration options for the job.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.

Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
Expand Down Expand Up @@ -2058,11 +2076,11 @@ def load_table_from_file(
try:
if size is None or size >= _MAX_MULTIPART_SIZE:
response = self._do_resumable_upload(
file_obj, job_resource, num_retries
file_obj, job_resource, num_retries, timeout
)
else:
response = self._do_multipart_upload(
file_obj, job_resource, size, num_retries
file_obj, job_resource, size, num_retries, timeout
)
except resumable_media.InvalidResponse as exc:
raise exceptions.from_http_response(exc.response)
Expand All @@ -2080,6 +2098,7 @@ def load_table_from_dataframe(
project=None,
job_config=None,
parquet_compression="snappy",
timeout=None,
):
"""Upload the contents of a table from a pandas DataFrame.

Expand Down Expand Up @@ -2143,6 +2162,9 @@ def load_table_from_dataframe(
passed as the ``compression`` argument to the underlying
``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.

Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
Expand Down Expand Up @@ -2249,6 +2271,7 @@ def load_table_from_dataframe(
location=location,
project=project,
job_config=job_config,
timeout=timeout,
)

finally:
Expand All @@ -2264,6 +2287,7 @@ def load_table_from_json(
location=None,
project=None,
job_config=None,
timeout=None,
):
"""Upload the contents of a table from a JSON string or dict.

Expand Down Expand Up @@ -2313,6 +2337,9 @@ def load_table_from_json(
Extra configuration options for the job. The ``source_format``
setting is always set to
:attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.

Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
Expand Down Expand Up @@ -2357,9 +2384,10 @@ def load_table_from_json(
location=location,
project=project,
job_config=job_config,
timeout=timeout,
)

def _do_resumable_upload(self, stream, metadata, num_retries):
def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
"""Perform a resumable upload.

Args:
Expand All @@ -2371,21 +2399,25 @@ def _do_resumable_upload(self, stream, metadata, num_retries):
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)

timeout (float):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.

Returns:
requests.Response:
The "200 OK" response object returned after the final chunk
is uploaded.
"""
upload, transport = self._initiate_resumable_upload(
stream, metadata, num_retries
stream, metadata, num_retries, timeout
)

while not upload.finished:
response = upload.transmit_next_chunk(transport)

return response

def _initiate_resumable_upload(self, stream, metadata, num_retries):
def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
"""Initiate a resumable upload.

Args:
Expand All @@ -2397,6 +2429,10 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)

timeout (float):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.

Returns:
Tuple:
Pair of
Expand All @@ -2419,12 +2455,17 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
)

upload.initiate(
transport, stream, metadata, _GENERIC_CONTENT_TYPE, stream_final=False
transport,
stream,
metadata,
_GENERIC_CONTENT_TYPE,
stream_final=False,
timeout=timeout,
)

return upload, transport

def _do_multipart_upload(self, stream, metadata, size, num_retries):
def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
"""Perform a multipart upload.

Args:
Expand All @@ -2441,6 +2482,10 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)

timeout (float):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.

Returns:
requests.Response:
The "200 OK" response object returned after the multipart
Expand All @@ -2466,7 +2511,9 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
max_retries=num_retries
)

response = upload.transmit(self._http, data, metadata, _GENERIC_CONTENT_TYPE)
response = upload.transmit(
self._http, data, metadata, _GENERIC_CONTENT_TYPE, timeout=timeout
)

return response

Expand Down