Skip to content

Commit

Permalink
feat: add timeout paramter to load_table_from_file and it dependent m…
Browse files Browse the repository at this point in the history
…ethods (#327)
  • Loading branch information
HemangChothani committed Oct 19, 2020
1 parent 29dd573 commit b0dd892
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 24 deletions.
71 changes: 59 additions & 12 deletions google/cloud/bigquery/client.py
Expand Up @@ -1591,14 +1591,17 @@ def job_from_resource(self, resource):
return job.QueryJob.from_api_repr(resource, self)
return job.UnknownJob.from_api_repr(resource, self)

def create_job(self, job_config, retry=DEFAULT_RETRY):
def create_job(self, job_config, retry=DEFAULT_RETRY, timeout=None):
"""Create a new job.
Args:
job_config (dict): configuration job representation returned from the API.
Keyword Arguments:
retry (Optional[google.api_core.retry.Retry]):
How to retry the RPC.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
Returns:
Union[ \
Expand All @@ -1617,7 +1620,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
destination = _get_sub_prop(job_config, ["load", "destinationTable"])
source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
return self.load_table_from_uri(
source_uris, destination, job_config=load_job_config, retry=retry
source_uris,
destination,
job_config=load_job_config,
retry=retry,
timeout=timeout,
)
elif "copy" in job_config:
copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
Expand All @@ -1633,7 +1640,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
table_ref = TableReference.from_api_repr(source_config)
sources.append(table_ref)
return self.copy_table(
sources, destination, job_config=copy_job_config, retry=retry
sources,
destination,
job_config=copy_job_config,
retry=retry,
timeout=timeout,
)
elif "extract" in job_config:
extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
Expand All @@ -1650,6 +1661,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
destination_uris,
job_config=extract_job_config,
retry=retry,
timeout=timeout,
source_type=source_type,
)
elif "query" in job_config:
Expand All @@ -1659,7 +1671,9 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
copy_config
)
query = _get_sub_prop(copy_config, ["query", "query"])
return self.query(query, job_config=query_job_config, retry=retry)
return self.query(
query, job_config=query_job_config, retry=retry, timeout=timeout
)
else:
raise TypeError("Invalid job configuration received.")

Expand Down Expand Up @@ -1981,6 +1995,7 @@ def load_table_from_file(
location=None,
project=None,
job_config=None,
timeout=None,
):
"""Upload the contents of this table from a file-like object.
Expand Down Expand Up @@ -2020,6 +2035,9 @@ def load_table_from_file(
to the client's project.
job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
Extra configuration options for the job.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
Expand Down Expand Up @@ -2058,11 +2076,11 @@ def load_table_from_file(
try:
if size is None or size >= _MAX_MULTIPART_SIZE:
response = self._do_resumable_upload(
file_obj, job_resource, num_retries
file_obj, job_resource, num_retries, timeout
)
else:
response = self._do_multipart_upload(
file_obj, job_resource, size, num_retries
file_obj, job_resource, size, num_retries, timeout
)
except resumable_media.InvalidResponse as exc:
raise exceptions.from_http_response(exc.response)
Expand All @@ -2080,6 +2098,7 @@ def load_table_from_dataframe(
project=None,
job_config=None,
parquet_compression="snappy",
timeout=None,
):
"""Upload the contents of a table from a pandas DataFrame.
Expand Down Expand Up @@ -2143,6 +2162,9 @@ def load_table_from_dataframe(
passed as the ``compression`` argument to the underlying
``DataFrame.to_parquet()`` method.
https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
Expand Down Expand Up @@ -2249,6 +2271,7 @@ def load_table_from_dataframe(
location=location,
project=project,
job_config=job_config,
timeout=timeout,
)

finally:
Expand All @@ -2264,6 +2287,7 @@ def load_table_from_json(
location=None,
project=None,
job_config=None,
timeout=None,
):
"""Upload the contents of a table from a JSON string or dict.
Expand Down Expand Up @@ -2313,6 +2337,9 @@ def load_table_from_json(
Extra configuration options for the job. The ``source_format``
setting is always set to
:attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
timeout (Optional[float]):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
Returns:
google.cloud.bigquery.job.LoadJob: A new load job.
Expand Down Expand Up @@ -2357,9 +2384,10 @@ def load_table_from_json(
location=location,
project=project,
job_config=job_config,
timeout=timeout,
)

def _do_resumable_upload(self, stream, metadata, num_retries):
def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
"""Perform a resumable upload.
Args:
Expand All @@ -2371,21 +2399,25 @@ def _do_resumable_upload(self, stream, metadata, num_retries):
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
timeout (float):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
Returns:
requests.Response:
The "200 OK" response object returned after the final chunk
is uploaded.
"""
upload, transport = self._initiate_resumable_upload(
stream, metadata, num_retries
stream, metadata, num_retries, timeout
)

while not upload.finished:
response = upload.transmit_next_chunk(transport)

return response

def _initiate_resumable_upload(self, stream, metadata, num_retries):
def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
"""Initiate a resumable upload.
Args:
Expand All @@ -2397,6 +2429,10 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
timeout (float):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
Returns:
Tuple:
Pair of
Expand All @@ -2419,12 +2455,17 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
)

upload.initiate(
transport, stream, metadata, _GENERIC_CONTENT_TYPE, stream_final=False
transport,
stream,
metadata,
_GENERIC_CONTENT_TYPE,
stream_final=False,
timeout=timeout,
)

return upload, transport

def _do_multipart_upload(self, stream, metadata, size, num_retries):
def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
"""Perform a multipart upload.
Args:
Expand All @@ -2441,6 +2482,10 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
Number of upload retries. (Deprecated: This
argument will be removed in a future release.)
timeout (float):
The number of seconds to wait for the underlying HTTP transport
before using ``retry``.
Returns:
requests.Response:
The "200 OK" response object returned after the multipart
Expand All @@ -2466,7 +2511,9 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
max_retries=num_retries
)

response = upload.transmit(self._http, data, metadata, _GENERIC_CONTENT_TYPE)
response = upload.transmit(
self._http, data, metadata, _GENERIC_CONTENT_TYPE, timeout=timeout
)

return response

Expand Down

0 comments on commit b0dd892

Please sign in to comment.