diff --git a/google/resumable_media/_download.py b/google/resumable_media/_download.py index 5d2d10d1..d775c21e 100644 --- a/google/resumable_media/_download.py +++ b/google/resumable_media/_download.py @@ -171,7 +171,7 @@ def _process_response(self, response): response, _ACCEPTABLE_STATUS_CODES, self._get_status_code ) - def consume(self, transport): + def consume(self, transport, timeout=None): """Consume the resource to be downloaded. If a ``stream`` is attached to this download, then the downloaded @@ -180,6 +180,13 @@ def consume(self, transport): Args: transport (object): An object which can make authenticated requests. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Raises: NotImplementedError: Always, since virtual. @@ -398,12 +405,19 @@ def _process_response(self, response): # Write the response body to the stream. self._stream.write(response_body) - def consume_next_chunk(self, transport): + def consume_next_chunk(self, transport, timeout=None): """Consume the next chunk of the resource to be downloaded. Args: transport (object): An object which can make authenticated requests. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Raises: NotImplementedError: Always, since virtual. diff --git a/google/resumable_media/_upload.py b/google/resumable_media/_upload.py index dd5aaeda..de7c6972 100644 --- a/google/resumable_media/_upload.py +++ b/google/resumable_media/_upload.py @@ -198,7 +198,7 @@ def _prepare_request(self, data, content_type): self._headers[_CONTENT_TYPE_HEADER] = content_type return _POST, self.upload_url, data, self._headers - def transmit(self, transport, data, content_type): + def transmit(self, transport, data, content_type, timeout=None): """Transmit the resource to be uploaded. Args: @@ -207,6 +207,13 @@ def transmit(self, transport, data, content_type): data (bytes): The resource content to be uploaded. content_type (str): The content type of the resource, e.g. a JPEG image has content type ``image/jpeg``. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Raises: NotImplementedError: Always, since virtual. @@ -274,7 +281,7 @@ def _prepare_request(self, data, metadata, content_type): self._headers[_CONTENT_TYPE_HEADER] = multipart_content_type return _POST, self.upload_url, content, self._headers - def transmit(self, transport, data, metadata, content_type): + def transmit(self, transport, data, metadata, content_type, timeout=None): """Transmit the resource to be uploaded. Args: @@ -285,6 +292,13 @@ def transmit(self, transport, data, metadata, content_type): ACL list. content_type (str): The content type of the resource, e.g. a JPEG image has content type ``image/jpeg``. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Raises: NotImplementedError: Always, since virtual. @@ -468,6 +482,7 @@ def initiate( content_type, total_bytes=None, stream_final=True, + timeout=None, ): """Initiate a resumable upload. @@ -499,6 +514,13 @@ def initiate( "final" (i.e. no more bytes will be added to it). In this case we determine the upload size from the size of the stream. If ``total_bytes`` is passed, this argument will be ignored. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Raises: NotImplementedError: Always, since virtual. @@ -626,7 +648,7 @@ def _process_response(self, response, bytes_sent): ) self._bytes_uploaded = int(match.group(u"end_byte")) + 1 - def transmit_next_chunk(self, transport): + def transmit_next_chunk(self, transport, timeout=None): """Transmit the next chunk of the resource to be uploaded. If the current upload was initiated with ``stream_final=False``, @@ -637,6 +659,13 @@ def transmit_next_chunk(self, transport): Args: transport (object): An object which can make authenticated requests. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Raises: NotImplementedError: Always, since virtual. diff --git a/google/resumable_media/requests/download.py b/google/resumable_media/requests/download.py index 6ebd31ec..2933ff66 100644 --- a/google/resumable_media/requests/download.py +++ b/google/resumable_media/requests/download.py @@ -119,7 +119,11 @@ def _write_to_stream(self, response): ) raise common.DataCorruption(response, msg) - def consume(self, transport): + def consume( + self, + transport, + timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT), + ): """Consume the resource to be downloaded. If a ``stream`` is attached to this download, then the downloaded @@ -128,6 +132,13 @@ def consume(self, transport): Args: transport (~requests.Session): A ``requests`` object which can make authenticated requests. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: ~requests.Response: The HTTP response returned by ``transport``. @@ -144,6 +155,7 @@ def consume(self, transport): u"data": payload, u"headers": headers, u"retry_strategy": self._retry_strategy, + u"timeout": timeout, } if self._stream is not None: request_kwargs[u"stream"] = True @@ -231,7 +243,11 @@ def _write_to_stream(self, response): ) raise common.DataCorruption(response, msg) - def consume(self, transport): + def consume( + self, + transport, + timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT), + ): """Consume the resource to be downloaded. If a ``stream`` is attached to this download, then the downloaded @@ -240,6 +256,13 @@ def consume(self, transport): Args: transport (~requests.Session): A ``requests`` object which can make authenticated requests. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: ~requests.Response: The HTTP response returned by ``transport``. @@ -260,6 +283,7 @@ def consume(self, transport): headers=headers, retry_strategy=self._retry_strategy, stream=True, + timeout=timeout, ) self._process_response(result) @@ -298,12 +322,23 @@ class ChunkedDownload(_helpers.RequestsMixin, _download.ChunkedDownload): ValueError: If ``start`` is negative. """ - def consume_next_chunk(self, transport): + def consume_next_chunk( + self, + transport, + timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT), + ): """Consume the next chunk of the resource to be downloaded. Args: transport (~requests.Session): A ``requests`` object which can make authenticated requests. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: ~requests.Response: The HTTP response returned by ``transport``. @@ -320,6 +355,7 @@ def consume_next_chunk(self, transport): data=payload, headers=headers, retry_strategy=self._retry_strategy, + timeout=timeout, ) self._process_response(result) return result @@ -353,12 +389,23 @@ class RawChunkedDownload(_helpers.RawRequestsMixin, _download.ChunkedDownload): ValueError: If ``start`` is negative. """ - def consume_next_chunk(self, transport): + def consume_next_chunk( + self, + transport, + timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT), + ): """Consume the next chunk of the resource to be downloaded. Args: transport (~requests.Session): A ``requests`` object which can make authenticated requests. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: ~requests.Response: The HTTP response returned by ``transport``. @@ -376,6 +423,7 @@ def consume_next_chunk(self, transport): headers=headers, stream=True, retry_strategy=self._retry_strategy, + timeout=timeout, ) self._process_response(result) return result diff --git a/google/resumable_media/requests/upload.py b/google/resumable_media/requests/upload.py index f71ecf2a..c9d5ef43 100644 --- a/google/resumable_media/requests/upload.py +++ b/google/resumable_media/requests/upload.py @@ -38,7 +38,13 @@ class SimpleUpload(_helpers.RequestsMixin, _upload.SimpleUpload): upload_url (str): The URL where the content will be uploaded. """ - def transmit(self, transport, data, content_type): + def transmit( + self, + transport, + data, + content_type, + timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT), + ): """Transmit the resource to be uploaded. Args: @@ -47,6 +53,13 @@ def transmit(self, transport, data, content_type): data (bytes): The resource content to be uploaded. content_type (str): The content type of the resource, e.g. a JPEG image has content type ``image/jpeg``. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: ~requests.Response: The HTTP response returned by ``transport``. @@ -59,6 +72,7 @@ def transmit(self, transport, data, content_type): data=payload, headers=headers, retry_strategy=self._retry_strategy, + timeout=timeout, ) self._process_response(response) return response @@ -79,7 +93,14 @@ class MultipartUpload(_helpers.RequestsMixin, _upload.MultipartUpload): upload_url (str): The URL where the content will be uploaded. """ - def transmit(self, transport, data, metadata, content_type): + def transmit( + self, + transport, + data, + metadata, + content_type, + timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT), + ): """Transmit the resource to be uploaded. Args: @@ -90,6 +111,13 @@ def transmit(self, transport, data, metadata, content_type): ACL list. content_type (str): The content type of the resource, e.g. a JPEG image has content type ``image/jpeg``. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: ~requests.Response: The HTTP response returned by ``transport``. @@ -104,6 +132,7 @@ def transmit(self, transport, data, metadata, content_type): data=payload, headers=headers, retry_strategy=self._retry_strategy, + timeout=timeout, ) self._process_response(response) return response @@ -300,6 +329,7 @@ def initiate( content_type, total_bytes=None, stream_final=True, + timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT), ): """Initiate a resumable upload. @@ -331,6 +361,13 @@ def initiate( "final" (i.e. no more bytes will be added to it). In this case we determine the upload size from the size of the stream. If ``total_bytes`` is passed, this argument will be ignored. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: ~requests.Response: The HTTP response returned by ``transport``. @@ -349,11 +386,16 @@ def initiate( data=payload, headers=headers, retry_strategy=self._retry_strategy, + timeout=timeout, ) self._process_initiate_response(response) return response - def transmit_next_chunk(self, transport): + def transmit_next_chunk( + self, + transport, + timeout=(_helpers._DEFAULT_CONNECT_TIMEOUT, _helpers._DEFAULT_READ_TIMEOUT), + ): """Transmit the next chunk of the resource to be uploaded. If the current upload was initiated with ``stream_final=False``, @@ -407,6 +449,13 @@ def transmit_next_chunk(self, transport): Args: transport (~requests.Session): A ``requests`` object which can make authenticated requests. + timeout (Optional[Union[float, Tuple[float, float]]]): + The number of seconds to wait for the server response. + Depending on the retry strategy, a request may be repeated + several times using the same timeout each time. + + Can also be passed as a tuple (connect_timeout, read_timeout). + See :meth:`requests.Session.request` documentation for details. Returns: ~requests.Response: The HTTP response returned by ``transport``. @@ -423,6 +472,7 @@ def transmit_next_chunk(self, transport): data=payload, headers=headers, retry_strategy=self._retry_strategy, + timeout=timeout, ) self._process_response(response, len(payload)) return response diff --git a/tests/unit/requests/test_download.py b/tests/unit/requests/test_download.py index 971129b8..93449b55 100644 --- a/tests/unit/requests/test_download.py +++ b/tests/unit/requests/test_download.py @@ -108,7 +108,13 @@ def test__write_to_stream_with_hash_check_fail(self): ) def _consume_helper( - self, stream=None, end=65536, headers=None, chunks=(), response_headers=None + self, + stream=None, + end=65536, + headers=None, + chunks=(), + response_headers=None, + timeout=None, ): download = download_mod.Download( EXAMPLE_URL, stream=stream, end=end, headers=headers @@ -119,16 +125,24 @@ def _consume_helper( ) assert not download.finished - ret_val = download.consume(transport) + + if timeout is not None: + ret_val = download.consume(transport, timeout=timeout) + else: + ret_val = download.consume(transport) + assert ret_val is transport.request.return_value - called_kwargs = {u"data": None, u"headers": download._headers} + called_kwargs = { + u"data": None, + u"headers": download._headers, + u"timeout": EXPECTED_TIMEOUT if timeout is None else timeout, + } if chunks: assert stream is not None called_kwargs[u"stream"] = True - transport.request.assert_called_once_with( - u"GET", EXAMPLE_URL, timeout=EXPECTED_TIMEOUT, **called_kwargs - ) + + transport.request.assert_called_once_with(u"GET", EXAMPLE_URL, **called_kwargs) range_bytes = u"bytes={:d}-{:d}".format(0, end) assert download._headers[u"range"] == range_bytes @@ -139,6 +153,9 @@ def _consume_helper( def test_consume(self): self._consume_helper() + def test_consume_with_custom_timeout(self): + self._consume_helper(timeout=14.7) + def test_consume_with_stream(self): stream = io.BytesIO() chunks = (b"up down ", b"charlie ", b"brown") @@ -298,7 +315,13 @@ def test__write_to_stream_with_hash_check_fail(self): ) def _consume_helper( - self, stream=None, end=65536, headers=None, chunks=(), response_headers=None + self, + stream=None, + end=65536, + headers=None, + chunks=(), + response_headers=None, + timeout=None, ): download = download_mod.RawDownload( EXAMPLE_URL, stream=stream, end=end, headers=headers @@ -309,7 +332,12 @@ def _consume_helper( ) assert not download.finished - ret_val = download.consume(transport) + + if timeout is not None: + ret_val = download.consume(transport, timeout=timeout) + else: + ret_val = download.consume(transport) + assert ret_val is transport.request.return_value if chunks: @@ -320,7 +348,7 @@ def _consume_helper( data=None, headers=download._headers, stream=True, - timeout=EXPECTED_TIMEOUT, + timeout=EXPECTED_TIMEOUT if timeout is None else timeout, ) range_bytes = u"bytes={:d}-{:d}".format(0, end) @@ -332,6 +360,9 @@ def _consume_helper( def test_consume(self): self._consume_helper() + def test_consume_with_custom_timeout(self): + self._consume_helper(timeout=14.7) + def test_consume_with_stream(self): stream = io.BytesIO() chunks = (b"up down ", b"charlie ", b"brown") @@ -491,6 +522,26 @@ def test_consume_next_chunk(self): assert download.bytes_downloaded == chunk_size assert download.total_bytes == total_bytes + def test_consume_next_chunk_with_custom_timeout(self): + start = 1536 + stream = io.BytesIO() + data = b"Just one chunk." + chunk_size = len(data) + download = download_mod.ChunkedDownload( + EXAMPLE_URL, chunk_size, stream, start=start + ) + total_bytes = 16384 + transport = self._mock_transport(start, chunk_size, total_bytes, content=data) + + # Actually consume the chunk and check the output. + download.consume_next_chunk(transport, timeout=14.7) + + range_bytes = u"bytes={:d}-{:d}".format(start, start + chunk_size - 1) + download_headers = {u"range": range_bytes} + transport.request.assert_called_once_with( + u"GET", EXAMPLE_URL, data=None, headers=download_headers, timeout=14.7, + ) + class TestRawChunkedDownload(object): @staticmethod @@ -569,6 +620,36 @@ def test_consume_next_chunk(self): assert download.bytes_downloaded == chunk_size assert download.total_bytes == total_bytes + def test_consume_next_chunk_with_custom_timeout(self): + start = 1536 + stream = io.BytesIO() + data = b"Just one chunk." + chunk_size = len(data) + download = download_mod.RawChunkedDownload( + EXAMPLE_URL, chunk_size, stream, start=start + ) + total_bytes = 16384 + transport = self._mock_transport(start, chunk_size, total_bytes, content=data) + + # Actually consume the chunk and check the output. + download.consume_next_chunk(transport, timeout=14.7) + + range_bytes = u"bytes={:d}-{:d}".format(start, start + chunk_size - 1) + download_headers = {u"range": range_bytes} + transport.request.assert_called_once_with( + u"GET", + EXAMPLE_URL, + data=None, + headers=download_headers, + stream=True, + timeout=14.7, + ) + assert stream.getvalue() == data + # Go back and check the internal state after consuming the chunk. + assert not download.finished + assert download.bytes_downloaded == chunk_size + assert download.total_bytes == total_bytes + class Test__get_expected_md5(object): @mock.patch("google.resumable_media.requests.download._LOGGER") diff --git a/tests/unit/requests/test_upload.py b/tests/unit/requests/test_upload.py index e0628651..56a82461 100644 --- a/tests/unit/requests/test_upload.py +++ b/tests/unit/requests/test_upload.py @@ -62,6 +62,20 @@ def test_transmit(self): ) assert upload.finished + def test_transmit_w_custom_timeout(self): + data = b"I have got a lovely bunch of coconuts." + content_type = BASIC_CONTENT + upload = upload_mod.SimpleUpload(SIMPLE_URL) + transport = mock.Mock(spec=["request"]) + transport.request.return_value = _make_response() + + upload.transmit(transport, data, content_type, timeout=12.6) + + expected_headers = {u"content-type": content_type} + transport.request.assert_called_once_with( + u"POST", SIMPLE_URL, data=data, headers=expected_headers, timeout=12.6, + ) + class TestMultipartUpload(object): @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==4==") @@ -100,6 +114,44 @@ def test_transmit(self, mock_get_boundary): assert upload.finished mock_get_boundary.assert_called_once_with() + @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==4==") + def test_transmit_w_custom_timeout(self, mock_get_boundary): + data = b"Mock data here and there." + metadata = {u"Hey": u"You", u"Guys": u"90909"} + content_type = BASIC_CONTENT + upload = upload_mod.MultipartUpload(MULTIPART_URL) + transport = mock.Mock(spec=["request"]) + transport.request.return_value = _make_response() + + upload.transmit(transport, data, metadata, content_type, timeout=12.6) + + expected_payload = b"".join( + ( + b"--==4==\r\n", + JSON_TYPE_LINE, + b"\r\n", + json.dumps(metadata).encode(u"utf-8"), + b"\r\n", + b"--==4==\r\n", + b"content-type: text/plain\r\n", + b"\r\n", + b"Mock data here and there.\r\n", + b"--==4==--", + ) + ) + multipart_type = b'multipart/related; boundary="==4=="' + upload_headers = {u"content-type": multipart_type} + + transport.request.assert_called_once_with( + u"POST", + MULTIPART_URL, + data=expected_payload, + headers=upload_headers, + timeout=12.6, + ) + assert upload.finished + mock_get_boundary.assert_called_once_with() + class TestResumableUpload(object): def test_initiate(self): @@ -144,6 +196,37 @@ def test_initiate(self): timeout=EXPECTED_TIMEOUT, ) + def test_initiate_w_custom_timeout(self): + upload = upload_mod.ResumableUpload(RESUMABLE_URL, ONE_MB) + data = b"Knock knock who is there" + stream = io.BytesIO(data) + metadata = {u"name": u"got-jokes.txt"} + + transport = mock.Mock(spec=["request"]) + location = (u"http://test.invalid?upload_id=AACODBBBxuw9u3AA",) + response_headers = {u"location": location} + post_response = _make_response(headers=response_headers) + transport.request.return_value = post_response + + upload.initiate( + transport, stream, metadata, BASIC_CONTENT, total_bytes=100, timeout=12.6, + ) + + # Make sure timeout was passed to the transport + json_bytes = b'{"name": "got-jokes.txt"}' + expected_headers = { + u"content-type": JSON_TYPE, + u"x-upload-content-type": BASIC_CONTENT, + u"x-upload-content-length": u"{:d}".format(100), + } + transport.request.assert_called_once_with( + u"POST", + RESUMABLE_URL, + data=json_bytes, + headers=expected_headers, + timeout=12.6, + ) + @staticmethod def _upload_in_flight(data, headers=None): upload = upload_mod.ResumableUpload(RESUMABLE_URL, ONE_MB, headers=headers) @@ -196,6 +279,38 @@ def test_transmit_next_chunk(self): timeout=EXPECTED_TIMEOUT, ) + def test_transmit_next_chunk_w_custom_timeout(self): + data = b"This time the data is official." + upload = self._upload_in_flight(data) + + # Make a fake chunk size smaller than 256 KB. + chunk_size = 10 + upload._chunk_size = chunk_size + + # Make a fake 308 response. + response_headers = {u"range": u"bytes=0-{:d}".format(chunk_size - 1)} + transport = self._chunk_mock( + resumable_media.PERMANENT_REDIRECT, response_headers + ) + + # Make request and check the return value (against the mock). + upload.transmit_next_chunk(transport, timeout=12.6) + + # Make sure timeout was passed to the transport + payload = data[:chunk_size] + content_range = u"bytes 0-{:d}/{:d}".format(chunk_size - 1, len(data)) + expected_headers = { + u"content-range": content_range, + u"content-type": BASIC_CONTENT, + } + transport.request.assert_called_once_with( + u"PUT", + upload.resumable_url, + data=payload, + headers=expected_headers, + timeout=12.6, + ) + def test_recover(self): upload = upload_mod.ResumableUpload(RESUMABLE_URL, ONE_MB) upload._invalid = True # Make sure invalid.