feat(bigquery): add create job method (#32)
* feat(bigquery): add create job method

* feat(bigquery): Addressed comments and add unit test

* feat(bigquery): make copy of job config for query job

Co-authored-by: Peter Lamut <plamut@users.noreply.github.com>
HemangChothani and plamut committed Mar 26, 2020
1 parent 6182cf4 commit 2abdef8
Showing 2 changed files with 236 additions and 0 deletions.
66 changes: 66 additions & 0 deletions google/cloud/bigquery/client.py
@@ -51,9 +51,11 @@
from google.cloud import exceptions
from google.cloud.client import ClientWithProject

from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._helpers import _del_sub_prop
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
@@ -1313,6 +1315,70 @@ def job_from_resource(self, resource):
            return job.QueryJob.from_api_repr(resource, self)
        return job.UnknownJob.from_api_repr(resource, self)

    def create_job(self, job_config, retry=DEFAULT_RETRY):
        """Create a new job.

        Arguments:
            job_config (dict): A job configuration in the dict representation
                used by the REST API.

        Keyword Arguments:
            retry (google.api_core.retry.Retry):
                (Optional) How to retry the RPC.

        Returns:
            Union[ \
                google.cloud.bigquery.job.LoadJob, \
                google.cloud.bigquery.job.CopyJob, \
                google.cloud.bigquery.job.ExtractJob, \
                google.cloud.bigquery.job.QueryJob \
            ]:
                A new job instance.
        """

if "load" in job_config:
load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
job_config
)
destination = _get_sub_prop(job_config, ["load", "destinationTable"])
source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
return self.load_table_from_uri(
source_uris, destination, job_config=load_job_config, retry=retry
)
elif "copy" in job_config:
copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
job_config
)
destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
sources = []
source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"])

if source_configs is None:
source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
for source_config in source_configs:
table_ref = TableReference.from_api_repr(source_config)
sources.append(table_ref)
return self.copy_table(
sources, destination, job_config=copy_job_config, retry=retry
)
elif "extract" in job_config:
extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
job_config
)
source = _get_sub_prop(job_config, ["extract", "sourceTable"])
destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
return self.extract_table(
source, destination_uris, job_config=extract_job_config, retry=retry
)
elif "query" in job_config:
copy_config = copy.deepcopy(job_config)
_del_sub_prop(copy_config, ["query", "destinationTable"])
query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
copy_config
)
query = _get_sub_prop(copy_config, ["query", "query"])
return self.query(query, job_config=query_job_config, retry=retry)
else:
raise TypeError("Invalid job configuration received.")

    def get_job(
        self, job_id, project=None, location=None, retry=DEFAULT_RETRY, timeout=None
    ):
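For reference, a minimal usage sketch of the new method (not part of this change). The project, dataset, and bucket names are placeholders:

from google.cloud import bigquery

client = bigquery.Client()

# API-representation config dict; create_job dispatches on the top-level key.
job_config = {
    "load": {
        "destinationTable": {
            "projectId": "my-project",  # placeholder
            "datasetId": "my_dataset",  # placeholder
            "tableId": "my_table",  # placeholder
        },
        "sourceUris": ["gs://my-bucket/data-*.csv"],  # placeholder
    }
}

job = client.create_job(job_config)  # returns a LoadJob for this config
job.result()  # block until the job finishes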
170 changes: 170 additions & 0 deletions tests/unit/test_client.py
@@ -2795,6 +2795,176 @@ def test_delete_table_w_not_found_ok_true(self):

        conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None)

    def _create_job_helper(self, job_config, client_method):
        creds = _make_credentials()
        http = object()
        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)

        client._connection = make_connection()
        rf1 = mock.Mock()
        # Stub out config parsing and the dispatched client method; the tests
        # below only verify that create_job dispatches to the right method.
        get_config_patch = mock.patch(
            "google.cloud.bigquery.job._JobConfig.from_api_repr", return_value=rf1,
        )
        load_patch = mock.patch(client_method, autospec=True)

        with load_patch as client_method, get_config_patch:
            client.create_job(job_config=job_config)
            client_method.assert_called_once()
    def test_create_job_load_config(self):
        configuration = {
            "load": {
                "destinationTable": {
                    "projectId": self.PROJECT,
                    "datasetId": self.DS_ID,
                    "tableId": "source_table",
                },
                "sourceUris": ["gs://test_bucket/src_object*"],
            }
        }

        self._create_job_helper(
            configuration, "google.cloud.bigquery.client.Client.load_table_from_uri"
        )

    def test_create_job_copy_config(self):
        configuration = {
            "copy": {
                "sourceTables": [
                    {
                        "projectId": self.PROJECT,
                        "datasetId": self.DS_ID,
                        "tableId": "source_table",
                    }
                ],
                "destinationTable": {
                    "projectId": self.PROJECT,
                    "datasetId": self.DS_ID,
                    "tableId": "destination_table",
                },
            }
        }

        self._create_job_helper(
            configuration, "google.cloud.bigquery.client.Client.copy_table",
        )

    def test_create_job_copy_config_w_single_source(self):
        configuration = {
            "copy": {
                "sourceTable": {
                    "projectId": self.PROJECT,
                    "datasetId": self.DS_ID,
                    "tableId": "source_table",
                },
                "destinationTable": {
                    "projectId": self.PROJECT,
                    "datasetId": self.DS_ID,
                    "tableId": "destination_table",
                },
            }
        }

        self._create_job_helper(
            configuration, "google.cloud.bigquery.client.Client.copy_table",
        )

    def test_create_job_extract_config(self):
        configuration = {
            "extract": {
                "sourceTable": {
                    "projectId": self.PROJECT,
                    "datasetId": self.DS_ID,
                    "tableId": "source_table",
                },
                "destinationUris": ["gs://test_bucket/dst_object*"],
            }
        }
        self._create_job_helper(
            configuration, "google.cloud.bigquery.client.Client.extract_table",
        )

    def test_create_job_query_config(self):
        configuration = {
            "query": {"query": "query", "destinationTable": {"tableId": "table_id"}}
        }
        self._create_job_helper(
            configuration, "google.cloud.bigquery.client.Client.query",
        )

    def test_create_job_query_config_w_rateLimitExceeded_error(self):
        from google.cloud.exceptions import Forbidden
        from google.cloud.bigquery.retry import DEFAULT_RETRY

        query = "select count(*) from persons"
        configuration = {
            "query": {
                "query": query,
                "useLegacySql": False,
                "destinationTable": {"tableId": "table_id"},
            }
        }
        resource = {
            "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
            "configuration": {
                "query": {
                    "query": query,
                    "useLegacySql": False,
                    "destinationTable": {
                        "projectId": self.PROJECT,
                        "datasetId": self.DS_ID,
                        "tableId": "query_destination_table",
                    },
                }
            },
        }
        data_without_destination = {
            "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
            "configuration": {"query": {"query": query, "useLegacySql": False}},
        }

        creds = _make_credentials()
        http = object()
        retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
            lambda exc: isinstance(exc, Forbidden)
        )
        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)

        api_request_patcher = mock.patch.object(
            client._connection,
            "api_request",
            side_effect=[
                Forbidden("", errors=[{"reason": "rateLimitExceeded"}]),
                resource,
            ],
        )

        with api_request_patcher as fake_api_request:
            job = client.create_job(job_config=configuration, retry=retry)

        self.assertEqual(job.destination.table_id, "query_destination_table")
        self.assertEqual(len(fake_api_request.call_args_list), 2)  # was retried once
        self.assertEqual(
            fake_api_request.call_args_list[1],
            mock.call(
                method="POST",
                path="/projects/PROJECT/jobs",
                data=data_without_destination,
                timeout=None,
            ),
        )

    def test_create_job_w_invalid_job_config(self):
        configuration = {"unknown": {}}
        creds = _make_credentials()
        http = object()
        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)

        with self.assertRaises(TypeError) as exc:
            client.create_job(job_config=configuration)

        self.assertIn("Invalid job configuration", exc.exception.args[0])

    def test_job_from_resource_unknown_type(self):
        from google.cloud.bigquery.job import UnknownJob
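As test_create_job_query_config_w_rateLimitExceeded_error above illustrates, callers may pass their own retry object so that create_job resubmits the request on errors they consider retryable. A minimal sketch (not part of this change), with an illustrative predicate, deadline, and query:

from google.api_core.exceptions import Forbidden
from google.cloud import bigquery
from google.cloud.bigquery.retry import DEFAULT_RETRY

client = bigquery.Client()

def is_rate_limited(exc):
    # Retry only 403 responses whose error reason is rateLimitExceeded.
    return isinstance(exc, Forbidden) and any(
        error.get("reason") == "rateLimitExceeded" for error in exc.errors
    )

# Derive a retry from the library default, swapping in the predicate
# and a 30-second deadline (illustrative values).
retry = DEFAULT_RETRY.with_deadline(30).with_predicate(is_rate_limited)

configuration = {"query": {"query": "SELECT 1", "useLegacySql": False}}
job = client.create_job(job_config=configuration, retry=retry)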
