Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for transaction statistics #849

Merged
merged 7 commits into from Aug 10, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
Expand Up @@ -70,6 +70,7 @@
from google.cloud.bigquery.job import ScriptOptions
from google.cloud.bigquery.job import SourceFormat
from google.cloud.bigquery.job import UnknownJob
from google.cloud.bigquery.job import TransactionInfo
from google.cloud.bigquery.job import WriteDisposition
from google.cloud.bigquery.model import Model
from google.cloud.bigquery.model import ModelReference
Expand Down Expand Up @@ -149,6 +150,7 @@
"GoogleSheetsOptions",
"ParquetOptions",
"ScriptOptions",
"TransactionInfo",
"DEFAULT_RETRY",
# Enum Constants
"enums",
Expand Down
2 changes: 2 additions & 0 deletions google/cloud/bigquery/job/__init__.py
Expand Up @@ -38,6 +38,7 @@
from google.cloud.bigquery.job.query import QueryPlanEntryStep
from google.cloud.bigquery.job.query import ScriptOptions
from google.cloud.bigquery.job.query import TimelineEntry
from google.cloud.bigquery.job.query import TransactionInfo
from google.cloud.bigquery.enums import Compression
from google.cloud.bigquery.enums import CreateDisposition
from google.cloud.bigquery.enums import DestinationFormat
Expand Down Expand Up @@ -81,5 +82,6 @@
"QueryPriority",
"SchemaUpdateOption",
"SourceFormat",
"TransactionInfo",
"WriteDisposition",
]
24 changes: 24 additions & 0 deletions google/cloud/bigquery/job/query.py
Expand Up @@ -143,6 +143,20 @@ def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats":
return cls(*args)


class TransactionInfo(typing.NamedTuple):
    """[Alpha] Information of a multi-statement transaction.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#TransactionInfo
    """

    transaction_id: str
    """Output only. ID of the transaction."""

    @classmethod
    def from_api_repr(cls, transaction_info: Dict[str, str]) -> "TransactionInfo":
        """Construct an instance from the API resource representation."""
        transaction_id = transaction_info["transactionId"]
        return cls(transaction_id)


class ScriptOptions:
"""Options controlling the execution of scripts.

Expand Down Expand Up @@ -1116,6 +1130,16 @@ def dml_stats(self) -> Optional[DmlStats]:
else:
return DmlStats.from_api_repr(stats)

@property
def transaction_info(self) -> Optional[TransactionInfo]:
"""Information of the multi-statement transaction if this job is part of one.
"""
info = self._properties.get("statistics", {}).get("transactionInfo")
if info is None:
return None
else:
return TransactionInfo.from_api_repr(info)
plamut marked this conversation as resolved.
Show resolved Hide resolved

def _blocking_poll(self, timeout=None, **kwargs):
self._done_timeout = timeout
self._transport_timeout = timeout
Expand Down
34 changes: 34 additions & 0 deletions tests/system/test_client.py
Expand Up @@ -1557,6 +1557,40 @@ def test_dml_statistics(self):
assert query_job.dml_stats.updated_row_count == 0
assert query_job.dml_stats.deleted_row_count == 3

def test_transaction_info(self):
table_schema = (
bigquery.SchemaField("foo", "STRING"),
bigquery.SchemaField("bar", "INTEGER"),
)

dataset_id = _make_dataset_id("bq_system_test")
self.temp_dataset(dataset_id)
table_id = f"{Config.CLIENT.project}.{dataset_id}.test_dml_statistics"

# Create the table before loading so that the column order is deterministic.
table = helpers.retry_403(Config.CLIENT.create_table)(
Table(table_id, schema=table_schema)
)
self.to_delete.insert(0, table)

# Insert a few rows and check the stats.
sql = f"""
BEGIN TRANSACTION;
INSERT INTO `{table_id}`
VALUES ("one", 1), ("two", 2), ("three", 3), ("four", 4);

UPDATE `{table_id}`
SET bar = bar + 1
WHERE bar > 2;
COMMIT TRANSACTION;
"""
query_job = Config.CLIENT.query(sql)
query_job.result()

# Transaction ID set by the server should be accessible
assert query_job.transaction_info is not None
assert query_job.transaction_info.transaction_id != ""

def test_dbapi_w_standard_sql_types(self):
for sql, expected in helpers.STANDARD_SQL_EXAMPLES:
Config.CURSOR.execute(sql)
Expand Down
44 changes: 44 additions & 0 deletions tests/unit/job/test_query.py
Expand Up @@ -128,6 +128,18 @@ def _verify_dml_stats_resource_properties(self, job, resource):
else:
assert job.dml_stats is None

def _verify_transaction_info_resource_properties(self, job, resource):
resource_stats = resource.get("statistics", {})

if "transactionInfo" in resource_stats:
resource_transaction_info = resource_stats["transactionInfo"]
job_transaction_info = job.transaction_info
assert job_transaction_info.transaction_id == resource_transaction_info.get(
"transactionId"
)
else:
assert job.transaction_info is None

def _verify_configuration_properties(self, job, configuration):
if "dryRun" in configuration:
self.assertEqual(job.dry_run, configuration["dryRun"])
Expand All @@ -137,6 +149,7 @@ def _verify_configuration_properties(self, job, configuration):
def _verifyResourceProperties(self, job, resource):
self._verifyReadonlyResourceProperties(job, resource)
self._verify_dml_stats_resource_properties(job, resource)
self._verify_transaction_info_resource_properties(job, resource)

configuration = resource.get("configuration", {})
self._verify_configuration_properties(job, configuration)
Expand Down Expand Up @@ -230,6 +243,7 @@ def test_ctor_defaults(self):
self.assertIsNone(job.time_partitioning)
self.assertIsNone(job.clustering_fields)
self.assertIsNone(job.schema_update_options)
self.assertIsNone(job.transaction_info)

def test_ctor_w_udf_resources(self):
from google.cloud.bigquery.job import QueryJobConfig
Expand Down Expand Up @@ -325,6 +339,22 @@ def test_from_api_repr_with_dml_stats(self):
self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_with_transaction_info(self):
self._setUpConstants()
client = _make_client(project=self.PROJECT)
RESOURCE = {
"id": self.JOB_ID,
"jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
"configuration": {"query": {"query": self.QUERY}},
"statistics": {"transactionInfo": {"transactionId": "1a2b-3c4d"}},
}
klass = self._get_target_class()

job = klass.from_api_repr(RESOURCE, client=client)

self.assertIs(job._client, client)
self._verifyResourceProperties(job, RESOURCE)

def test_from_api_repr_w_properties(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import SchemaUpdateOption
Expand Down Expand Up @@ -879,6 +909,20 @@ def test_dml_stats(self):
assert isinstance(job.dml_stats, DmlStats)
assert job.dml_stats.inserted_row_count == 35

def test_transaction_info(self):
from google.cloud.bigquery.job.query import TransactionInfo

client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, self.QUERY, client)
assert job.transaction_info is None

statistics = job._properties["statistics"] = {}
assert job.transaction_info is None

statistics["transactionInfo"] = {"transactionId": "123-abc-xyz"}
assert isinstance(job.transaction_info, TransactionInfo)
assert job.transaction_info.transaction_id == "123-abc-xyz"

def test_result(self):
from google.cloud.bigquery.table import RowIterator

Expand Down