From 530e1e8d8fe8939e914a78ff1b220907c1b87af7 Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Fri, 22 Jan 2021 23:35:12 +0100
Subject: [PATCH] fix: use explicitly given project over the client's default
 project for load jobs (#482)

* fix: use project parameter if given for load jobs

* blacken client tests

* Refactor string concatenations in client tests

* Silence invalid coverage complaint
---
 google/cloud/bigquery/client.py |  41 +++++++--
 tests/unit/test_client.py       | 157 +++++++++++++++++++++++---------
 2 files changed, 148 insertions(+), 50 deletions(-)

diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py
index 3541726b8..b270075a9 100644
--- a/google/cloud/bigquery/client.py
+++ b/google/cloud/bigquery/client.py
@@ -2136,11 +2136,11 @@ def load_table_from_file(
         try:
             if size is None or size >= _MAX_MULTIPART_SIZE:
                 response = self._do_resumable_upload(
-                    file_obj, job_resource, num_retries, timeout
+                    file_obj, job_resource, num_retries, timeout, project=project
                 )
             else:
                 response = self._do_multipart_upload(
-                    file_obj, job_resource, size, num_retries, timeout
+                    file_obj, job_resource, size, num_retries, timeout, project=project
                 )
         except resumable_media.InvalidResponse as exc:
             raise exceptions.from_http_response(exc.response)
@@ -2475,7 +2475,9 @@ def load_table_from_json(
             timeout=timeout,
         )
 
-    def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
+    def _do_resumable_upload(
+        self, stream, metadata, num_retries, timeout, project=None
+    ):
         """Perform a resumable upload.
 
         Args:
@@ -2491,13 +2493,17 @@ def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
 
+            project (Optional[str]):
+                Project ID of the project where the upload should run. Defaults
+                to the client's project.
+
         Returns:
             requests.Response:
                 The "200 OK" response object returned after the final chunk
                 is uploaded.
         """
         upload, transport = self._initiate_resumable_upload(
-            stream, metadata, num_retries, timeout
+            stream, metadata, num_retries, timeout, project=project
         )
 
         while not upload.finished:
@@ -2505,7 +2511,9 @@ def _do_resumable_upload(self, stream, metadata, num_retries, timeout):
 
         return response
 
-    def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
+    def _initiate_resumable_upload(
+        self, stream, metadata, num_retries, timeout, project=None
+    ):
         """Initiate a resumable upload.
 
         Args:
@@ -2521,6 +2529,10 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
 
+            project (Optional[str]):
+                Project ID of the project where the upload should run. Defaults
+                to the client's project.
+
         Returns:
             Tuple:
                 Pair of
@@ -2532,7 +2544,11 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
         chunk_size = _DEFAULT_CHUNKSIZE
         transport = self._http
         headers = _get_upload_headers(self._connection.user_agent)
-        upload_url = _RESUMABLE_URL_TEMPLATE.format(project=self.project)
+
+        if project is None:
+            project = self.project
+        upload_url = _RESUMABLE_URL_TEMPLATE.format(project=project)
+
         # TODO: modify ResumableUpload to take a retry.Retry object
         # that it can use for the initial RPC.
         upload = ResumableUpload(upload_url, chunk_size, headers=headers)
@@ -2553,7 +2569,9 @@ def _initiate_resumable_upload(self, stream, metadata, num_retries, timeout):
 
         return upload, transport
 
-    def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
+    def _do_multipart_upload(
+        self, stream, metadata, size, num_retries, timeout, project=None
+    ):
         """Perform a multipart upload.
 
         Args:
@@ -2574,6 +2592,10 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
                 The number of seconds to wait for the underlying HTTP transport
                 before using ``retry``.
 
+            project (Optional[str]):
+                Project ID of the project where the upload should run. Defaults
+                to the client's project.
+
         Returns:
             requests.Response:
                 The "200 OK" response object returned after the multipart
@@ -2591,7 +2613,10 @@ def _do_multipart_upload(self, stream, metadata, size, num_retries, timeout):
 
         headers = _get_upload_headers(self._connection.user_agent)
 
-        upload_url = _MULTIPART_URL_TEMPLATE.format(project=self.project)
+        if project is None:
+            project = self.project
+
+        upload_url = _MULTIPART_URL_TEMPLATE.format(project=project)
         upload = MultipartUpload(upload_url, headers=headers)
 
         if num_retries is not None:
diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py
index bf183b5a4..625256e6e 100644
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -4455,9 +4455,8 @@ def _initiate_resumable_upload_helper(self, num_retries=None):
         # Check the returned values.
         self.assertIsInstance(upload, ResumableUpload)
         upload_url = (
-            "https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
-            + self.PROJECT
-            + "/jobs?uploadType=resumable"
+            f"https://bigquery.googleapis.com/upload/bigquery/v2/projects/{self.PROJECT}"
+            "/jobs?uploadType=resumable"
         )
         self.assertEqual(upload.upload_url, upload_url)
         expected_headers = _get_upload_headers(conn.user_agent)
@@ -4498,7 +4497,9 @@ def test__initiate_resumable_upload(self):
     def test__initiate_resumable_upload_with_retry(self):
         self._initiate_resumable_upload_helper(num_retries=11)
 
-    def _do_multipart_upload_success_helper(self, get_boundary, num_retries=None):
+    def _do_multipart_upload_success_helper(
+        self, get_boundary, num_retries=None, project=None
+    ):
         from google.cloud.bigquery.client import _get_upload_headers
         from google.cloud.bigquery.job import LoadJob
         from google.cloud.bigquery.job import LoadJobConfig
@@ -4508,6 +4509,9 @@ def _do_multipart_upload_success_helper(self, get_boundary, num_retries=None):
         client = self._make_one(project=self.PROJECT, _http=fake_transport)
         conn = client._connection = make_connection()
 
+        if project is None:
+            project = self.PROJECT
+
         # Create some mock arguments.
         data = b"Bzzzz-zap \x00\x01\xf4"
         stream = io.BytesIO(data)
@@ -4516,8 +4520,9 @@ def _do_multipart_upload_success_helper(self, get_boundary, num_retries=None):
         job = LoadJob(None, None, self.TABLE_REF, client, job_config=config)
         metadata = job.to_api_repr()
         size = len(data)
+
         response = client._do_multipart_upload(
-            stream, metadata, size, num_retries, None
+            stream, metadata, size, num_retries, None, project=project
         )
 
         # Check the mocks and the returned value.
@@ -4526,35 +4531,39 @@ def _do_multipart_upload_success_helper(self, get_boundary, num_retries=None):
         get_boundary.assert_called_once_with()
 
         upload_url = (
-            "https://bigquery.googleapis.com/upload/bigquery/v2/projects/"
-            + self.PROJECT
-            + "/jobs?uploadType=multipart"
+            f"https://bigquery.googleapis.com/upload/bigquery/v2/projects/{project}"
+            "/jobs?uploadType=multipart"
         )
 
         payload = (
             b"--==0==\r\n"
-            + b"content-type: application/json; charset=UTF-8\r\n\r\n"
-            + json.dumps(metadata).encode("utf-8")
-            + b"\r\n"
-            + b"--==0==\r\n"
-            + b"content-type: */*\r\n\r\n"
-            + data
-            + b"\r\n"
-            + b"--==0==--"
-        )
+            b"content-type: application/json; charset=UTF-8\r\n\r\n"
+            b"%(json_metadata)s"
+            b"\r\n"
+            b"--==0==\r\n"
+            b"content-type: */*\r\n\r\n"
+            b"%(data)s"
+            b"\r\n"
+            b"--==0==--"
+        ) % {b"json_metadata": json.dumps(metadata).encode("utf-8"), b"data": data}
+
         headers = _get_upload_headers(conn.user_agent)
         headers["content-type"] = b'multipart/related; boundary="==0=="'
         fake_transport.request.assert_called_once_with(
             "POST", upload_url, data=payload, headers=headers, timeout=mock.ANY
         )
 
-    @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==")
+    @mock.patch("google.resumable_media._upload.get_boundary", return_value=b"==0==")
     def test__do_multipart_upload(self, get_boundary):
         self._do_multipart_upload_success_helper(get_boundary)
 
-    @mock.patch(u"google.resumable_media._upload.get_boundary", return_value=b"==0==")
+    @mock.patch("google.resumable_media._upload.get_boundary", return_value=b"==0==")
     def test__do_multipart_upload_with_retry(self, get_boundary):
         self._do_multipart_upload_success_helper(get_boundary, num_retries=8)
 
+    @mock.patch("google.resumable_media._upload.get_boundary", return_value=b"==0==")
+    def test__do_multipart_upload_with_custom_project(self, get_boundary):
+        self._do_multipart_upload_success_helper(get_boundary, project="custom-project")
+
     def test_copy_table(self):
         from google.cloud.bigquery.job import CopyJob
 
@@ -6364,10 +6373,10 @@ def test_insert_rows_from_dataframe(self):
 
         dataframe = pandas.DataFrame(
             [
-                {"name": u"Little One", "age": 10, "adult": False},
-                {"name": u"Young Gun", "age": 20, "adult": True},
-                {"name": u"Dad", "age": 30, "adult": True},
-                {"name": u"Stranger", "age": 40, "adult": True},
+                {"name": "Little One", "age": 10, "adult": False},
+                {"name": "Young Gun", "age": 20, "adult": True},
+                {"name": "Dad", "age": 30, "adult": True},
+                {"name": "Stranger", "age": 40, "adult": True},
             ]
         )
 
@@ -6560,8 +6569,8 @@ def test_insert_rows_from_dataframe_w_explicit_none_insert_ids(self):
 
         dataframe = pandas.DataFrame(
             [
-                {"name": u"Little One", "adult": False},
-                {"name": u"Young Gun", "adult": True},
+                {"name": "Little One", "adult": False},
+                {"name": "Young Gun", "adult": True},
             ]
         )
 
@@ -7230,17 +7239,18 @@ class TestClientUpload(object):
     # `pytest`-style tests rather than `unittest`-style.
     from google.cloud.bigquery.job import SourceFormat
 
-    TABLE_REF = DatasetReference("project_id", "test_dataset").table("test_table")
+    PROJECT = "project_id"
+    TABLE_REF = DatasetReference(PROJECT, "test_dataset").table("test_table")
 
     LOCATION = "us-central"
 
-    @staticmethod
-    def _make_client(transport=None, location=None):
+    @classmethod
+    def _make_client(cls, transport=None, location=None):
         from google.cloud.bigquery import _http
         from google.cloud.bigquery import client
 
         cl = client.Client(
-            project="project_id",
+            project=cls.PROJECT,
             credentials=_make_credentials(),
             _http=transport,
             location=location,
@@ -7274,12 +7284,12 @@ def _make_do_upload_patch(cls, client, method, resource={}, side_effect=None):
         return mock.patch.object(client, method, side_effect=side_effect, autospec=True)
 
     EXPECTED_CONFIGURATION = {
-        "jobReference": {"projectId": "project_id", "jobId": "job_id"},
+        "jobReference": {"projectId": PROJECT, "jobId": "job_id"},
         "configuration": {
             "load": {
                 "sourceFormat": SourceFormat.CSV,
                 "destinationTable": {
-                    "projectId": "project_id",
+                    "projectId": PROJECT,
                     "datasetId": "test_dataset",
                     "tableId": "test_table",
                 },
@@ -7325,7 +7335,11 @@ def test_load_table_from_file_resumable(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES, None
+            file_obj,
+            self.EXPECTED_CONFIGURATION,
+            _DEFAULT_NUM_RETRIES,
+            None,
+            project=self.EXPECTED_CONFIGURATION["jobReference"]["projectId"],
         )
 
         # the original config object should not have been modified
@@ -7354,7 +7368,11 @@ def test_load_table_from_file_w_explicit_project(self):
         expected_resource["jobReference"]["location"] = self.LOCATION
         expected_resource["jobReference"]["projectId"] = "other-project"
         do_upload.assert_called_once_with(
-            file_obj, expected_resource, _DEFAULT_NUM_RETRIES, None
+            file_obj,
+            expected_resource,
+            _DEFAULT_NUM_RETRIES,
+            None,
+            project="other-project",
         )
 
     def test_load_table_from_file_w_client_location(self):
@@ -7384,7 +7402,11 @@ def test_load_table_from_file_w_client_location(self):
         expected_resource["jobReference"]["location"] = self.LOCATION
         expected_resource["jobReference"]["projectId"] = "other-project"
         do_upload.assert_called_once_with(
-            file_obj, expected_resource, _DEFAULT_NUM_RETRIES, None
+            file_obj,
+            expected_resource,
+            _DEFAULT_NUM_RETRIES,
+            None,
+            project="other-project",
         )
 
     def test_load_table_from_file_resumable_metadata(self):
@@ -7409,7 +7431,7 @@ def test_load_table_from_file_resumable_metadata(self):
         config.null_marker = r"\N"
 
         expected_config = {
-            "jobReference": {"projectId": "project_id", "jobId": "job_id"},
+            "jobReference": {"projectId": self.PROJECT, "jobId": "job_id"},
             "configuration": {
                 "load": {
                     "destinationTable": {
@@ -7442,7 +7464,11 @@ def test_load_table_from_file_resumable_metadata(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, expected_config, _DEFAULT_NUM_RETRIES, None
+            file_obj,
+            expected_config,
+            _DEFAULT_NUM_RETRIES,
+            None,
+            project=self.EXPECTED_CONFIGURATION["jobReference"]["projectId"],
        )
 
     def test_load_table_from_file_multipart(self):
@@ -7471,6 +7497,7 @@ def test_load_table_from_file_multipart(self):
             file_obj_size,
             _DEFAULT_NUM_RETRIES,
             None,
+            project=self.PROJECT,
         )
 
     def test_load_table_from_file_with_retries(self):
@@ -7491,7 +7518,11 @@ def test_load_table_from_file_with_retries(self):
         )
 
         do_upload.assert_called_once_with(
-            file_obj, self.EXPECTED_CONFIGURATION, num_retries, None
+            file_obj,
+            self.EXPECTED_CONFIGURATION,
+            num_retries,
+            None,
+            project=self.EXPECTED_CONFIGURATION["jobReference"]["projectId"],
         )
 
     def test_load_table_from_file_with_rewind(self):
@@ -7524,7 +7555,11 @@ def test_load_table_from_file_with_readable_gzip(self):
         )
 
         do_upload.assert_called_once_with(
-            gzip_file, self.EXPECTED_CONFIGURATION, _DEFAULT_NUM_RETRIES, None
+            gzip_file,
+            self.EXPECTED_CONFIGURATION,
+            _DEFAULT_NUM_RETRIES,
+            None,
+            project=self.EXPECTED_CONFIGURATION["jobReference"]["projectId"],
         )
 
     def test_load_table_from_file_with_writable_gzip(self):
@@ -8169,7 +8204,7 @@ def test_load_table_from_dataframe_w_partial_schema(self):
                         dtype="datetime64[ns]",
                     ).dt.tz_localize(pytz.utc),
                 ),
-                ("string_col", [u"abc", None, u"def"]),
+                ("string_col", ["abc", None, "def"]),
                 ("bytes_col", [b"abc", b"def", None]),
             ]
         )
@@ -8228,7 +8263,7 @@ def test_load_table_from_dataframe_w_partial_schema_extra_types(self):
             [
                 ("int_col", [1, 2, 3]),
                 ("int_as_float_col", [1.0, float("nan"), 3.0]),
-                ("string_col", [u"abc", None, u"def"]),
+                ("string_col", ["abc", None, "def"]),
             ]
         )
         dataframe = pandas.DataFrame(df_data, columns=df_data.keys())
@@ -8263,7 +8298,7 @@ def test_load_table_from_dataframe_w_partial_schema_missing_types(self):
         client = self._make_client()
         df_data = collections.OrderedDict(
            [
-                ("string_col", [u"abc", u"def", u"ghi"]),
+                ("string_col", ["abc", "def", "ghi"]),
                 ("unknown_col", [b"jkl", None, b"mno"]),
             ]
         )
@@ -8317,7 +8352,7 @@ def test_load_table_from_dataframe_w_schema_arrow_custom_compression(self):
         from google.cloud.bigquery.schema import SchemaField
 
         client = self._make_client()
-        records = [{"name": u"Monty", "age": 100}, {"name": u"Python", "age": 60}]
+        records = [{"name": "Monty", "age": 100}, {"name": "Python", "age": 60}]
         dataframe = pandas.DataFrame(records)
         schema = (SchemaField("name", "STRING"), SchemaField("age", "INTEGER"))
         job_config = job.LoadJobConfig(schema=schema)
@@ -8658,6 +8693,44 @@ def test__do_resumable_upload(self):
             timeout=mock.ANY,
         )
 
+    def test__do_resumable_upload_custom_project(self):
+        file_obj = self._make_file_obj()
+        file_obj_len = len(file_obj.getvalue())
+        transport = self._make_transport(
+            self._make_resumable_upload_responses(file_obj_len)
+        )
+        client = self._make_client(transport)
+
+        result = client._do_resumable_upload(
+            file_obj, self.EXPECTED_CONFIGURATION, None, None, project="custom-project",
+        )
+
+        content = result.content.decode("utf-8")
+        assert json.loads(content) == {"size": file_obj_len}
+
+        # Verify that configuration data was passed in with the initial
+        # request.
+        transport.request.assert_any_call(
+            "POST",
+            mock.ANY,
+            data=json.dumps(self.EXPECTED_CONFIGURATION).encode("utf-8"),
+            headers=mock.ANY,
+            timeout=mock.ANY,
+        )
+
+        # Check the project ID used in the call to initiate resumable upload.
+        initiation_url = next(
+            (
+                call.args[1]
+                for call in transport.request.call_args_list
+                if call.args[0] == "POST" and "uploadType=resumable" in call.args[1]
+            ),
+            None,
+        )  # pragma: NO COVER
+
+        assert initiation_url is not None
+        assert "projects/custom-project" in initiation_url
+
     def test__do_multipart_upload(self):
         transport = self._make_transport([self._make_response(http.client.OK)])
         client = self._make_client(transport)
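
Usage sketch (illustrative, not part of the patch): with this fix, a `project`
passed explicitly to `load_table_from_file` is threaded through to the upload
URL instead of being silently replaced by the client's default project. The
project, dataset, and table names below are hypothetical placeholders.

    import io

    from google.cloud import bigquery

    # Client bound to a default project.
    client = bigquery.Client(project="default-project")

    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV, autodetect=True
    )
    file_obj = io.BytesIO(b"name,age\nAlice,30\n")

    # Before this fix, the resumable/multipart upload URL was always built
    # from client.project ("default-project"); with the fix it targets the
    # explicitly given "other-project".
    load_job = client.load_table_from_file(
        file_obj,
        "other-project.test_dataset.test_table",
        project="other-project",
        job_config=job_config,
    )
    load_job.result()  # Block until the load job completes.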