From 2f2ade7c98395f7d3fac90e44e143c8c27450823 Mon Sep 17 00:00:00 2001
From: HemangChothani
Date: Thu, 15 Oct 2020 16:15:59 +0530
Subject: [PATCH] feat: add timeout parameter to load_table_from_file and its dependent methods

---
 google/cloud/bigquery/client.py | 71 +++++++++++++++++++++++++++------
 tests/unit/test_client.py      | 45 +++++++++++++++------
 2 files changed, 92 insertions(+), 24 deletions(-)

diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py
index b7e082daa..cce393d6c 100644
--- a/google/cloud/bigquery/client.py
+++ b/google/cloud/bigquery/client.py
@@ -1591,7 +1591,7 @@ def job_from_resource(self, resource):
             return job.QueryJob.from_api_repr(resource, self)
         return job.UnknownJob.from_api_repr(resource, self)
 
-    def create_job(self, job_config, retry=DEFAULT_RETRY):
+    def create_job(self, job_config, retry=DEFAULT_RETRY, timeout=None):
         """Create a new job.
         Args:
             job_config (dict): configuration job representation returned from the API.
@@ -1599,6 +1599,9 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
         Keyword Arguments:
             retry (Optional[google.api_core.retry.Retry]):
                 How to retry the RPC.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             Union[ \
@@ -1617,7 +1620,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
             destination = _get_sub_prop(job_config, ["load", "destinationTable"])
             source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
             return self.load_table_from_uri(
-                source_uris, destination, job_config=load_job_config, retry=retry
+                source_uris,
+                destination,
+                job_config=load_job_config,
+                retry=retry,
+                timeout=timeout,
             )
         elif "copy" in job_config:
             copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
@@ -1633,7 +1640,11 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
                 table_ref = TableReference.from_api_repr(source_config)
                 sources.append(table_ref)
             return self.copy_table(
-                sources, destination, job_config=copy_job_config, retry=retry
+                sources,
+                destination,
+                job_config=copy_job_config,
+                retry=retry,
+                timeout=timeout,
             )
         elif "extract" in job_config:
             extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
@@ -1650,6 +1661,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
                 destination_uris,
                 job_config=extract_job_config,
                 retry=retry,
+                timeout=timeout,
                 source_type=source_type,
             )
         elif "query" in job_config:
@@ -1659,7 +1671,9 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
                 copy_config
             )
             query = _get_sub_prop(copy_config, ["query", "query"])
-            return self.query(query, job_config=query_job_config, retry=retry)
+            return self.query(
+                query, job_config=query_job_config, retry=retry, timeout=timeout
+            )
         else:
             raise TypeError("Invalid job configuration received.")
 
@@ -1981,6 +1995,7 @@ def load_table_from_file(
         location=None,
         project=None,
         job_config=None,
+        timeout=None,
     ):
         """Upload the contents of this table from a file-like object.
 
@@ -2020,6 +2035,9 @@ def load_table_from_file(
                 to the client's project.
             job_config (Optional[google.cloud.bigquery.job.LoadJobConfig]):
                 Extra configuration options for the job.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2058,11 +2076,11 @@ def load_table_from_file(
         try:
             if size is None or size >= _MAX_MULTIPART_SIZE:
                 response = self._do_resumable_upload(
-                    file_obj, job_resource, num_retries
+                    file_obj, job_resource, num_retries, timeout
                 )
             else:
                 response = self._do_multipart_upload(
-                    file_obj, job_resource, size, num_retries
+                    file_obj, job_resource, size, num_retries, timeout
                 )
         except resumable_media.InvalidResponse as exc:
             raise exceptions.from_http_response(exc.response)
@@ -2080,6 +2098,7 @@ def load_table_from_dataframe(
         project=None,
         job_config=None,
         parquet_compression="snappy",
+        timeout=None,
     ):
         """Upload the contents of a table from a pandas DataFrame.
 
@@ -2143,6 +2162,9 @@ def load_table_from_dataframe(
                 passed as the ``compression`` argument to the underlying
                 ``DataFrame.to_parquet()`` method.
                 https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.DataFrame.to_parquet.html#pandas.DataFrame.to_parquet
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2249,6 +2271,7 @@ def load_table_from_dataframe(
                 location=location,
                 project=project,
                 job_config=job_config,
+                timeout=timeout,
             )
 
         finally:
@@ -2264,6 +2287,7 @@ def load_table_from_json(
         location=None,
         project=None,
         job_config=None,
+        timeout=None,
     ):
         """Upload the contents of a table from a JSON string or dict.
 
@@ -2313,6 +2337,9 @@ def load_table_from_json(
                 Extra configuration options for the job. The ``source_format``
                 setting is always set to
                 :attr:`~google.cloud.bigquery.job.SourceFormat.NEWLINE_DELIMITED_JSON`.
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
 
         Returns:
             google.cloud.bigquery.job.LoadJob: A new load job.
@@ -2357,9 +2384,10 @@ def load_table_from_json(
             location=location,
             project=project,
             job_config=job_config,
+            timeout=timeout,
         )
 
-    def _do_resumable_upload(self, stream, metadata, num_retries):
+    def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
         """Perform a resumable upload.
 
         Args:
@@ -2371,13 +2399,17 @@ def _do_resumable_upload(self, stream, metadata, num_retries):
                 Number of upload retries. (Deprecated: This
                 argument will be removed in a future release.)
 
+            timeout (float):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
+
         Returns:
             requests.Response:
                 The "200 OK" response object returned after the final chunk
                 is uploaded.
         """
         upload, transport = self._initiate_resumable_upload(
-            stream, metadata, num_retries
+            stream, metadata, num_retries, timeout
         )
 
         while not upload.finished:
@@ -2385,7 +2417,7 @@ def _do_resumable_upload(self, stream, metadata, num_retries):
 
         return response
 
-    def _initiate_resumable_upload(self, stream, metadata, num_retries):
+    def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
         """Initiate a resumable upload.
 
         Args:
@@ -2397,6 +2429,10 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
                 Number of upload retries. (Deprecated: This
                 argument will be removed in a future release.)
 
+            timeout (float):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
+
         Returns:
             Tuple:
                 Pair of
@@ -2419,12 +2455,17 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries):
             )
 
         upload.initiate(
-            transport, stream, metadata, _GENERIC_CONTENT_TYPE, stream_final=False
+            transport,
+            stream,
+            metadata,
+            _GENERIC_CONTENT_TYPE,
+            stream_final=False,
+            timeout=timeout,
         )
 
         return upload, transport
 
-    def _do_multipart_upload(self, stream, metadata, size, num_retries):
+    def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
         """Perform a multipart upload.
 
         Args:
@@ -2441,6 +2482,10 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
                 Number of upload retries. (Deprecated: This
                 argument will be removed in a future release.)
 
+            timeout (float):
+                The number of seconds to wait for the underlying HTTP transport
+                before using ``retry``.
+
         Returns:
             requests.Response:
                 The "200 OK" response object returned after the multipart
@@ -2466,7 +2511,9 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries):
                 max_retries=num_retries
             )
 
-        response = upload.transmit(self._http, data, metadata, _GENERIC_CONTENT_TYPE)
+        response = upload.transmit(
+            self._http, data, metadata, _GENERIC_CONTENT_TYPE, timeout=timeout
+        )
 
         return response
 
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index 52e00d7c7..c99de2f26 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -4425,7 +4425,7 @@ def _initiate_resumable_upload_helper(self, num_retries=None):
         job = LoadJob(None, None, self.TABLE_REF, client, job_config=config)
         metadata = job.to_api_repr()
         upload, transport = client._initiate_resumable_upload(
-            stream, metadata, num_retries
+            stream, metadata, num_retries, None
         )
 
         # Check the returned values.
@@ -4492,7 +4492,9 @@ def _do_multipart_upload_success_helper(self, get_boundary, num_retries=None):
         job = LoadJob(None, None, self.TABLE_REF, client, job_config=config)
         metadata = job.to_api_repr()
         size = len(data)
-        response = client._do_multipart_upload(stream, metadata, size, num_retries)
+        response = client._do_multipart_upload(
+            stream, metadata, size, num_retries, None
+        )
 
         # Check the mocks and the returned value.
         self.assertIs(response, fake_transport.request.return_value)
@@ -7200,7 +7202,7 @@ def test_load_table_from_file_resumable(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES
+            file_obj, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES, None
         )
 
         # the original config object should not have been modified
@@ -7229,7 +7231,7 @@ def test_load_table_from_file_w_explicit_project(self):
         expected_resource["jobReference"]["location"] = self.LOCATION
         expected_resource["jobReference"]["projectId"] = "other-project"
         do_upload.assert_called_once_with(
-            file_obj, expected_resource, _DEFAULT_NUM_RETRIES
+            file_obj, expected_resource, _DEFAULT_NUM_RETRIES, None
         )
 
     def test_load_table_from_file_w_client_location(self):
@@ -7259,7 +7261,7 @@ def test_load_table_from_file_w_client_location(self):
         expected_resource["jobReference"]["location"] = self.LOCATION
         expected_resource["jobReference"]["projectId"] = "other-project"
         do_upload.assert_called_once_with(
-            file_obj, expected_resource, _DEFAULT_NUM_RETRIES
+            file_obj, expected_resource, _DEFAULT_NUM_RETRIES, None
         )
 
     def test_load_table_from_file_resumable_metadata(self):
@@ -7317,7 +7319,7 @@ def test_load_table_from_file_resumable_metadata(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, expected_config, _DEFAULT_NUM_RETRIES
+            file_obj, expected_config, _DEFAULT_NUM_RETRIES, None
         )
 
     def test_load_table_from_file_multipart(self):
@@ -7341,7 +7343,11 @@ def test_load_table_from_file_multipart(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, self.EXPECTED_CONFIGURATION, file_obj_size, _DEFAULT_NUM_RETRIES
+            file_obj,
+            self.EXPECTED_CONFIGURATION,
+            file_obj_size,
+            _DEFAULT_NUM_RETRIES,
+            None,
         )
 
     def test_load_table_from_file_with_retries(self):
@@ -7362,7 +7368,7 @@ def test_load_table_from_file_with_retries(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, self.EXPECTED_CONFIGURATION, num_retries
+            file_obj, self.EXPECTED_CONFIGURATION, num_retries, None
         )
 
     def test_load_table_from_file_with_rewind(self):
@@ -7395,7 +7401,7 @@ def test_load_table_from_file_with_readable_gzip(self):
         )
 
         do_upload.assert_called_once_with(
-            gzip_file, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES
+            gzip_file, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES, None
         )
 
     def test_load_table_from_file_with_writable_gzip(self):
@@ -7488,6 +7494,7 @@ def test_load_table_from_dataframe(self):
             location=None,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_file = load_table_from_file.mock_calls[0][1][1]
@@ -7532,6 +7539,7 @@ def test_load_table_from_dataframe_w_client_location(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_file = load_table_from_file.mock_calls[0][1][1]
@@ -7585,6 +7593,7 @@ def test_load_table_from_dataframe_w_custom_job_config_wihtout_source_format(sel
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7640,6 +7649,7 @@ def test_load_table_from_dataframe_w_custom_job_config_w_source_format(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7733,6 +7743,7 @@ def test_load_table_from_dataframe_w_automatic_schema(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7793,6 +7804,7 @@ def test_load_table_from_dataframe_w_index_and_auto_schema(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7839,6 +7851,7 @@ def test_load_table_from_dataframe_unknown_table(self):
             location=None,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
     @unittest.skipIf(
@@ -7880,6 +7893,7 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7927,6 +7941,7 @@ def test_load_table_from_dataframe_w_nullable_int64_datatype_automatic_schema(se
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -7988,6 +8003,7 @@ def test_load_table_from_dataframe_struct_fields(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8062,6 +8078,7 @@ def test_load_table_from_dataframe_w_partial_schema(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8156,6 +8173,7 @@ def test_load_table_from_dataframe_w_partial_schema_missing_types(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         assert warned  # there should be at least one warning
@@ -8269,6 +8287,7 @@ def test_load_table_from_dataframe_w_nulls(self):
             location=self.LOCATION,
             project=None,
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8322,6 +8341,7 @@ def test_load_table_from_json_basic_use(self):
             location=client.location,
             project=client.project,
             job_config=mock.ANY,
+            timeout=None,
        )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8374,6 +8394,7 @@ def test_load_table_from_json_non_default_args(self):
             location="EU",
             project="project-x",
             job_config=mock.ANY,
+            timeout=None,
         )
 
         sent_config = load_table_from_file.mock_calls[0][2]["job_config"]
@@ -8448,7 +8469,7 @@ def test__do_resumable_upload(self):
         client = self._make_client(transport)
 
         result = client._do_resumable_upload(
-            file_obj, self.EXPECTED_CONFIGURATION, None
+            file_obj, self.EXPECTED_CONFIGURATION, None, None
        )
 
         content = result.content.decode("utf-8")
@@ -8471,7 +8492,7 @@ def test__do_multipart_upload(self):
         file_obj_len = len(file_obj.getvalue())
 
         client._do_multipart_upload(
-            file_obj, self.EXPECTED_CONFIGURATION, file_obj_len, None
+            file_obj, self.EXPECTED_CONFIGURATION, file_obj_len, None, None
         )
 
         # Verify that configuration data was passed in with the initial
@@ -8499,7 +8520,7 @@ def test__do_multipart_upload_wrong_size(self):
         file_obj_len = len(file_obj.getvalue())
 
         with pytest.raises(ValueError):
-            client._do_multipart_upload(file_obj, {}, file_obj_len + 1, None)
+            client._do_multipart_upload(file_obj, {}, file_obj_len + 1, None, None)
 
     def test_schema_from_json_with_file_path(self):
         from google.cloud.bigquery.schema import SchemaField
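
Usage sketch (reviewer's note, not part of the patch): the new ``timeout``
keyword flows from the public load methods down to the resumable/multipart
upload helpers, so a single value bounds each HTTP request made during an
upload. A minimal sketch of calling the changed API follows; the project,
dataset, table, and sample row below are placeholders.

    import io

    from google.cloud import bigquery

    client = bigquery.Client()

    # Placeholder destination table; the dataset must already exist.
    table_id = "my-project.my_dataset.my_table"

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
    )

    data = io.BytesIO(b'{"name": "Ada", "age": 36}\n')

    # ``timeout`` bounds each HTTP request made while uploading the
    # file; it is not a limit on the duration of the load job itself.
    load_job = client.load_table_from_file(
        data, table_id, job_config=job_config, timeout=30.0
    )
    load_job.result()  # block until the load job completes

Passing ``timeout=None`` (the default) preserves the previous behavior, which
is why the updated tests assert a trailing ``None`` positional argument on
``_do_resumable_upload`` and ``_do_multipart_upload``.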