Skip to content

Commit

Permalink
feat: add support for transaction statistics (#849)
Browse files Browse the repository at this point in the history
* feat: add support for transaction statistics

* Hoist transaction_info into base job class

* Add versionadded directive to new property and class

* Include new class in docs reference
  • Loading branch information
plamut committed Aug 10, 2021
1 parent 9c6614f commit 7f7b1a8
Show file tree
Hide file tree
Showing 8 changed files with 112 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/reference.rst
Expand Up @@ -68,6 +68,7 @@ Job-Related Types
job.SourceFormat
job.WriteDisposition
job.SchemaUpdateOption
job.TransactionInfo


Dataset
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
Expand Up @@ -70,6 +70,7 @@
from google.cloud.bigquery.job import ScriptOptions
from google.cloud.bigquery.job import SourceFormat
from google.cloud.bigquery.job import UnknownJob
from google.cloud.bigquery.job import TransactionInfo
from google.cloud.bigquery.job import WriteDisposition
from google.cloud.bigquery.model import Model
from google.cloud.bigquery.model import ModelReference
Expand Down Expand Up @@ -149,6 +150,7 @@
"GoogleSheetsOptions",
"ParquetOptions",
"ScriptOptions",
"TransactionInfo",
"DEFAULT_RETRY",
# Enum Constants
"enums",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/job/__init__.py
Expand Up @@ -22,6 +22,7 @@
from google.cloud.bigquery.job.base import ReservationUsage
from google.cloud.bigquery.job.base import ScriptStatistics
from google.cloud.bigquery.job.base import ScriptStackFrame
from google.cloud.bigquery.job.base import TransactionInfo
from google.cloud.bigquery.job.base import UnknownJob
from google.cloud.bigquery.job.copy_ import CopyJob
from google.cloud.bigquery.job.copy_ import CopyJobConfig
Expand Down Expand Up @@ -81,5 +82,6 @@
"QueryPriority",
"SchemaUpdateOption",
"SourceFormat",
"TransactionInfo",
"WriteDisposition",
]
29 changes: 29 additions & 0 deletions google/cloud/bigquery/job/base.py
Expand Up @@ -19,6 +19,7 @@
import http
import threading
import typing
from typing import Dict, Optional

from google.api_core import exceptions
import google.api_core.future.polling
Expand Down Expand Up @@ -88,6 +89,22 @@ def _error_result_to_exception(error_result):
)


class TransactionInfo(typing.NamedTuple):
"""[Alpha] Information of a multi-statement transaction.
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo
.. versionadded:: 2.24.0
"""

transaction_id: str
"""Output only. ID of the transaction."""

@classmethod
def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
return cls(transaction_info["transactionId"])


class _JobReference(object):
"""A reference to a job.
Expand Down Expand Up @@ -336,6 +353,18 @@ def reservation_usage(self):
for usage in usage_stats_raw
]

@property
def transaction_info(self) -> Optional[TransactionInfo]:
"""Information of the multi-statement transaction if this job is part of one.
.. versionadded:: 2.24.0
"""
info = self._properties.get("statistics", {}).get("transactionInfo")
if info is None:
return None
else:
return TransactionInfo.from_api_repr(info)

@property
def error_result(self):
"""Error information about the job as a whole.
Expand Down
34 changes: 34 additions & 0 deletions tests/system/test_client.py
Expand Up @@ -1557,6 +1557,40 @@ def test_dml_statistics(self):
assert query_job.dml_stats.updated_row_count == 0
assert query_job.dml_stats.deleted_row_count == 3

def test_transaction_info(self):
table_schema = (
bigquery.SchemaField("foo", "STRING"),
bigquery.SchemaField("bar", "INTEGER"),
)

dataset_id = _make_dataset_id("bq_system_test")
self.temp_dataset(dataset_id)
table_id = f"{Config.CLIENT.project}.{dataset_id}.test_dml_statistics"

# Create the table before loading so that the column order is deterministic.
table = helpers.retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

# Insert a few rows and check the stats.
sql = f"""
BEGIN TRANSACTION;
INSERT INTO `{table_id}`
VALUES ("one", 1), ("two", 2), ("three", 3), ("four", 4);
UPDATE `{table_id}`
SET bar = bar + 1
WHERE bar > 2;
COMMIT TRANSACTION;
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

# Transaction ID set by the server should be accessible
assert query_job.transaction_info is not None
assert query_job.transaction_info.transaction_id != ""

def test_dbapi_w_standard_sql_types(self):
for sql, expected in helpers.STANDARD_SQL_EXAMPLES:
Config.CURSOR.execute(sql)
Expand Down
1 change: 1 addition & 0 deletions tests/unit/job/helpers.py
Expand Up @@ -162,6 +162,7 @@ def _verifyInitialReadonlyProperties(self, job):
self.assertIsNone(job.created)
self.assertIsNone(job.started)
self.assertIsNone(job.ended)
self.assertIsNone(job.transaction_info)

# derived from resource['status']
self.assertIsNone(job.error_result)
Expand Down
14 changes: 14 additions & 0 deletions tests/unit/job/test_base.py
Expand Up @@ -227,6 +227,20 @@ def test_script_statistics(self):
self.assertEqual(stack_frame.end_column, 14)
self.assertEqual(stack_frame.text, "QUERY TEXT")

def test_transaction_info(self):
from google.cloud.bigquery.job.base import TransactionInfo

client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
assert job.transaction_info is None

statistics = job._properties["statistics"] = {}
assert job.transaction_info is None

statistics["transactionInfo"] = {"transactionId": "123-abc-xyz"}
assert isinstance(job.transaction_info, TransactionInfo)
assert job.transaction_info.transaction_id == "123-abc-xyz"

def test_num_child_jobs(self):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
Expand Down
29 changes: 29 additions & 0 deletions tests/unit/job/test_query.py
Expand Up @@ -128,6 +128,18 @@ def _verify_dml_stats_resource_properties(self, job, resource):
else:
assert job.dml_stats is None

def _verify_transaction_info_resource_properties(self, job, resource):
resource_stats = resource.get("statistics", {})

if "transactionInfo" in resource_stats:
resource_transaction_info = resource_stats["transactionInfo"]
job_transaction_info = job.transaction_info
assert job_transaction_info.transaction_id == resource_transaction_info.get(
"transactionId"
)
else:
assert job.transaction_info is None

def _verify_configuration_properties(self, job, configuration):
if "dryRun" in configuration:
self.assertEqual(job.dry_run, configuration["dryRun"])
Expand All @@ -137,6 +149,7 @@ def _verify_configuration_properties(self, job, configuration):
def _verifyResourceProperties(self, job, resource):
self._verifyReadonlyResourceProperties(job, resource)
self._verify_dml_stats_resource_properties(job, resource)
self._verify_transaction_info_resource_properties(job, resource)

configuration = resource.get("configuration", {})
self._verify_configuration_properties(job, configuration)
Expand Down Expand Up @@ -325,6 +338,22 @@ def test_from_api_repr_with_dml_stats(self):
self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_with_transaction_info(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {
"id": self.JOB_ID,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"configuration": {"query": {"query": self.QUERY}},
"statistics": {"transactionInfo": {"transactionId": "1a2b-3c4d"}},
}
klass = self._get_target_class()

job = klass.from_api_repr(RESOURCE, client=client)

self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_w_properties(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import SchemaUpdateOption
Expand Down

0 comments on commit 7f7b1a8

Please sign in to comment.