feat: add support for transaction statistics #849

Merged (7 commits, Aug 10, 2021)

1 change: 1 addition & 0 deletions docs/reference.rst
@@ -68,6 +68,7 @@ Job-Related Types
job.SourceFormat
job.WriteDisposition
job.SchemaUpdateOption
job.TransactionInfo


Dataset
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
@@ -70,6 +70,7 @@
from google.cloud.bigquery.job import ScriptOptions
from google.cloud.bigquery.job import SourceFormat
from google.cloud.bigquery.job import UnknownJob
from google.cloud.bigquery.job import TransactionInfo
from google.cloud.bigquery.job import WriteDisposition
from google.cloud.bigquery.model import Model
from google.cloud.bigquery.model import ModelReference
@@ -149,6 +150,7 @@
"GoogleSheetsOptions",
"ParquetOptions",
"ScriptOptions",
"TransactionInfo",
"DEFAULT_RETRY",
# Enum Constants
"enums",
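
With the new top-level export, TransactionInfo can be imported either from the package root or from the job subpackage. A quick sanity check, assuming an installed version that includes this change (2.24.0 or later):

from google.cloud import bigquery
from google.cloud.bigquery.job import TransactionInfo

# Both names refer to the same NamedTuple added in this PR.
assert bigquery.TransactionInfo is TransactionInfo
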
2 changes: 2 additions & 0 deletions google/cloud/bigquery/job/__init__.py
@@ -22,6 +22,7 @@
from google.cloud.bigquery.job.base import ReservationUsage
from google.cloud.bigquery.job.base import ScriptStatistics
from google.cloud.bigquery.job.base import ScriptStackFrame
from google.cloud.bigquery.job.base import TransactionInfo
from google.cloud.bigquery.job.base import UnknownJob
from google.cloud.bigquery.job.copy_ import CopyJob
from google.cloud.bigquery.job.copy_ import CopyJobConfig
@@ -81,5 +82,6 @@
"QueryPriority",
"SchemaUpdateOption",
"SourceFormat",
"TransactionInfo",
"WriteDisposition",
]
29 changes: 29 additions & 0 deletions google/cloud/bigquery/job/base.py
@@ -19,6 +19,7 @@
import http
import threading
import typing
from typing import Dict, Optional

from google.api_core import exceptions
import google.api_core.future.polling
@@ -88,6 +89,22 @@ def _error_result_to_exception(error_result):
)


class TransactionInfo(typing.NamedTuple):
"""[Alpha] Information of a multi-statement transaction.

https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo

.. versionadded:: 2.24.0
"""

transaction_id: str
"""Output only. ID of the transaction."""

@classmethod
def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
return cls(transaction_info["transactionId"])


class _JobReference(object):
"""A reference to a job.

@@ -336,6 +353,18 @@ def reservation_usage(self):
for usage in usage_stats_raw
]

@property
def transaction_info(self) -> Optional[TransactionInfo]:
"""Information of the multi-statement transaction if this job is part of one.

.. versionadded:: 2.24.0
"""
info = self._properties.get("statistics", {}).get("transactionInfo")
if info is None:
return None
else:
return TransactionInfo.from_api_repr(info)

@property
def error_result(self):
"""Error information about the job as a whole.
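
For context, a minimal usage sketch of the new transaction_info property, assuming an authenticated Client and an existing table with a STRING column foo and an INTEGER column bar; the dataset and table names below are placeholders:

from google.cloud import bigquery

client = bigquery.Client()

sql = """
BEGIN TRANSACTION;
INSERT INTO `my_dataset.my_table` (foo, bar) VALUES ('one', 1);
COMMIT TRANSACTION;
"""
query_job = client.query(sql)
query_job.result()  # Wait for the script to finish so statistics are populated.

# The property is None for jobs that are not part of a multi-statement transaction.
if query_job.transaction_info is not None:
    print("transaction id:", query_job.transaction_info.transaction_id)
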
34 changes: 34 additions & 0 deletions tests/system/test_client.py
@@ -1557,6 +1557,40 @@ def test_dml_statistics(self):
assert query_job.dml_stats.updated_row_count == 0
assert query_job.dml_stats.deleted_row_count == 3

def test_transaction_info(self):
table_schema = (
bigquery.SchemaField("foo", "STRING"),
bigquery.SchemaField("bar", "INTEGER"),
)

dataset_id = _make_dataset_id("bq_system_test")
self.temp_dataset(dataset_id)
table_id = f"{Config.CLIENT.project}.{dataset_id}.test_dml_statistics"

# Create the table before loading so that the column order is deterministic.
table = helpers.retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

        # Run a multi-statement transaction that inserts and updates a few rows.
sql = f"""
BEGIN TRANSACTION;
INSERT INTO `{table_id}`
VALUES ("one", 1), ("two", 2), ("three", 3), ("four", 4);

UPDATE `{table_id}`
SET bar = bar + 1
WHERE bar > 2;
COMMIT TRANSACTION;
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

# Transaction ID set by the server should be accessible
assert query_job.transaction_info is not None
assert query_job.transaction_info.transaction_id != ""

def test_dbapi_w_standard_sql_types(self):
for sql, expected in helpers.STANDARD_SQL_EXAMPLES:
Config.CURSOR.execute(sql)
1 change: 1 addition & 0 deletions tests/unit/job/helpers.py
@@ -162,6 +162,7 @@ def _verifyInitialReadonlyProperties(self, job):
self.assertIsNone(job.created)
self.assertIsNone(job.started)
self.assertIsNone(job.ended)
self.assertIsNone(job.transaction_info)

# derived from resource['status']
self.assertIsNone(job.error_result)
14 changes: 14 additions & 0 deletions tests/unit/job/test_base.py
@@ -227,6 +227,20 @@ def test_script_statistics(self):
self.assertEqual(stack_frame.end_column, 14)
self.assertEqual(stack_frame.text, "QUERY TEXT")

def test_transaction_info(self):
from google.cloud.bigquery.job.base import TransactionInfo

client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
assert job.transaction_info is None

statistics = job._properties["statistics"] = {}
assert job.transaction_info is None

statistics["transactionInfo"] = {"transactionId": "123-abc-xyz"}
assert isinstance(job.transaction_info, TransactionInfo)
assert job.transaction_info.transaction_id == "123-abc-xyz"

def test_num_child_jobs(self):
client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, client)
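
For reference, transaction_info is built straight from the statistics.transactionInfo object of the job resource; a small sketch of that conversion, reusing the payload shape from the test above:

from google.cloud.bigquery.job import TransactionInfo

# Shape of the relevant fragment of a jobs.get / jobs.insert response.
api_repr = {"transactionId": "123-abc-xyz"}

info = TransactionInfo.from_api_repr(api_repr)
assert info.transaction_id == "123-abc-xyz"
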
29 changes: 29 additions & 0 deletions tests/unit/job/test_query.py
@@ -128,6 +128,18 @@ def _verify_dml_stats_resource_properties(self, job, resource):
else:
assert job.dml_stats is None

def _verify_transaction_info_resource_properties(self, job, resource):
resource_stats = resource.get("statistics", {})

if "transactionInfo" in resource_stats:
resource_transaction_info = resource_stats["transactionInfo"]
job_transaction_info = job.transaction_info
assert job_transaction_info.transaction_id == resource_transaction_info.get(
"transactionId"
)
else:
assert job.transaction_info is None

def _verify_configuration_properties(self, job, configuration):
if "dryRun" in configuration:
self.assertEqual(job.dry_run, configuration["dryRun"])
@@ -137,6 +149,7 @@ def _verify_configuration_properties(self, job, configuration):
def _verifyResourceProperties(self, job, resource):
self._verifyReadonlyResourceProperties(job, resource)
self._verify_dml_stats_resource_properties(job, resource)
self._verify_transaction_info_resource_properties(job, resource)

configuration = resource.get("configuration", {})
self._verify_configuration_properties(job, configuration)
@@ -325,6 +338,22 @@ def test_from_api_repr_with_dml_stats(self):
self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_with_transaction_info(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {
"id": self.JOB_ID,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"configuration": {"query": {"query": self.QUERY}},
"statistics": {"transactionInfo": {"transactionId": "1a2b-3c4d"}},
}
klass = self._get_target_class()

job = klass.from_api_repr(RESOURCE, client=client)

self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_w_properties(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import SchemaUpdateOption