# Provenance: method added to google/cloud/bigquery/client.py by commit
# 2abdef82bed3 ("feat(bigquery): add create job method (#32)"). The same commit
# adds module-level imports of ``_get_sub_prop`` and ``_del_sub_prop`` from
# ``google.cloud.bigquery._helpers``; ``copy``, ``TableReference``,
# ``DEFAULT_RETRY`` and ``google.cloud.bigquery.job`` are expected to be in
# scope in that module already.
def create_job(self, job_config, retry=DEFAULT_RETRY):
    """Create a new job from a job configuration resource.

    The job type is selected by the first of the top-level keys ``"load"``,
    ``"copy"``, ``"extract"`` and ``"query"`` (checked in that order) found
    in ``job_config``. The work is then delegated to
    :meth:`load_table_from_uri`, :meth:`copy_table`, :meth:`extract_table`
    or :meth:`query` respectively.

    Arguments:
        job_config (dict):
            Job configuration in its API (resource) representation, keyed
            by job type, e.g. ``{"query": {"query": "SELECT 1"}}``.

    Keyword Arguments:
        retry (google.api_core.retry.Retry):
            (Optional) How to retry the RPC.

    Returns:
        Union[ \
            google.cloud.bigquery.job.LoadJob, \
            google.cloud.bigquery.job.CopyJob, \
            google.cloud.bigquery.job.ExtractJob, \
            google.cloud.bigquery.job.QueryJob \
        ]:
            A new job instance.

    Raises:
        TypeError:
            If ``job_config`` contains none of the supported job type keys.
    """
    if "load" in job_config:
        load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
            job_config
        )
        # Convert the table resource to a TableReference, the same way the
        # copy sources are converted below, instead of forwarding a raw dict.
        destination = TableReference.from_api_repr(
            _get_sub_prop(job_config, ["load", "destinationTable"])
        )
        source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
        return self.load_table_from_uri(
            source_uris, destination, job_config=load_job_config, retry=retry
        )

    if "copy" in job_config:
        copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
            job_config
        )
        destination = TableReference.from_api_repr(
            _get_sub_prop(job_config, ["copy", "destinationTable"])
        )
        source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"])
        if source_configs is None:
            # Fall back to the single-source form of the copy configuration.
            source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
        sources = [
            TableReference.from_api_repr(source_config)
            for source_config in source_configs
        ]
        return self.copy_table(
            sources, destination, job_config=copy_job_config, retry=retry
        )

    if "extract" in job_config:
        extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
            job_config
        )
        source = TableReference.from_api_repr(
            _get_sub_prop(job_config, ["extract", "sourceTable"])
        )
        destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
        return self.extract_table(
            source, destination_uris, job_config=extract_job_config, retry=retry
        )

    if "query" in job_config:
        # Work on a deep copy so the caller's dict is left untouched when the
        # destination table entry is removed before building the job config.
        copy_config = copy.deepcopy(job_config)
        _del_sub_prop(copy_config, ["query", "destinationTable"])
        query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
            copy_config
        )
        query = _get_sub_prop(copy_config, ["query", "query"])
        return self.query(query, job_config=query_job_config, retry=retry)

    raise TypeError("Invalid job configuration received.")
# Provenance: unit tests added to tests/unit/test_client.py by commit
# 2abdef82bed3 ("feat(bigquery): add create job method (#32)"). They are
# methods of the client test case and rely on its fixtures (``self._make_one``,
# ``self.PROJECT``, ``self.DS_ID``) and on the module-level helpers
# ``_make_credentials``, ``make_connection`` and ``mock``.
def _create_job_helper(self, job_config, client_method):
    """Check that ``create_job`` dispatches to ``client_method`` exactly once.

    ``client_method`` is the dotted path of the client method to patch. The
    job config parsing classmethod is patched as well, so only the dispatch
    itself is exercised.
    """
    client = self._make_one(
        project=self.PROJECT, credentials=_make_credentials(), _http=object()
    )
    client._connection = make_connection()

    config_patcher = mock.patch(
        "google.cloud.bigquery.job._JobConfig.from_api_repr",
        return_value=mock.Mock(),
    )
    method_patcher = mock.patch(client_method, autospec=True)

    with method_patcher as patched_method, config_patcher:
        client.create_job(job_config=job_config)

    patched_method.assert_called_once()


def test_create_job_load_config(self):
    """A ``load`` configuration is routed to ``load_table_from_uri``."""
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    configuration = {
        "load": {
            "destinationTable": destination_table,
            "sourceUris": ["gs://test_bucket/src_object*"],
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.load_table_from_uri"
    )


def test_create_job_copy_config(self):
    """A ``copy`` configuration with ``sourceTables`` goes to ``copy_table``."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    configuration = {
        "copy": {
            "sourceTables": [source_table],
            "destinationTable": destination_table,
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.copy_table"
    )


def test_create_job_copy_config_w_single_source(self):
    """A ``copy`` configuration with a lone ``sourceTable`` goes to ``copy_table``."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    configuration = {
        "copy": {
            "sourceTable": source_table,
            "destinationTable": destination_table,
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.copy_table"
    )


def test_create_job_extract_config(self):
    """An ``extract`` configuration is routed to ``extract_table``."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    configuration = {
        "extract": {
            "sourceTable": source_table,
            "destinationUris": ["gs://test_bucket/dst_object*"],
        }
    }

    self._create_job_helper(
        configuration, "google.cloud.bigquery.client.Client.extract_table"
    )


def test_create_job_query_config(self):
    """A ``query`` configuration is routed to ``query``."""
    query_section = {"query": "query", "destinationTable": {"tableId": "table_id"}}

    self._create_job_helper(
        {"query": query_section}, "google.cloud.bigquery.client.Client.query"
    )


def test_create_job_query_config_w_rateLimitExceeded_error(self):
    """A rate-limited first request is retried, without a destination table."""
    from google.cloud.bigquery.retry import DEFAULT_RETRY
    from google.cloud.exceptions import Forbidden

    def is_forbidden(exc):
        return isinstance(exc, Forbidden)

    query = "select count(*) from persons"
    configuration = {
        "query": {
            "query": query,
            "useLegacySql": False,
            "destinationTable": {"tableId": "table_id"},
        }
    }
    # What the fake API answers on the second (successful) attempt.
    resource = {
        "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
        "configuration": {
            "query": {
                "query": query,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": self.PROJECT,
                    "datasetId": self.DS_ID,
                    "tableId": "query_destination_table",
                },
            }
        },
    }
    # What the client is expected to send: the destination table is gone.
    data_without_destination = {
        "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
        "configuration": {"query": {"query": query, "useLegacySql": False}},
    }

    retry = DEFAULT_RETRY.with_deadline(1).with_predicate(is_forbidden)
    client = self._make_one(
        project=self.PROJECT, credentials=_make_credentials(), _http=object()
    )

    rate_limit_error = Forbidden("", errors=[{"reason": "rateLimitExceeded"}])
    api_request_patcher = mock.patch.object(
        client._connection,
        "api_request",
        side_effect=[rate_limit_error, resource],
    )

    with api_request_patcher as fake_api_request:
        job = client.create_job(job_config=configuration, retry=retry)

    self.assertEqual(job.destination.table_id, "query_destination_table")
    self.assertEqual(fake_api_request.call_count, 2)  # was retried once

    expected_call = mock.call(
        method="POST",
        path="/projects/PROJECT/jobs",
        data=data_without_destination,
        timeout=None,
    )
    self.assertEqual(fake_api_request.call_args_list[1], expected_call)


def test_create_job_w_invalid_job_config(self):
    """A configuration without a known job type key raises ``TypeError``."""
    client = self._make_one(
        project=self.PROJECT, credentials=_make_credentials(), _http=object()
    )

    with self.assertRaises(TypeError) as raised:
        client.create_job(job_config={"unknown": {}})

    self.assertIn("Invalid job configuration", raised.exception.args[0])