Skip to content

Commit

Permalink
feat(resumable-media): add customizable timeouts to upload/download m…
Browse files Browse the repository at this point in the history
…ethods (#116)

* Add customizable timeout to simple upload

* Add customizable timeout to multipart upload

* Add customizable timeout to resumable upload

* Blacken system tests

* Add customizable timeouts to Download classes

Co-authored-by: Frank Natividad <frankyn@users.noreply.github.com>
  • Loading branch information
plamut and frankyn committed Jun 29, 2020
1 parent 063b4f9 commit 5310921
Show file tree
Hide file tree
Showing 6 changed files with 358 additions and 21 deletions.
18 changes: 16 additions & 2 deletions google/resumable_media/_download.py
Expand Up @@ -171,7 +171,7 @@ def _process_response(self, response):
response, _ACCEPTABLE_STATUS_CODES, self._get_status_code
)

def consume(self, transport):
def consume(self, transport, timeout=None):
"""Consume the resource to be downloaded.
If a ``stream`` is attached to this download, then the downloaded
Expand All @@ -180,6 +180,13 @@ def consume(self, transport):
Args:
transport (object): An object which can make authenticated
requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Raises:
NotImplementedError: Always, since virtual.
Expand Down Expand Up @@ -398,12 +405,19 @@ def _process_response(self, response):
# Write the response body to the stream.
self._stream.write(response_body)

def consume_next_chunk(self, transport):
def consume_next_chunk(self, transport, timeout=None):
"""Consume the next chunk of the resource to be downloaded.
Args:
transport (object): An object which can make authenticated
requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Raises:
NotImplementedError: Always, since virtual.
Expand Down
35 changes: 32 additions & 3 deletions google/resumable_media/_upload.py
Expand Up @@ -198,7 +198,7 @@ def _prepare_request(self, data, content_type):
self._headers[_CONTENT_TYPE_HEADER] = content_type
return _POST, self.upload_url, data, self._headers

def transmit(self, transport, data, content_type):
def transmit(self, transport, data, content_type, timeout=None):
"""Transmit the resource to be uploaded.
Args:
Expand All @@ -207,6 +207,13 @@ def transmit(self, transport, data, content_type):
data (bytes): The resource content to be uploaded.
content_type (str): The content type of the resource, e.g. a JPEG
image has content type ``image/jpeg``.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Raises:
NotImplementedError: Always, since virtual.
Expand Down Expand Up @@ -274,7 +281,7 @@ def _prepare_request(self, data, metadata, content_type):
self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type
return _POST, self.upload_url, content, self._headers

def transmit(self, transport, data, metadata, content_type):
def transmit(self, transport, data, metadata, content_type, timeout=None):
"""Transmit the resource to be uploaded.
Args:
Expand All @@ -285,6 +292,13 @@ def transmit(self, transport, data, metadata, content_type):
ACL list.
content_type (str): The content type of the resource, e.g. a JPEG
image has content type ``image/jpeg``.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Raises:
NotImplementedError: Always, since virtual.
Expand Down Expand Up @@ -468,6 +482,7 @@ def initiate(
content_type,
total_bytes=None,
stream_final=True,
timeout=None,
):
"""Initiate a resumable upload.
Expand Down Expand Up @@ -499,6 +514,13 @@ def initiate(
"final" (i.e. no more bytes will be added to it). In this case
we determine the upload size from the size of the stream. If
``total_bytes`` is passed, this argument will be ignored.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Raises:
NotImplementedError: Always, since virtual.
Expand Down Expand Up @@ -626,7 +648,7 @@ def _process_response(self, response, bytes_sent):
)
self._bytes_uploaded = int(match.group(u"end_byte")) + 1

def transmit_next_chunk(self, transport):
def transmit_next_chunk(self, transport, timeout=None):
"""Transmit the next chunk of the resource to be uploaded.
If the current upload was initiated with ``stream_final=False``,
Expand All @@ -637,6 +659,13 @@ def transmit_next_chunk(self, transport):
Args:
transport (object): An object which can make authenticated
requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Raises:
NotImplementedError: Always, since virtual.
Expand Down
56 changes: 52 additions & 4 deletions google/resumable_media/requests/download.py
Expand Up @@ -119,7 +119,11 @@ def _write_to_stream(self, response):
)
raise common.DataCorruption(response, msg)

def consume(self, transport):
def consume(
self,
transport,
timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT),
):
"""Consume the resource to be downloaded.
If a ``stream`` is attached to this download, then the downloaded
Expand All @@ -128,6 +132,13 @@ def consume(self, transport):
Args:
transport (~requests.Session): A ``requests`` object which can
make authenticated requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Returns:
~requests.Response: The HTTP response returned by ``transport``.
Expand All @@ -144,6 +155,7 @@ def consume(self, transport):
u"data": payload,
u"headers": headers,
u"retry_strategy": self._retry_strategy,
u"timeout": timeout,
}
if self._stream is not None:
request_kwargs[u"stream"] = True
Expand Down Expand Up @@ -231,7 +243,11 @@ def _write_to_stream(self, response):
)
raise common.DataCorruption(response, msg)

def consume(self, transport):
def consume(
self,
transport,
timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT),
):
"""Consume the resource to be downloaded.
If a ``stream`` is attached to this download, then the downloaded
Expand All @@ -240,6 +256,13 @@ def consume(self, transport):
Args:
transport (~requests.Session): A ``requests`` object which can
make authenticated requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Returns:
~requests.Response: The HTTP response returned by ``transport``.
Expand All @@ -260,6 +283,7 @@ def consume(self, transport):
headers=headers,
retry_strategy=self._retry_strategy,
stream=True,
timeout=timeout,
)

self._process_response(result)
Expand Down Expand Up @@ -298,12 +322,23 @@ class ChunkedDownload(_helpers.RequestsMixin, _download.ChunkedDownload):
ValueError: If ``start`` is negative.
"""

def consume_next_chunk(self, transport):
def consume_next_chunk(
self,
transport,
timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT),
):
"""Consume the next chunk of the resource to be downloaded.
Args:
transport (~requests.Session): A ``requests`` object which can
make authenticated requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Returns:
~requests.Response: The HTTP response returned by ``transport``.
Expand All @@ -320,6 +355,7 @@ def consume_next_chunk(self, transport):
data=payload,
headers=headers,
retry_strategy=self._retry_strategy,
timeout=timeout,
)
self._process_response(result)
return result
Expand Down Expand Up @@ -353,12 +389,23 @@ class RawChunkedDownload(_helpers.RawRequestsMixin, _download.ChunkedDownload):
ValueError: If ``start`` is negative.
"""

def consume_next_chunk(self, transport):
def consume_next_chunk(
self,
transport,
timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT),
):
"""Consume the next chunk of the resource to be downloaded.
Args:
transport (~requests.Session): A ``requests`` object which can
make authenticated requests.
timeout (Optional[Union[float, Tuple[float, float]]]):
The number of seconds to wait for the server response.
Depending on the retry strategy, a request may be repeated
several times using the same timeout each time.
Can also be passed as a tuple (connect_timeout, read_timeout).
See :meth:`requests.Session.request` documentation for details.
Returns:
~requests.Response: The HTTP response returned by ``transport``.
Expand All @@ -376,6 +423,7 @@ def consume_next_chunk(self, transport):
headers=headers,
stream=True,
retry_strategy=self._retry_strategy,
timeout=timeout,
)
self._process_response(result)
return result
Expand Down

0 comments on commit 5310921

Please sign in to comment.