feat: add support for more detailed DML stats (#758)
* feat: add support for more detailed DML stats

* Move is None check of DmlStats one level higher
plamut committed Jul 15, 2021
1 parent c45a738 commit 36fe86f
Showing 7 changed files with 199 additions and 0 deletions.
1 change: 1 addition & 0 deletions docs/reference.rst
@@ -58,6 +58,7 @@ Job-Related Types
    job.Compression
    job.CreateDisposition
    job.DestinationFormat
    job.DmlStats
    job.Encoding
    job.OperationType
    job.QueryPlanEntry
2 changes: 2 additions & 0 deletions google/cloud/bigquery/__init__.py
@@ -56,6 +56,7 @@
from google.cloud.bigquery.job import CopyJobConfig
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import DestinationFormat
from google.cloud.bigquery.job import DmlStats
from google.cloud.bigquery.job import Encoding
from google.cloud.bigquery.job import ExtractJob
from google.cloud.bigquery.job import ExtractJobConfig
@@ -142,6 +143,7 @@
"BigtableOptions",
"BigtableColumnFamily",
"BigtableColumn",
"DmlStats",
"CSVOptions",
"GoogleSheetsOptions",
"ParquetOptions",
2 changes: 2 additions & 0 deletions google/cloud/bigquery/job/__init__.py
@@ -31,6 +31,7 @@
from google.cloud.bigquery.job.load import LoadJob
from google.cloud.bigquery.job.load import LoadJobConfig
from google.cloud.bigquery.job.query import _contains_order_by
from google.cloud.bigquery.job.query import DmlStats
from google.cloud.bigquery.job.query import QueryJob
from google.cloud.bigquery.job.query import QueryJobConfig
from google.cloud.bigquery.job.query import QueryPlanEntry
@@ -66,6 +67,7 @@
"LoadJob",
"LoadJobConfig",
"_contains_order_by",
"DmlStats",
"QueryJob",
"QueryJobConfig",
"QueryPlanEntry",
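Both `__init__.py` changes re-export the same class, so the type resolves identically from the package root and from the job subpackage. A quick sketch of what the re-exports provide:

from google.cloud.bigquery import DmlStats
from google.cloud.bigquery.job import DmlStats as JobDmlStats

# One class, exposed at both import paths.
assert DmlStats is JobDmlStats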
37 changes: 37 additions & 0 deletions google/cloud/bigquery/job/query.py
@@ -114,6 +114,35 @@ def _to_api_repr_table_defs(value):
    return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()}


class DmlStats(typing.NamedTuple):
    """Detailed statistics for DML statements.

    https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats
    """

    inserted_row_count: int = 0
    """Number of inserted rows. Populated by DML INSERT and MERGE statements."""

    deleted_row_count: int = 0
    """Number of deleted rows. Populated by DML DELETE, MERGE and TRUNCATE statements."""

    updated_row_count: int = 0
    """Number of updated rows. Populated by DML UPDATE and MERGE statements."""

    @classmethod
    def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats":
        # NOTE: The field order here must match the order of fields set at the
        # class level.
        api_fields = ("insertedRowCount", "deletedRowCount", "updatedRowCount")

        args = (
            int(stats.get(api_field, default_val))
            for api_field, default_val in zip(api_fields, cls.__new__.__defaults__)
        )
        return cls(*args)


class ScriptOptions:
    """Options controlling the execution of scripts.
@@ -1079,6 +1108,14 @@ def estimated_bytes_processed(self):
            result = int(result)
        return result

    @property
    def dml_stats(self) -> Optional[DmlStats]:
        """Detailed statistics for DML statements, or None if not present."""
        stats = self._job_statistics().get("dmlStats")
        if stats is None:
            return None
        else:
            return DmlStats.from_api_repr(stats)

    def _blocking_poll(self, timeout=None, **kwargs):
        self._done_timeout = timeout
        self._transport_timeout = timeout
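Taken together, the new property surfaces the REST field statistics.query.dmlStats on finished query jobs. A minimal usage sketch, assuming an authenticated client; the table name is hypothetical:

from google.cloud import bigquery

client = bigquery.Client()  # assumes application default credentials

# Run a DML statement against a hypothetical table.
query_job = client.query("DELETE FROM `my_dataset.my_table` WHERE bar > 2")
query_job.result()  # block until the DML statement finishes

# dml_stats is None for non-DML queries, so guard before reading it.
if query_job.dml_stats is not None:
    print(f"deleted {query_job.dml_stats.deleted_row_count} rows")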
56 changes: 56 additions & 0 deletions tests/system/test_client.py
@@ -1521,6 +1521,62 @@ def test_query_statistics(self):
        self.assertGreater(stages_with_inputs, 0)
        self.assertGreater(len(plan), stages_with_inputs)

    def test_dml_statistics(self):
        table_schema = (
            bigquery.SchemaField("foo", "STRING"),
            bigquery.SchemaField("bar", "INTEGER"),
        )

        dataset_id = _make_dataset_id("bq_system_test")
        self.temp_dataset(dataset_id)
        table_id = "{}.{}.test_dml_statistics".format(Config.CLIENT.project, dataset_id)

        # Create the table before loading so that the column order is deterministic.
        table = helpers.retry_403(Config.CLIENT.create_table)(
            Table(table_id, schema=table_schema)
        )
        self.to_delete.insert(0, table)

        # Insert a few rows and check the stats.
        sql = f"""
            INSERT INTO `{table_id}`
            VALUES ("one", 1), ("two", 2), ("three", 3), ("four", 4);
        """
        query_job = Config.CLIENT.query(sql)
        query_job.result()

        assert query_job.dml_stats is not None
        assert query_job.dml_stats.inserted_row_count == 4
        assert query_job.dml_stats.updated_row_count == 0
        assert query_job.dml_stats.deleted_row_count == 0

        # Update some of the rows.
        sql = f"""
            UPDATE `{table_id}`
            SET bar = bar + 1
            WHERE bar > 2;
        """
        query_job = Config.CLIENT.query(sql)
        query_job.result()

        assert query_job.dml_stats is not None
        assert query_job.dml_stats.inserted_row_count == 0
        assert query_job.dml_stats.updated_row_count == 2
        assert query_job.dml_stats.deleted_row_count == 0

        # Now delete a few rows and check the stats.
        sql = f"""
            DELETE FROM `{table_id}`
            WHERE foo != "two";
        """
        query_job = Config.CLIENT.query(sql)
        query_job.result()

        assert query_job.dml_stats is not None
        assert query_job.dml_stats.inserted_row_count == 0
        assert query_job.dml_stats.updated_row_count == 0
        assert query_job.dml_stats.deleted_row_count == 3

    def test_dbapi_w_standard_sql_types(self):
        for sql, expected in helpers.STANDARD_SQL_EXAMPLES:
            Config.CURSOR.execute(sql)
64 changes: 64 additions & 0 deletions tests/unit/job/test_query.py
@@ -110,6 +110,24 @@ def _verify_table_definitions(self, job, config):
                self.assertIsNotNone(expected_ec)
                self.assertEqual(found_ec.to_api_repr(), expected_ec)

    def _verify_dml_stats_resource_properties(self, job, resource):
        query_stats = resource.get("statistics", {}).get("query", {})

        if "dmlStats" in query_stats:
            resource_dml_stats = query_stats["dmlStats"]
            job_dml_stats = job.dml_stats
            assert str(job_dml_stats.inserted_row_count) == resource_dml_stats.get(
                "insertedRowCount", "0"
            )
            assert str(job_dml_stats.updated_row_count) == resource_dml_stats.get(
                "updatedRowCount", "0"
            )
            assert str(job_dml_stats.deleted_row_count) == resource_dml_stats.get(
                "deletedRowCount", "0"
            )
        else:
            assert job.dml_stats is None

    def _verify_configuration_properties(self, job, configuration):
        if "dryRun" in configuration:
            self.assertEqual(job.dry_run, configuration["dryRun"])
@@ -118,6 +136,7 @@ def _verify_configuration_properties(self, job, configuration):

    def _verifyResourceProperties(self, job, resource):
        self._verifyReadonlyResourceProperties(job, resource)
        self._verify_dml_stats_resource_properties(job, resource)

        configuration = resource.get("configuration", {})
        self._verify_configuration_properties(job, configuration)
Expand All @@ -130,16 +149,19 @@ def _verifyResourceProperties(self, job, resource):
        self._verify_table_definitions(job, query_config)

        self.assertEqual(job.query, query_config["query"])

        if "createDisposition" in query_config:
            self.assertEqual(job.create_disposition, query_config["createDisposition"])
        else:
            self.assertIsNone(job.create_disposition)

        if "defaultDataset" in query_config:
            ds_ref = job.default_dataset
            ds_ref = {"projectId": ds_ref.project, "datasetId": ds_ref.dataset_id}
            self.assertEqual(ds_ref, query_config["defaultDataset"])
        else:
            self.assertIsNone(job.default_dataset)

        if "destinationTable" in query_config:
            table = job.destination
            tb_ref = {
@@ -150,14 +172,17 @@ def _verifyResourceProperties(self, job, resource):
            self.assertEqual(tb_ref, query_config["destinationTable"])
        else:
            self.assertIsNone(job.destination)

        if "priority" in query_config:
            self.assertEqual(job.priority, query_config["priority"])
        else:
            self.assertIsNone(job.priority)

        if "writeDisposition" in query_config:
            self.assertEqual(job.write_disposition, query_config["writeDisposition"])
        else:
            self.assertIsNone(job.write_disposition)

        if "destinationEncryptionConfiguration" in query_config:
            self.assertIsNotNone(job.destination_encryption_configuration)
            self.assertEqual(
@@ -166,6 +191,7 @@ def _verifyResourceProperties(self, job, resource):
            )
        else:
            self.assertIsNone(job.destination_encryption_configuration)

        if "schemaUpdateOptions" in query_config:
            self.assertEqual(
                job.schema_update_options, query_config["schemaUpdateOptions"]
@@ -190,6 +216,7 @@ def test_ctor_defaults(self):
        self.assertIsNone(job.create_disposition)
        self.assertIsNone(job.default_dataset)
        self.assertIsNone(job.destination)
        self.assertIsNone(job.dml_stats)
        self.assertIsNone(job.flatten_results)
        self.assertIsNone(job.priority)
        self.assertIsNone(job.use_query_cache)
@@ -278,6 +305,26 @@ def test_from_api_repr_with_encryption(self):
        self.assertIs(job._client, client)
        self._verifyResourceProperties(job, RESOURCE)

    def test_from_api_repr_with_dml_stats(self):
        self._setUpConstants()
        client = _make_client(project=self.PROJECT)
        RESOURCE = {
            "id": self.JOB_ID,
            "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID},
            "configuration": {"query": {"query": self.QUERY}},
            "statistics": {
                "query": {
                    "dmlStats": {"insertedRowCount": "15", "updatedRowCount": "2"},
                },
            },
        }
        klass = self._get_target_class()

        job = klass.from_api_repr(RESOURCE, client=client)

        self.assertIs(job._client, client)
        self._verifyResourceProperties(job, RESOURCE)

    def test_from_api_repr_w_properties(self):
        from google.cloud.bigquery.job import CreateDisposition
        from google.cloud.bigquery.job import SchemaUpdateOption
@@ -815,6 +862,23 @@ def test_estimated_bytes_processed(self):
query_stats["estimatedBytesProcessed"] = str(est_bytes)
self.assertEqual(job.estimated_bytes_processed, est_bytes)

def test_dml_stats(self):
from google.cloud.bigquery.job.query import DmlStats

client = _make_client(project=self.PROJECT)
job = self._make_one(self.JOB_ID, self.QUERY, client)
assert job.dml_stats is None

statistics = job._properties["statistics"] = {}
assert job.dml_stats is None

query_stats = statistics["query"] = {}
assert job.dml_stats is None

query_stats["dmlStats"] = {"insertedRowCount": "35"}
assert isinstance(job.dml_stats, DmlStats)
assert job.dml_stats.inserted_row_count == 35

def test_result(self):
from google.cloud.bigquery.table import RowIterator

37 changes: 37 additions & 0 deletions tests/unit/job/test_query_stats.py
@@ -15,6 +15,43 @@
from .helpers import _Base


class TestDmlStats:
    @staticmethod
    def _get_target_class():
        from google.cloud.bigquery.job import DmlStats

        return DmlStats

    def _make_one(self, *args, **kw):
        return self._get_target_class()(*args, **kw)

    def test_ctor_defaults(self):
        dml_stats = self._make_one()
        assert dml_stats.inserted_row_count == 0
        assert dml_stats.deleted_row_count == 0
        assert dml_stats.updated_row_count == 0

    def test_from_api_repr_partial_stats(self):
        klass = self._get_target_class()
        result = klass.from_api_repr({"deletedRowCount": "12"})

        assert isinstance(result, klass)
        assert result.inserted_row_count == 0
        assert result.deleted_row_count == 12
        assert result.updated_row_count == 0

    def test_from_api_repr_full_stats(self):
        klass = self._get_target_class()
        result = klass.from_api_repr(
            {"updatedRowCount": "4", "insertedRowCount": "7", "deletedRowCount": "25"}
        )

        assert isinstance(result, klass)
        assert result.inserted_row_count == 7
        assert result.deleted_row_count == 25
        assert result.updated_row_count == 4


class TestQueryPlanEntryStep(_Base):
    KIND = "KIND"
    SUBSTEPS = ("SUB1", "SUB2")
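For illustration, the mapping these tests exercise — camelCase string counts in, integer tuple fields out — can be demonstrated directly (a sketch; the counts are arbitrary):

from google.cloud.bigquery.job import DmlStats

stats = DmlStats.from_api_repr({"insertedRowCount": "15", "updatedRowCount": "2"})
# Fields missing from the API payload fall back to the NamedTuple defaults (0).
assert stats == DmlStats(inserted_row_count=15, deleted_row_count=0, updated_row_count=2)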
