Skip to content

Commit

Permalink
feat: add QueryJob.schema property for dry run queries (googleapis#…
Browse files Browse the repository at this point in the history
…1014)

Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea.
- [ ] Ensure the tests and linter pass
- [ ] Code coverage does not decrease (if any source code was changed)
- [ ] Appropriate docs were updated (if necessary)

Issue discovered while investigating what properties are needed in googleapis#967.
  • Loading branch information
tswast authored and abdelmegahedgoogle committed Apr 17, 2023
1 parent 0742cf2 commit d843100
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 24 deletions.
4 changes: 3 additions & 1 deletion google/cloud/bigquery/job/base.py
Expand Up @@ -1005,7 +1005,9 @@ def from_api_repr(cls, resource: dict, client) -> "UnknownJob":
Returns:
UnknownJob: Job corresponding to the resource.
"""
job_ref_properties = resource.get("jobReference", {"projectId": client.project})
job_ref_properties = resource.get(
"jobReference", {"projectId": client.project, "jobId": None}
)
job_ref = _JobReference._from_api_repr(job_ref_properties)
job = cls(job_ref, client)
# Populate the job reference with the project, even if it has been
Expand Down
24 changes: 20 additions & 4 deletions google/cloud/bigquery/job/query.py
Expand Up @@ -18,7 +18,7 @@
import copy
import re
import typing
from typing import Any, Dict, Optional, Union
from typing import Any, Dict, List, Optional, Union

from google.api_core import exceptions
from google.api_core.future import polling as polling_future
Expand All @@ -38,6 +38,7 @@
from google.cloud.bigquery.query import UDFResource
from google.cloud.bigquery.retry import DEFAULT_RETRY, DEFAULT_JOB_RETRY
from google.cloud.bigquery.routine import RoutineReference
from google.cloud.bigquery.schema import SchemaField
from google.cloud.bigquery.table import _EmptyRowIterator
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import _table_arg_to_table_ref
Expand All @@ -57,6 +58,7 @@
import pyarrow
from google.api_core import retry as retries
from google.cloud import bigquery_storage
from google.cloud.bigquery.client import Client
from google.cloud.bigquery.table import RowIterator


Expand Down Expand Up @@ -853,7 +855,7 @@ def to_api_repr(self):
}

@classmethod
def from_api_repr(cls, resource: dict, client) -> "QueryJob":
def from_api_repr(cls, resource: dict, client: "Client") -> "QueryJob":
"""Factory: construct a job given its API representation
Args:
Expand All @@ -866,8 +868,10 @@ def from_api_repr(cls, resource: dict, client) -> "QueryJob":
Returns:
google.cloud.bigquery.job.QueryJob: Job parsed from ``resource``.
"""
cls._check_resource_config(resource)
job_ref = _JobReference._from_api_repr(resource["jobReference"])
job_ref_properties = resource.setdefault(
"jobReference", {"projectId": client.project, "jobId": None}
)
job_ref = _JobReference._from_api_repr(job_ref_properties)
job = cls(job_ref, None, client=client)
job._set_properties(resource)
return job
Expand All @@ -887,6 +891,18 @@ def query_plan(self):
plan_entries = self._job_statistics().get("queryPlan", ())
return [QueryPlanEntry.from_api_repr(entry) for entry in plan_entries]

@property
def schema(self) -> Optional[List[SchemaField]]:
    """Schema of the query results, as reported in the job statistics.

    Present only for successful dry run of non-legacy SQL queries.

    Returns:
        Optional[List[SchemaField]]:
            One ``SchemaField`` per entry under the ``fields`` key of the
            ``schema`` statistics entry, or ``None`` when the statistics
            carry no ``schema`` entry at all. A ``schema`` entry without
            ``fields`` yields an empty list.
    """
    schema_resource = self._job_statistics().get("schema")
    if schema_resource is not None:
        return [
            SchemaField.from_api_repr(field_resource)
            for field_resource in schema_resource.get("fields", [])
        ]
    return None

@property
def timeline(self):
"""List(TimelineEntry): Return the query execution timeline
Expand Down
29 changes: 29 additions & 0 deletions tests/system/test_query.py
@@ -0,0 +1,29 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from google.cloud import bigquery


def test_dry_run(bigquery_client: bigquery.Client, scalars_table: str):
    """Issue a dry-run ``SELECT *`` and check the metadata on the returned job.

    Verifies that the job reports itself as a dry run, that a positive
    number of bytes would be processed, and that a non-empty schema is
    available.
    """
    sql = f"SELECT * FROM {scalars_table}"

    dry_run_config = bigquery.QueryJobConfig()
    dry_run_config.dry_run = True

    job = bigquery_client.query(sql, job_config=dry_run_config)

    # `job.result()` is deliberately not called: for a dry run query the
    # initial response already carries all the information checked below.
    assert job.dry_run is True
    assert job.total_bytes_processed > 0
    assert len(job.schema) > 0
56 changes: 37 additions & 19 deletions tests/unit/job/test_query.py
Expand Up @@ -269,25 +269,6 @@ def test_ctor_w_query_parameters(self):
job = self._make_one(self.JOB_ID, self.QUERY, client, job_config=config)
self.assertEqual(job.query_parameters, query_parameters)

def test_from_api_repr_missing_identity(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {}
klass = self._get_target_class()
with self.assertRaises(KeyError):
klass.from_api_repr(RESOURCE, client=client)

def test_from_api_repr_missing_config(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {
"id": "%s:%s" % (self.PROJECT, self.DS_ID),
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
}
klass = self._get_target_class()
with self.assertRaises(KeyError):
klass.from_api_repr(RESOURCE, client=client)

def test_from_api_repr_bare(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
Expand Down Expand Up @@ -1405,6 +1386,43 @@ def test_result_transport_timeout_error(self):
with call_api_patch, self.assertRaises(concurrent.futures.TimeoutError):
job.result(timeout=1)

def test_no_schema(self):
    """``schema`` is ``None`` for a job built from an empty resource."""
    target_class = self._get_target_class()
    empty_resource = {}
    job = target_class.from_api_repr(
        empty_resource, client=_make_client(project=self.PROJECT)
    )
    assert job.schema is None

def test_schema(self):
    """``schema`` exposes one field per entry in the query statistics schema."""
    # (column name, BigQuery type) pairs; every column is NULLABLE.
    columns = [
        ("bool_col", "BOOLEAN"),
        ("string_col", "STRING"),
        ("timestamp_col", "TIMESTAMP"),
    ]
    api_fields = [
        {"mode": "NULLABLE", "name": column_name, "type": column_type}
        for column_name, column_type in columns
    ]
    api_resource = {"statistics": {"query": {"schema": {"fields": api_fields}}}}

    target_class = self._get_target_class()
    job = target_class.from_api_repr(
        api_resource, client=_make_client(project=self.PROJECT)
    )

    # A single list comparison checks both the field count and the field
    # types in order.
    parsed_types = [schema_field.field_type for schema_field in job.schema]
    assert parsed_types == ["BOOLEAN", "STRING", "TIMESTAMP"]

def test__begin_error(self):
from google.cloud import exceptions

Expand Down

0 comments on commit d843100

Please sign in to comment.