Skip to content

Commit

Permalink
fix: create_job method accepts dictionary arguments (#300)
Browse files Browse the repository at this point in the history
* fix: broken create_job method

* fix: changes in unit tests

* fix: fix sourceTable thing

* fix: handle sourceTable passed in job resource

* fix: remove delete destination table from query

* fix: revert destination table for query
  • Loading branch information
HemangChothani committed Oct 20, 2020
1 parent 82290c3 commit 155bacc
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 33 deletions.
12 changes: 8 additions & 4 deletions google/cloud/bigquery/client.py
Expand Up @@ -48,11 +48,11 @@
from google.cloud import exceptions
from google.cloud.client import ClientWithProject

from google.cloud.bigquery._helpers import _del_sub_prop
from google.cloud.bigquery._helpers import _get_sub_prop
from google.cloud.bigquery._helpers import _record_field_to_json
from google.cloud.bigquery._helpers import _str_or_none
from google.cloud.bigquery._helpers import _verify_job_config_type
from google.cloud.bigquery._helpers import _del_sub_prop
from google.cloud.bigquery._http import Connection
from google.cloud.bigquery import _pandas_helpers
from google.cloud.bigquery.dataset import Dataset
Expand Down Expand Up @@ -1619,6 +1619,7 @@ def create_job(self, job_config, retry=DEFAULT_RETRY, timeout=None):
)
destination = _get_sub_prop(job_config, ["load", "destinationTable"])
source_uris = _get_sub_prop(job_config, ["load", "sourceUris"])
destination = TableReference.from_api_repr(destination)
return self.load_table_from_uri(
source_uris,
destination,
Expand All @@ -1631,9 +1632,9 @@ def create_job(self, job_config, retry=DEFAULT_RETRY, timeout=None):
job_config
)
destination = _get_sub_prop(job_config, ["copy", "destinationTable"])
destination = TableReference.from_api_repr(destination)
sources = []
source_configs = _get_sub_prop(job_config, ["copy", "sourceTables"])

if source_configs is None:
source_configs = [_get_sub_prop(job_config, ["copy", "sourceTable"])]
for source_config in source_configs:
Expand All @@ -1651,10 +1652,13 @@ def create_job(self, job_config, retry=DEFAULT_RETRY, timeout=None):
job_config
)
source = _get_sub_prop(job_config, ["extract", "sourceTable"])
source_type = "Table"
if not source:
if source:
source_type = "Table"
source = TableReference.from_api_repr(source)
else:
source = _get_sub_prop(job_config, ["extract", "sourceModel"])
source_type = "Model"
source = ModelReference.from_api_repr(source)
destination_uris = _get_sub_prop(job_config, ["extract", "destinationUris"])
return self.extract_table(
source,
Expand Down
57 changes: 28 additions & 29 deletions tests/unit/test_client.py
Expand Up @@ -3573,21 +3573,28 @@ def test_delete_table_w_not_found_ok_true(self):

conn.api_request.assert_called_with(method="DELETE", path=path, timeout=None)

def _create_job_helper(self, job_config, client_method):
def _create_job_helper(self, job_config):
from google.cloud.bigquery import _helpers

creds = _make_credentials()
http = object()
client = self._make_one(project=self.PROJECT, credentials=creds, _http=http)

client._connection = make_connection()
rf1 = mock.Mock()
get_config_patch = mock.patch(
"google.cloud.bigquery.job._JobConfig.from_api_repr", return_value=rf1,
)
load_patch = mock.patch(client_method, autospec=True)
RESOURCE = {
"jobReference": {"projectId": self.PROJECT, "jobId": mock.ANY},
"configuration": job_config,
}
conn = client._connection = make_connection(RESOURCE)
client.create_job(job_config=job_config)
if "query" in job_config:
_helpers._del_sub_prop(job_config, ["query", "destinationTable"])

with load_patch as client_method, get_config_patch:
client.create_job(job_config=job_config)
client_method.assert_called_once()
conn.api_request.assert_called_once_with(
method="POST",
path="/projects/%s/jobs" % self.PROJECT,
data=RESOURCE,
timeout=None,
)

def test_create_job_load_config(self):
configuration = {
Expand All @@ -3601,9 +3608,7 @@ def test_create_job_load_config(self):
}
}

self._create_job_helper(
configuration, "google.cloud.bigquery.client.Client.load_table_from_uri"
)
self._create_job_helper(configuration)

def test_create_job_copy_config(self):
configuration = {
Expand All @@ -3623,9 +3628,7 @@ def test_create_job_copy_config(self):
}
}

self._create_job_helper(
configuration, "google.cloud.bigquery.client.Client.copy_table",
)
self._create_job_helper(configuration)

def test_create_job_copy_config_w_single_source(self):
configuration = {
Expand All @@ -3643,9 +3646,7 @@ def test_create_job_copy_config_w_single_source(self):
}
}

self._create_job_helper(
configuration, "google.cloud.bigquery.client.Client.copy_table",
)
self._create_job_helper(configuration)

def test_create_job_extract_config(self):
configuration = {
Expand All @@ -3658,9 +3659,7 @@ def test_create_job_extract_config(self):
"destinationUris": ["gs://test_bucket/dst_object*"],
}
}
self._create_job_helper(
configuration, "google.cloud.bigquery.client.Client.extract_table",
)
self._create_job_helper(configuration)

def test_create_job_extract_config_for_model(self):
configuration = {
Expand All @@ -3673,17 +3672,17 @@ def test_create_job_extract_config_for_model(self):
"destinationUris": ["gs://test_bucket/dst_object*"],
}
}
self._create_job_helper(
configuration, "google.cloud.bigquery.client.Client.extract_table",
)
self._create_job_helper(configuration)

def test_create_job_query_config(self):
configuration = {
"query": {"query": "query", "destinationTable": {"tableId": "table_id"}}
"query": {
"query": "query",
"destinationTable": {"tableId": "table_id"},
"useLegacySql": False,
}
}
self._create_job_helper(
configuration, "google.cloud.bigquery.client.Client.query",
)
self._create_job_helper(configuration)

def test_create_job_query_config_w_rateLimitExceeded_error(self):
from google.cloud.exceptions import Forbidden
Expand Down

0 comments on commit 155bacc

Please sign in to comment.