Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add create job method #32

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
19 changes: 8 additions & 11 deletions google/cloud/bigquery/client.py
Expand Up @@ -57,6 +57,7 @@
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._helpers import _del_sub_prop
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
Expand Down Expand Up @@ -1338,9 +1339,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
job_config
)
destination = TableReference.from_api_repr(
job_config["load"]["destinationTable"]
)
destination = _get_sub_prop(job_config, ["load", "destinationTable"])
source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
return self.load_table_from_uri(
source_uris, destination, job_config=load_job_config, retry=retry
Expand All @@ -1349,14 +1348,12 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
job_config
)
copy_resource = job_config["copy"]
destination = TableReference.from_api_repr(
copy_resource["destinationTable"]
)
destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
sources = []
source_configs = copy_resource.get("sourceTables")
source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"])

if source_configs is None:
source_configs = [copy_resource["sourceTable"]]
source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
for source_config in source_configs:
table_ref = TableReference.from_api_repr(source_config)
sources.append(table_ref)
Expand All @@ -1367,13 +1364,13 @@ def create_job(self, job_config, retry=DEFAULT_RETRY):
extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
job_config
)
source = TableReference.from_api_repr(job_config["extract"]["sourceTable"])
source = _get_sub_prop(job_config, ["extract", "sourceTable"])
destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
return self.extract_table(
source, destination_uris, job_config=extract_job_config, retry=retry
)
elif "query" in job_config:
del job_config["query"]["destinationTable"]
_del_sub_prop(job_config, ["query", "destinationTable"])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will still potentially modify the input argument in-place, do we mind? (cc: @tswast )

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's not do that if at all possible. Maybe make a copy before calling _del_sub_prop?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@plamut , @tswast Ok, may i do copy_config = copy.deepcopy(job_config) before calling _del_sub_prop ?

If i consider clear destination_table if it was a query job statement which mentioned in issue's description, need to modify the input argument.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@HemangChothani Creating a copy and operating on the latter sounds good, yes.

If i consider clear destination_table if it was a query job statement which mentioned in issue's description, need to modify the input argument.

If I understood the ticket description correctly, it is not clear when a client library should clear the destination table property. But if we do clear it (as is the case with query jobs), it is safer to do it in a config copy, because users normally don't expect that their input parameter could be modified.

query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
job_config
)
Expand Down
62 changes: 62 additions & 0 deletions tests/unit/test_client.py
Expand Up @@ -2893,6 +2893,68 @@ def test_create_job_query_config(self):
configuration, "google.cloud.bigquery.client.Client.query",
)

def test_create_job_query_config_w_rateLimitExceeded_error(self):
from google.cloud.exceptions import Forbidden
from google.cloud.bigquery.retry import DEFAULT_RETRY

query = "select count(*) from persons"
configuration = {
"query": {
"query": query,
"useLegacySql": False,
"destinationTable": {"tableId": "table_id"},
}
}
resource = {
"jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
"configuration": {
"query": {
"query": query,
"useLegacySql": False,
"destinationTable": {
"projectId": self.PROJECT,
"datasetId": self.DS_ID,
"tableId": "query_destination_table",
},
}
},
}
data_without_destination = {
"jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
"configuration": {"query": {"query": query, "useLegacySql": False}},
}

creds = _make_credentials()
http = object()
retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
lambda exc: isinstance(exc, Forbidden)
)
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)

api_request_patcher = mock.patch.object(
client._connection,
"api_request",
side_effect=[
Forbidden("", errors=[{"reason": "rateLimitExceeded"}]),
resource,
],
)

with api_request_patcher as fake_api_request:
job = client.create_job(job_config=configuration, retry=retry)

self.assertEqual(job.destination.table_id, "query_destination_table")
self.assertEqual(len(fake_api_request.call_args_list), 2) # was retried once
self.assertEqual(
fake_api_request.call_args_list[1],
mock.call(
method="POST",
path="/projects/PROJECT/jobs",
data=data_without_destination,
timeout=None,
),
)

def test_create_job_w_invalid_job_config(self):
configuration = {"unknown": {}}
creds = _make_credentials()
Expand Down