feat(bigquery): Addressed comments and add unit test
HemangChothani committed Feb 12, 2020
1 parent fbbf79d commit ad758aa
Showing 2 changed files with 70 additions and 11 deletions.
19 changes: 8 additions & 11 deletions google/cloud/bigquery/client.py
@@ -57,6 +57,7 @@
 from google.cloud.bigquery._helpers import _record_field_to_json
 from google.cloud.bigquery._helpers import _str_or_none
 from google.cloud.bigquery._helpers import _verify_job_config_type
+from google.cloud.bigquery._helpers import _del_sub_prop
 from google.cloud.bigquery._http import Connection
 from google.cloud.bigquery import _pandas_helpers
 from google.cloud.bigquery.dataset import Dataset
@@ -1338,9 +1339,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
             load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
                 job_config
             )
-            destination = TableReference.from_api_repr(
-                job_config["load"]["destinationTable"]
-            )
+            destination = _get_sub_prop(job_config, ["load", "destinationTable"])
             source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
             return self.load_table_from_uri(
                 source_uris, destination, job_config=load_job_config, retry=retry
@@ -1349,14 +1348,12 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
             copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
                 job_config
             )
-            copy_resource = job_config["copy"]
-            destination = TableReference.from_api_repr(
-                copy_resource["destinationTable"]
-            )
+            destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
             sources = []
-            source_configs = copy_resource.get("sourceTables")
+            source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"])
+
             if source_configs is None:
-                source_configs = [copy_resource["sourceTable"]]
+                source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
             for source_config in source_configs:
                 table_ref = TableReference.from_api_repr(source_config)
                 sources.append(table_ref)
@@ -1367,13 +1364,13 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
             extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
                 job_config
             )
-            source = TableReference.from_api_repr(job_config["extract"]["sourceTable"])
+            source = _get_sub_prop(job_config, ["extract", "sourceTable"])
             destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
             return self.extract_table(
                 source, destination_uris, job_config=extract_job_config, retry=retry
             )
         elif "query" in job_config:
-            del job_config["query"]["destinationTable"]
+            _del_sub_prop(job_config, ["query", "destinationTable"])
             query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
                 job_config
             )
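The pattern across the hunks above is consistent: direct dictionary indexing such as job_config["load"]["destinationTable"], which raises KeyError when a key is absent, is replaced with the _get_sub_prop and _del_sub_prop helpers from google.cloud.bigquery._helpers, which tolerate missing nested keys. A minimal, self-contained sketch of the behavior these calls rely on (an approximation for illustration, not the library's actual implementation in google/cloud/bigquery/_helpers.py):

def _get_sub_prop(container, keys, default=None):
    # Walk container[keys[0]][keys[1]]...; return ``default`` if any key is missing.
    sub_val = container
    for key in keys:
        if key not in sub_val:
            return default
        sub_val = sub_val[key]
    return sub_val

def _del_sub_prop(container, keys):
    # Delete the nested key path if present; do nothing when it is absent.
    sub_val = container
    for key in keys[:-1]:
        if key not in sub_val:
            return
        sub_val = sub_val[key]
    sub_val.pop(keys[-1], None)

job_config = {"query": {"query": "SELECT 1", "destinationTable": {"tableId": "t"}}}
assert _get_sub_prop(job_config, ["query", "destinationTable"]) == {"tableId": "t"}
assert _get_sub_prop(job_config, ["load", "sourceUris"]) is None  # missing path, no KeyError
_del_sub_prop(job_config, ["query", "destinationTable"])
assert "destinationTable" not in job_config["query"]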
62 changes: 62 additions & 0 deletions tests/unit/test_client.py
@@ -2893,6 +2893,68 @@ def test_create_job_query_config(self):
             configuration, "google.cloud.bigquery.client.Client.query",
         )
 
+    def test_create_job_query_config_w_rateLimitExceeded_error(self):
+        from google.cloud.exceptions import Forbidden
+        from google.cloud.bigquery.retry import DEFAULT_RETRY
+
+        query = "select count(*) from persons"
+        configuration = {
+            "query": {
+                "query": query,
+                "useLegacySql": False,
+                "destinationTable": {"tableId": "table_id"},
+            }
+        }
+        resource = {
+            "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
+            "configuration": {
+                "query": {
+                    "query": query,
+                    "useLegacySql": False,
+                    "destinationTable": {
+                        "projectId": self.PROJECT,
+                        "datasetId": self.DS_ID,
+                        "tableId": "query_destination_table",
+                    },
+                }
+            },
+        }
+        data_without_destination = {
+            "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
+            "configuration": {"query": {"query": query, "useLegacySql": False}},
+        }
+
+        creds = _make_credentials()
+        http = object()
+        retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
+            lambda exc: isinstance(exc, Forbidden)
+        )
+        client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)
+
+        api_request_patcher = mock.patch.object(
+            client._connection,
+            "api_request",
+            side_effect=[
+                Forbidden("", errors=[{"reason": "rateLimitExceeded"}]),
+                resource,
+            ],
+        )
+
+        with api_request_patcher as fake_api_request:
+            job = client.create_job(job_config=configuration, retry=retry)
+
+        self.assertEqual(job.destination.table_id, "query_destination_table")
+        self.assertEqual(len(fake_api_request.call_args_list), 2)  # was retried once
+        self.assertEqual(
+            fake_api_request.call_args_list[1],
+            mock.call(
+                method="POST",
+                path="/projects/PROJECT/jobs",
+                data=data_without_destination,
+                timeout=None,
+            ),
+        )
+
     def test_create_job_w_invalid_job_config(self):
         configuration = {"unknown": {}}
         creds = _make_credentials()
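The new test drives the retry path end to end: the first jobs.insert request fails with a 403 whose error reason is rateLimitExceeded, the supplied predicate matches, and the client issues the request a second time before the test asserts on that second call. A sketch of how a caller might build a similar retry for client.create_job outside of a test; the project id, deadline, and query below are placeholders, not values from this commit:

from google.api_core.exceptions import Forbidden
from google.cloud import bigquery
from google.cloud.bigquery.retry import DEFAULT_RETRY

def _is_rate_limited(exc):
    # Match 403 responses whose error reason is rateLimitExceeded.
    return isinstance(exc, Forbidden) and any(
        error.get("reason") == "rateLimitExceeded" for error in (exc.errors or [])
    )

client = bigquery.Client(project="my-project")  # placeholder project id
retry = DEFAULT_RETRY.with_deadline(60).with_predicate(_is_rate_limited)
job = client.create_job(
    job_config={"query": {"query": "SELECT 1", "useLegacySql": False}},
    retry=retry,
)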
