From b0dd892176e31ac25fddd15554b5bfa054299d4d Mon Sep 17 00:00:00 2001
From: HemangChothani <50404902+HemangChothani@users.noreply.github.com>
Date: Mon, 19 Oct 2020 09:45:12 -0400
Subject: [PATCH] feat: add timeout parameter to load_table_from_file and its dependent methods (#327)

---
 google/cloud/bigquery/client.py | 71 +++++++++++++++++++++++++++------
 tests/unit/test_client.py       | 45 +++++++++++++++------
 2 files changed, 92 insertions(+), 24 deletions(-)

diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py
index b7e082daa..cce393d6c 100644
--- a/google/cloud/bigquery/client.py
+++ b/google/cloud/bigquery/client.py
@@ -1591,7 +1591,7 @@ def job_from_resource(self, resource):
             return job.QueryJob.from_api_repr(resource, self)
         return job.UnknownJob.from_api_repr(resource, self)
 
-    def create_job(self, job_config, retry=DEFAULT_RETRY):
+    def create_job(self, job_config, retry=DEFAULT_RETRY, timeout=None):
         """Create a new job.
         Args:
             job_config (dict): configuration job representation returned from the API.
@@ -1599,6 +1599,9 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
         Keyword Arguments:
             retry (Optional[google.api_core.retry.Retry]):
                 How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             Union[ \
@@ -1617,7 +1620,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
             destination = _get_sub_prop(job_config, ["load", "destinationTable"])
             source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
             return self.load_table_from_uri(
-                source_uris, destination, job_config=load_job_config, retry=retry
+                source_uris,
+                destination,
+                job_config=load_job_config,
+                retry=retry,
+                timeout=timeout,
             )
         elif "copy" in job_config:
             copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
@@ -1633,7 +1640,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
                 table_ref = TableReference.from_api_repr(source_config)
                 sources.append(table_ref)
             return self.copy_table(
-                sources, destination, job_config=copy_job_config, retry=retry
+                sources,
+                destination,
+                job_config=copy_job_config,
+                retry=retry,
+                timeout=timeout,
             )
         elif "extract" in job_config:
             extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
@@ -1650,6 +1661,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
                 destination_uris,
                 job_config=extract_job_config,
                 retry=retry,
+                timeout=timeout,
                 source_type=source_type,
             )
         elif "query" in job_config:
@@ -1659,7 +1671,9 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
                 copy_config
             )
             query = _get_sub_prop(copy_config, ["query", "query"])
-            return self.query(query, job_config=query_job_config, retry=retry)
+            return self.query(
+                query, job_config=query_job_config, retry=retry, timeout=timeout
+            )
         else:
             raise TypeError("Invalid job configuration received.")
 
@@ -1981,6 +1995,7 @@ def load_table_from_file(
         location=None,
         project=None,
         job_config=None,
+        timeout=None,
     ):
         """Upload the contents of this table from a file-like object.
 
@@ -2020,6 +2035,9 @@ def load_table_from_file(
                 to the client's project.
             job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
                 Extra configuration options for the job.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2058,11 +2076,11 @@ def load_table_from_file(
         try:
             if size is None or size >= _MAX_MULTIPART_SIZE:
                 response = self._do_resumable_upload(
-                    file_obj, job_resource, num_retries
+                    file_obj, job_resource, num_retries, timeout
                 )
             else:
                 response = self._do_multipart_upload(
-                    file_obj, job_resource, size, num_retries
+                    file_obj, job_resource, size, num_retries, timeout
                 )
         except resumable_media.InvalidResponse as exc:
             raise exceptions.from_http_response(exc.response)
@@ -2080,6 +2098,7 @@ def load_table_from_dataframe(
         project=None,
         job_config=None,
         parquet_compression="snappy",
+        timeout=None,
     ):
         """Upload the contents of a table from a pandas DataFrame.
 
@@ -2143,6 +2162,9 @@ def load_table_from_dataframe(
                 passed as the ``compression`` argument to the underlying
                 ``DataFrame.to_parquet()`` method.
                 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2249,6 +2271,7 @@ def load_table_from_dataframe(
                 location=location,
                 project=project,
                 job_config=job_config,
+                timeout=timeout,
             )
 
         finally:
@@ -2264,6 +2287,7 @@ def load_table_from_json(
         location=None,
         project=None,
         job_config=None,
+        timeout=None,
     ):
         """Upload the contents of a table from a JSON string or dict.
 
@@ -2313,6 +2337,9 @@ def load_table_from_json(
                 Extra configuration options for the job. The ``source_format``
                 setting is always set to
                 :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2357,9 +2384,10 @@ def load_table_from_json(
             location=location,
             project=project,
             job_config=job_config,
+            timeout=timeout,
         )
 
-    def _do_resumable_upload(self, stream, metadata, num_retries):
+    def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
         """Perform a resumable upload.
 
         Args:
@@ -2371,13 +2399,17 @@ def _do_resumable_upload(self, stream, metadata, num_retries):
                 Number of upload retries. (Deprecated: This argument
                 will be removed in a future release.)
 
+            timeout (float):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
+
         Returns:
             requests.Response:
                 The "200 OK" response object returned after the final chunk
                 is uploaded.
         """
         upload, transport = self._initiate_resumable_upload(
-            stream, metadata, num_retries
+            stream, metadata, num_retries, timeout
         )
 
         while not upload.finished:
@@ -2385,7 +2417,7 @@ def _do_resumable_upload(self, stream, metadata, num_retries):
 
         return response
 
-    def _initiate_resumable_upload(self, stream, metadata, num_retries):
+    def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
         """Initiate a resumable upload.
 
         Args:
@@ -2397,6 +2429,10 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
                 Number of upload retries. (Deprecated: This argument
                 will be removed in a future release.)
 
+            timeout (float):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
+
         Returns:
             Tuple:
                 Pair of
@@ -2419,12 +2455,17 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
             )
 
         upload.initiate(
-            transport, stream, metadata, _GENERIC_CONTENT_TYPE, stream_final=False
+            transport,
+            stream,
+            metadata,
+            _GENERIC_CONTENT_TYPE,
+            stream_final=False,
+            timeout=timeout,
         )
 
         return upload, transport
 
-    def _do_multipart_upload(self, stream, metadata, size, num_retries):
+    def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
         """Perform a multipart upload.
 
         Args:
@@ -2441,6 +2482,10 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
                 Number of upload retries. (Deprecated: This argument
                 will be removed in a future release.)
 
+            timeout (float):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
+
         Returns:
             requests.Response:
                 The "200 OK" response object returned after the multipart
@@ -2466,7 +2511,9 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
                 max_retries=num_retries
             )
 
-        response = upload.transmit(self._http, data, metadata, _GENERIC_CONTENT_TYPE)
+        response = upload.transmit(
+            self._http, data, metadata, _GENERIC_CONTENT_TYPE, timeout=timeout
+        )
 
         return response
 
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index bc2658961..2001ad42b 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -4425,7 +4425,7 @@ def _initiate_resumable_upload_helper(self, num_retries=None):
         job = LoadJob(None, None, self.TABLE_REF, client, job_config=config)
         metadata = job.to_api_repr()
         upload, transport = client._initiate_resumable_upload(
-            stream, metadata, num_retries
+            stream, metadata, num_retries, None
         )
 
         # Check the returned values.
@@ -4492,7 +4492,9 @@ def _do_multipart_upload_success_helper(self, get_boundary, num_retries=None):
         job = LoadJob(None, None, self.TABLE_REF, client, job_config=config)
         metadata = job.to_api_repr()
         size = len(data)
-        response = client._do_multipart_upload(stream, metadata, size, num_retries)
+        response = client._do_multipart_upload(
+            stream, metadata, size, num_retries, None
+        )
 
         # Check the mocks and the returned value.
         self.assertIs(response, fake_transport.request.return_value)
@@ -7251,7 +7253,7 @@ def test_load_table_from_file_resumable(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES
+            file_obj, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES, None
         )
 
         # the original config object should not have been modified
@@ -7280,7 +7282,7 @@ def test_load_table_from_file_w_explicit_project(self):
         expected_resource["jobReference"]["location"] = self.LOCATION
         expected_resource["jobReference"]["projectId"] = "other-project"
         do_upload.assert_called_once_with(
-            file_obj, expected_resource, _DEFAULT_NUM_RETRIES
+            file_obj, expected_resource, _DEFAULT_NUM_RETRIES, None
         )
 
     def test_load_table_from_file_w_client_location(self):
@@ -7310,7 +7312,7 @@ def test_load_table_from_file_w_client_location(self):
         expected_resource["jobReference"]["location"] = self.LOCATION
         expected_resource["jobReference"]["projectId"] = "other-project"
         do_upload.assert_called_once_with(
-            file_obj, expected_resource, _DEFAULT_NUM_RETRIES
+            file_obj, expected_resource, _DEFAULT_NUM_RETRIES, None
         )
 
     def test_load_table_from_file_resumable_metadata(self):
@@ -7368,7 +7370,7 @@ def test_load_table_from_file_resumable_metadata(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, expected_config, _DEFAULT_NUM_RETRIES
+            file_obj, expected_config, _DEFAULT_NUM_RETRIES, None
         )
 
     def test_load_table_from_file_multipart(self):
@@ -7392,7 +7394,11 @@ def test_load_table_from_file_multipart(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, self.EXPECTED_CONFIGURATION, file_obj_size, _DEFAULT_NUM_RETRIES
+            file_obj,
+            self.EXPECTED_CONFIGURATION,
+            file_obj_size,
+            _DEFAULT_NUM_RETRIES,
+            None,
         )
 
     def test_load_table_from_file_with_retries(self):
@@ -7413,7 +7419,7 @@ def test_load_table_from_file_with_retries(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, self.EXPECTED_CONFIGURATION, num_retries
+            file_obj, self.EXPECTED_CONFIGURATION, num_retries, None
         )
 
     def test_load_table_from_file_with_rewind(self):
@@ -7446,7 +7452,7 @@ def test_load_table_from_file_with_readable_gzip(self):
         )
 
         do_upload.assert_called_once_with(
-            gzip_file, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES
+            gzip_file, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES, None
         )
 
     def test_load_table_from_file_with_writable_gzip(self):
@@ -7539,6 +7545,7 @@ def test_load_table_from_dataframe(self):
             location=None,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_file = load_table_from_file.mock_calls[0][1][1]
@@ -7583,6 +7590,7 @@ def test_load_table_from_dataframe_w_client_location(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_file = load_table_from_file.mock_calls[0][1][1]
@@ -7636,6 +7644,7 @@ def test_load_table_from_dataframe_w_custom_job_config_wihtout_source_format(sel
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7691,6 +7700,7 @@ def test_load_table_from_dataframe_w_custom_job_config_w_source_format(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7784,6 +7794,7 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7844,6 +7855,7 @@ def test_load_table_from_dataframe_w_index_and_auto_schema(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7890,6 +7902,7 @@ def test_load_table_from_dataframe_unknown_table(self):
             location=None,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
     @unittest.skipIf(
@@ -7931,6 +7944,7 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7978,6 +7992,7 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(se
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8039,6 +8054,7 @@ def test_load_table_from_dataframe_struct_fields(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8113,6 +8129,7 @@ def test_load_table_from_dataframe_w_partial_schema(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8207,6 +8224,7 @@ def test_load_table_from_dataframe_w_partial_schema_missing_types(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         assert warned  # there should be at least one warning
@@ -8320,6 +8338,7 @@ def test_load_table_from_dataframe_w_nulls(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8373,6 +8392,7 @@ def test_load_table_from_json_basic_use(self):
             location=client.location,
             project=client.project,
             job_config=mock.ANY,
+            timeout=None,
        )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8425,6 +8445,7 @@ def test_load_table_from_json_non_default_args(self):
             location="EU",
             project="project-x",
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8499,7 +8520,7 @@ def test__do_resumable_upload(self):
         client = self._make_client(transport)
 
         result = client._do_resumable_upload(
-            file_obj, self.EXPECTED_CONFIGURATION, None
+            file_obj, self.EXPECTED_CONFIGURATION, None, None
         )
 
         content = result.content.decode("utf-8")
@@ -8522,7 +8543,7 @@ def test__do_multipart_upload(self):
         file_obj_len = len(file_obj.getvalue())
 
         client._do_multipart_upload(
-            file_obj, self.EXPECTED_CONFIGURATION, file_obj_len, None
+            file_obj, self.EXPECTED_CONFIGURATION, file_obj_len, None, None
         )
 
         # Verify that configuration data was passed in with the initial
@@ -8550,7 +8571,7 @@ def test__do_multipart_upload_wrong_size(self):
         file_obj_len = len(file_obj.getvalue())
 
         with pytest.raises(ValueError):
-            client._do_multipart_upload(file_obj, {}, file_obj_len + 1, None)
+            client._do_multipart_upload(file_obj, {}, file_obj_len + 1, None, None)
 
     def test_schema_from_json_with_file_path(self):
         from google.cloud.bigquery.schema import SchemaField
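
Usage note: the new ``timeout`` argument bounds each individual HTTP request issued while the job is created or the file contents are uploaded (the patch forwards it down to the ``initiate``/``transmit`` calls of google-resumable-media); it is not a deadline for the load job as a whole. Below is a minimal caller-side sketch, not part of the patch itself: the CSV file name, the job-config options, and the ``my-project.my_dataset.my_table`` destination ID are placeholder assumptions.

    from google.cloud import bigquery

    client = bigquery.Client()

    # Placeholder configuration for loading a CSV file with a header row.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,
        autodetect=True,
    )

    with open("data.csv", "rb") as file_obj:
        # ``timeout`` caps each underlying HTTP request made during the
        # upload; ``timeout=None`` (the default) keeps the old behavior.
        load_job = client.load_table_from_file(
            file_obj,
            "my-project.my_dataset.my_table",  # placeholder destination
            job_config=job_config,
            timeout=30.0,
        )

    load_job.result()  # block until the BigQuery load job itself completes

Because the parameter defaults to ``None``, existing callers are unaffected; that is exactly what the updated tests assert by checking that ``None`` is forwarded to ``_do_resumable_upload`` and ``_do_multipart_upload`` when no value is supplied.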