Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bigquery): add create job method #32

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 65 additions & 0 deletions google/cloud/bigquery/client.py
Expand Up @@ -53,9 +53,11 @@
from google.cloud import exceptions
from google.cloud.client import ClientWithProject

from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._helpers import _del_sub_prop
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
Expand Down Expand Up @@ -1314,6 +1316,69 @@ def job_from_resource(self, resource):
return job.QueryJob.from_api_repr(resource, self)
return job.UnknownJob.from_api_repr(resource, self)

def create_job(self, job_config, retry=DEFAULT_RETRY):
    """Create a new job from an API-style job configuration.

    The job type is chosen from the first of the ``load``, ``copy``,
    ``extract`` or ``query`` keys present in ``job_config`` and the work is
    delegated to the matching client method. ``job_config`` itself is never
    modified.

    Arguments:
        job_config (dict): configuration job representation returned from the API.

    Keyword Arguments:
        retry (google.api_core.retry.Retry):
            (Optional) How to retry the RPC.

    Returns:
        Union[ \
            google.cloud.bigquery.job.LoadJob, \
            google.cloud.bigquery.job.CopyJob, \
            google.cloud.bigquery.job.ExtractJob, \
            google.cloud.bigquery.job.QueryJob \
        ]:
            A new job instance.

    Raises:
        TypeError:
            If ``job_config`` contains none of the supported job type keys.
    """
    # Function-scope import: only the query branch below needs it.
    import copy

    if "load" in job_config:
        load_job_config = google.cloud.bigquery.job.LoadJobConfig.from_api_repr(
            job_config
        )
        destination = _get_sub_prop(job_config, ["load", "destinationTable"])
        source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
        return self.load_table_from_uri(
            source_uris, destination, job_config=load_job_config, retry=retry
        )
    elif "copy" in job_config:
        copy_job_config = google.cloud.bigquery.job.CopyJobConfig.from_api_repr(
            job_config
        )
        destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
        source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"])

        # Fall back to the singular "sourceTable" key when the plural
        # "sourceTables" list is absent.
        if source_configs is None:
            source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
        sources = [
            TableReference.from_api_repr(source_config)
            for source_config in source_configs
        ]
        return self.copy_table(
            sources, destination, job_config=copy_job_config, retry=retry
        )
    elif "extract" in job_config:
        extract_job_config = google.cloud.bigquery.job.ExtractJobConfig.from_api_repr(
            job_config
        )
        source = _get_sub_prop(job_config, ["extract", "sourceTable"])
        destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
        return self.extract_table(
            source, destination_uris, job_config=extract_job_config, retry=retry
        )
    elif "query" in job_config:
        # The destination table is stripped from the query configuration
        # before the job is built. Do that on a deep copy: callers do not
        # expect the dict they passed in to be modified.
        config_copy = copy.deepcopy(job_config)
        _del_sub_prop(config_copy, ["query", "destinationTable"])
        query_job_config = google.cloud.bigquery.job.QueryJobConfig.from_api_repr(
            config_copy
        )
        query = _get_sub_prop(config_copy, ["query", "query"])
        return self.query(query, job_config=query_job_config, retry=retry)
    else:
        raise TypeError("Invalid job configuration received.")

def get_job(
self, job_id, project=None, location=None, retry=DEFAULT_RETRY, timeout=None
):
Expand Down
170 changes: 170 additions & 0 deletions tests/unit/test_client.py
Expand Up @@ -2796,6 +2796,176 @@ def test_delete_table_w_not_found_ok_true(self):

conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None)

def _create_job_helper(self, job_config, client_method):
    """Run ``create_job`` and check it delegates to ``client_method``.

    Args:
        job_config (dict): Job configuration handed to ``create_job``.
        client_method (str):
            Dotted path of the client method that is patched and expected
            to be called exactly once.
    """
    credentials = _make_credentials()
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=object()
    )
    client._connection = make_connection()

    # Parsing of the raw config is stubbed out, so only the dispatch to the
    # client method is exercised.
    config_patcher = mock.patch(
        "google.cloud.bigquery.job._JobConfig.from_api_repr",
        return_value=mock.Mock(),
    )
    method_patcher = mock.patch(client_method, autospec=True)

    with method_patcher as patched_method, config_patcher:
        client.create_job(job_config=job_config)
        patched_method.assert_called_once()

def test_create_job_load_config(self):
    """A ``load`` configuration is dispatched to ``Client.load_table_from_uri``."""
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    load_resource = {
        "load": {
            "destinationTable": destination_table,
            "sourceUris": ["gs://test_bucket/src_object*"],
        }
    }

    self._create_job_helper(
        load_resource, "google.cloud.bigquery.client.Client.load_table_from_uri"
    )

def test_create_job_copy_config(self):
    """A ``copy`` configuration with ``sourceTables`` goes to ``Client.copy_table``."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    copy_resource = {
        "copy": {
            "sourceTables": [source_table],
            "destinationTable": destination_table,
        }
    }

    self._create_job_helper(
        copy_resource, "google.cloud.bigquery.client.Client.copy_table"
    )

def test_create_job_copy_config_w_single_source(self):
    """A ``copy`` configuration using the singular ``sourceTable`` key also works."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    destination_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "destination_table",
    }
    copy_resource = {
        "copy": {
            "sourceTable": source_table,
            "destinationTable": destination_table,
        }
    }

    self._create_job_helper(
        copy_resource, "google.cloud.bigquery.client.Client.copy_table"
    )

def test_create_job_extract_config(self):
    """An ``extract`` configuration is dispatched to ``Client.extract_table``."""
    source_table = {
        "projectId": self.PROJECT,
        "datasetId": self.DS_ID,
        "tableId": "source_table",
    }
    extract_resource = {
        "extract": {
            "sourceTable": source_table,
            "destinationUris": ["gs://test_bucket/dst_object*"],
        }
    }

    self._create_job_helper(
        extract_resource, "google.cloud.bigquery.client.Client.extract_table"
    )

def test_create_job_query_config(self):
    """A ``query`` configuration is dispatched to ``Client.query``."""
    query_settings = {
        "query": "query",
        "destinationTable": {"tableId": "table_id"},
    }
    query_resource = {"query": query_settings}

    self._create_job_helper(
        query_resource, "google.cloud.bigquery.client.Client.query"
    )

def test_create_job_query_config_w_rateLimitExceeded_error(self):
    """A query job is retried after a ``rateLimitExceeded`` ``Forbidden`` error.

    The first API call raises, the second returns a job resource. The test
    checks that two calls were made and that the retried POST body carries
    no ``destinationTable``.
    """
    from google.cloud.exceptions import Forbidden
    from google.cloud.bigquery.retry import DEFAULT_RETRY

    sql = "select count(*) from persons"
    job_request = {
        "query": {
            "query": sql,
            "useLegacySql": False,
            "destinationTable": {"tableId": "table_id"},
        }
    }

    # Resource the fake API returns on the second (successful) attempt.
    api_response = {
        "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
        "configuration": {
            "query": {
                "query": sql,
                "useLegacySql": False,
                "destinationTable": {
                    "projectId": self.PROJECT,
                    "datasetId": self.DS_ID,
                    "tableId": "query_destination_table",
                },
            }
        },
    }
    # Request body expected on the wire: same query, no destination table.
    expected_payload = {
        "jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
        "configuration": {"query": {"query": sql, "useLegacySql": False}},
    }

    credentials = _make_credentials()
    forbidden_only_retry = DEFAULT_RETRY.with_deadline(1).with_predicate(
        lambda exc: isinstance(exc, Forbidden)
    )
    client = self._make_one(
        project=self.PROJECT, credentials=credentials, _http=object()
    )

    rate_limit_error = Forbidden("", errors=[{"reason": "rateLimitExceeded"}])
    request_patcher = mock.patch.object(
        client._connection,
        "api_request",
        side_effect=[rate_limit_error, api_response],
    )

    with request_patcher as fake_api_request:
        job = client.create_job(job_config=job_request, retry=forbidden_only_retry)

    recorded_calls = fake_api_request.call_args_list
    expected_call = mock.call(
        method="POST",
        path="/projects/PROJECT/jobs",
        data=expected_payload,
        timeout=None,
    )

    self.assertEqual(job.destination.table_id, "query_destination_table")
    self.assertEqual(len(recorded_calls), 2)  # was retried once
    self.assertEqual(recorded_calls[1], expected_call)

def test_create_job_w_invalid_job_config(self):
    """A configuration without a known job type key raises ``TypeError``."""
    bad_config = {"unknown": {}}
    client = self._make_one(
        project=self.PROJECT, credentials=_make_credentials(), _http=object()
    )

    with self.assertRaises(TypeError) as caught:
        client.create_job(job_config=bad_config)

    message = caught.exception.args[0]
    self.assertIn("Invalid job configuration", message)

def test_job_from_resource_unknown_type(self):
from google.cloud.bigquery.job import UnknownJob

Expand Down