From 36fe86f41c1a8f46167284f752a6d6bbf886a04b Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Thu, 15 Jul 2021 19:37:17 +0200 Subject: [PATCH] feat: add support for more detailed DML stats (#758) * feat: add support for more detailed DML stats * Move is None check of DmlStats one level higher --- docs/reference.rst | 1 + google/cloud/bigquery/__init__.py | 2 + google/cloud/bigquery/job/__init__.py | 2 + google/cloud/bigquery/job/query.py | 37 ++++++++++++++++ tests/system/test_client.py | 56 +++++++++++++++++++++++ tests/unit/job/test_query.py | 64 +++++++++++++++++++++++++++ tests/unit/job/test_query_stats.py | 37 ++++++++++++++++ 7 files changed, 199 insertions(+) diff --git a/docs/reference.rst b/docs/reference.rst index cb2faa5ec..8c38d0c44 100644 --- a/docs/reference.rst +++ b/docs/reference.rst @@ -58,6 +58,7 @@ Job-Related Types job.Compression job.CreateDisposition job.DestinationFormat + job.DmlStats job.Encoding job.OperationType job.QueryPlanEntry diff --git a/google/cloud/bigquery/__init__.py b/google/cloud/bigquery/__init__.py index 65dde5d94..ced8cefae 100644 --- a/google/cloud/bigquery/__init__.py +++ b/google/cloud/bigquery/__init__.py @@ -56,6 +56,7 @@ from google.cloud.bigquery.job import CopyJobConfig from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import DestinationFormat +from google.cloud.bigquery.job import DmlStats from google.cloud.bigquery.job import Encoding from google.cloud.bigquery.job import ExtractJob from google.cloud.bigquery.job import ExtractJobConfig @@ -142,6 +143,7 @@ "BigtableOptions", "BigtableColumnFamily", "BigtableColumn", + "DmlStats", "CSVOptions", "GoogleSheetsOptions", "ParquetOptions", diff --git a/google/cloud/bigquery/job/__init__.py b/google/cloud/bigquery/job/__init__.py index 6bdfa09be..4c16d0e20 100644 --- a/google/cloud/bigquery/job/__init__.py +++ b/google/cloud/bigquery/job/__init__.py @@ -31,6 +31,7 @@ from google.cloud.bigquery.job.load import LoadJob from 
google.cloud.bigquery.job.load import LoadJobConfig from google.cloud.bigquery.job.query import _contains_order_by +from google.cloud.bigquery.job.query import DmlStats from google.cloud.bigquery.job.query import QueryJob from google.cloud.bigquery.job.query import QueryJobConfig from google.cloud.bigquery.job.query import QueryPlanEntry @@ -66,6 +67,7 @@ "LoadJob", "LoadJobConfig", "_contains_order_by", + "DmlStats", "QueryJob", "QueryJobConfig", "QueryPlanEntry", diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 6ff9f2647..d588e9b5a 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -114,6 +114,35 @@ def _to_api_repr_table_defs(value): return {k: ExternalConfig.to_api_repr(v) for k, v in value.items()} +class DmlStats(typing.NamedTuple): + """Detailed statistics for DML statements. + + https://cloud.google.com/bigquery/docs/reference/rest/v2/DmlStats + """ + + inserted_row_count: int = 0 + """Number of inserted rows. Populated by DML INSERT and MERGE statements.""" + + deleted_row_count: int = 0 + """Number of deleted rows. Populated by DML DELETE, MERGE and TRUNCATE statements. + """ + + updated_row_count: int = 0 + """Number of updated rows. Populated by DML UPDATE and MERGE statements.""" + + @classmethod + def from_api_repr(cls, stats: Dict[str, str]) -> "DmlStats": + # NOTE: The field order here must match the order of fields set at the + # class level. + api_fields = ("insertedRowCount", "deletedRowCount", "updatedRowCount") + + args = ( + int(stats.get(api_field, default_val)) + for api_field, default_val in zip(api_fields, cls.__new__.__defaults__) + ) + return cls(*args) + + class ScriptOptions: """Options controlling the execution of scripts. 
@@ -1079,6 +1108,14 @@ def estimated_bytes_processed(self): result = int(result) return result + @property + def dml_stats(self) -> Optional[DmlStats]: + stats = self._job_statistics().get("dmlStats") + if stats is None: + return None + else: + return DmlStats.from_api_repr(stats) + def _blocking_poll(self, timeout=None, **kwargs): self._done_timeout = timeout self._transport_timeout = timeout diff --git a/tests/system/test_client.py b/tests/system/test_client.py index 7234333a2..cbca73619 100644 --- a/tests/system/test_client.py +++ b/tests/system/test_client.py @@ -1521,6 +1521,62 @@ def test_query_statistics(self): self.assertGreater(stages_with_inputs, 0) self.assertGreater(len(plan), stages_with_inputs) + def test_dml_statistics(self): + table_schema = ( + bigquery.SchemaField("foo", "STRING"), + bigquery.SchemaField("bar", "INTEGER"), + ) + + dataset_id = _make_dataset_id("bq_system_test") + self.temp_dataset(dataset_id) + table_id = "{}.{}.test_dml_statistics".format(Config.CLIENT.project, dataset_id) + + # Create the table before loading so that the column order is deterministic. + table = helpers.retry_403(Config.CLIENT.create_table)( + Table(table_id, schema=table_schema) + ) + self.to_delete.insert(0, table) + + # Insert a few rows and check the stats. + sql = f""" + INSERT INTO `{table_id}` + VALUES ("one", 1), ("two", 2), ("three", 3), ("four", 4); + """ + query_job = Config.CLIENT.query(sql) + query_job.result() + + assert query_job.dml_stats is not None + assert query_job.dml_stats.inserted_row_count == 4 + assert query_job.dml_stats.updated_row_count == 0 + assert query_job.dml_stats.deleted_row_count == 0 + + # Update some of the rows. 
+ sql = f""" + UPDATE `{table_id}` + SET bar = bar + 1 + WHERE bar > 2; + """ + query_job = Config.CLIENT.query(sql) + query_job.result() + + assert query_job.dml_stats is not None + assert query_job.dml_stats.inserted_row_count == 0 + assert query_job.dml_stats.updated_row_count == 2 + assert query_job.dml_stats.deleted_row_count == 0 + + # Now delete a few rows and check the stats. + sql = f""" + DELETE FROM `{table_id}` + WHERE foo != "two"; + """ + query_job = Config.CLIENT.query(sql) + query_job.result() + + assert query_job.dml_stats is not None + assert query_job.dml_stats.inserted_row_count == 0 + assert query_job.dml_stats.updated_row_count == 0 + assert query_job.dml_stats.deleted_row_count == 3 + def test_dbapi_w_standard_sql_types(self): for sql, expected in helpers.STANDARD_SQL_EXAMPLES: Config.CURSOR.execute(sql) diff --git a/tests/unit/job/test_query.py b/tests/unit/job/test_query.py index 4665933ea..482f7f3af 100644 --- a/tests/unit/job/test_query.py +++ b/tests/unit/job/test_query.py @@ -110,6 +110,24 @@ def _verify_table_definitions(self, job, config): self.assertIsNotNone(expected_ec) self.assertEqual(found_ec.to_api_repr(), expected_ec) + def _verify_dml_stats_resource_properties(self, job, resource): + query_stats = resource.get("statistics", {}).get("query", {}) + + if "dmlStats" in query_stats: + resource_dml_stats = query_stats["dmlStats"] + job_dml_stats = job.dml_stats + assert str(job_dml_stats.inserted_row_count) == resource_dml_stats.get( + "insertedRowCount", "0" + ) + assert str(job_dml_stats.updated_row_count) == resource_dml_stats.get( + "updatedRowCount", "0" + ) + assert str(job_dml_stats.deleted_row_count) == resource_dml_stats.get( + "deletedRowCount", "0" + ) + else: + assert job.dml_stats is None + def _verify_configuration_properties(self, job, configuration): if "dryRun" in configuration: self.assertEqual(job.dry_run, configuration["dryRun"]) @@ -118,6 +136,7 @@ def _verify_configuration_properties(self, job, configuration): 
def _verifyResourceProperties(self, job, resource): self._verifyReadonlyResourceProperties(job, resource) + self._verify_dml_stats_resource_properties(job, resource) configuration = resource.get("configuration", {}) self._verify_configuration_properties(job, configuration) @@ -130,16 +149,19 @@ def _verifyResourceProperties(self, job, resource): self._verify_table_definitions(job, query_config) self.assertEqual(job.query, query_config["query"]) + if "createDisposition" in query_config: self.assertEqual(job.create_disposition, query_config["createDisposition"]) else: self.assertIsNone(job.create_disposition) + if "defaultDataset" in query_config: ds_ref = job.default_dataset ds_ref = {"projectId": ds_ref.project, "datasetId": ds_ref.dataset_id} self.assertEqual(ds_ref, query_config["defaultDataset"]) else: self.assertIsNone(job.default_dataset) + if "destinationTable" in query_config: table = job.destination tb_ref = { @@ -150,14 +172,17 @@ def _verifyResourceProperties(self, job, resource): self.assertEqual(tb_ref, query_config["destinationTable"]) else: self.assertIsNone(job.destination) + if "priority" in query_config: self.assertEqual(job.priority, query_config["priority"]) else: self.assertIsNone(job.priority) + if "writeDisposition" in query_config: self.assertEqual(job.write_disposition, query_config["writeDisposition"]) else: self.assertIsNone(job.write_disposition) + if "destinationEncryptionConfiguration" in query_config: self.assertIsNotNone(job.destination_encryption_configuration) self.assertEqual( @@ -166,6 +191,7 @@ def _verifyResourceProperties(self, job, resource): ) else: self.assertIsNone(job.destination_encryption_configuration) + if "schemaUpdateOptions" in query_config: self.assertEqual( job.schema_update_options, query_config["schemaUpdateOptions"] @@ -190,6 +216,7 @@ def test_ctor_defaults(self): self.assertIsNone(job.create_disposition) self.assertIsNone(job.default_dataset) self.assertIsNone(job.destination) + 
self.assertIsNone(job.dml_stats) self.assertIsNone(job.flatten_results) self.assertIsNone(job.priority) self.assertIsNone(job.use_query_cache) @@ -278,6 +305,26 @@ def test_from_api_repr_with_encryption(self): self.assertIs(job._client, client) self._verifyResourceProperties(job, RESOURCE) + def test_from_api_repr_with_dml_stats(self): + self._setUpConstants() + client = _make_client(project=self.PROJECT) + RESOURCE = { + "id": self.JOB_ID, + "jobReference": {"projectId": self.PROJECT, "jobId": self.JOB_ID}, + "configuration": {"query": {"query": self.QUERY}}, + "statistics": { + "query": { + "dmlStats": {"insertedRowCount": "15", "updatedRowCount": "2"}, + }, + }, + } + klass = self._get_target_class() + + job = klass.from_api_repr(RESOURCE, client=client) + + self.assertIs(job._client, client) + self._verifyResourceProperties(job, RESOURCE) + def test_from_api_repr_w_properties(self): from google.cloud.bigquery.job import CreateDisposition from google.cloud.bigquery.job import SchemaUpdateOption @@ -815,6 +862,23 @@ def test_estimated_bytes_processed(self): query_stats["estimatedBytesProcessed"] = str(est_bytes) self.assertEqual(job.estimated_bytes_processed, est_bytes) + def test_dml_stats(self): + from google.cloud.bigquery.job.query import DmlStats + + client = _make_client(project=self.PROJECT) + job = self._make_one(self.JOB_ID, self.QUERY, client) + assert job.dml_stats is None + + statistics = job._properties["statistics"] = {} + assert job.dml_stats is None + + query_stats = statistics["query"] = {} + assert job.dml_stats is None + + query_stats["dmlStats"] = {"insertedRowCount": "35"} + assert isinstance(job.dml_stats, DmlStats) + assert job.dml_stats.inserted_row_count == 35 + def test_result(self): from google.cloud.bigquery.table import RowIterator diff --git a/tests/unit/job/test_query_stats.py b/tests/unit/job/test_query_stats.py index 09a0efc45..e70eb097c 100644 --- a/tests/unit/job/test_query_stats.py +++ b/tests/unit/job/test_query_stats.py @@ 
-15,6 +15,43 @@ from .helpers import _Base +class TestDmlStats: + @staticmethod + def _get_target_class(): + from google.cloud.bigquery.job import DmlStats + + return DmlStats + + def _make_one(self, *args, **kw): + return self._get_target_class()(*args, **kw) + + def test_ctor_defaults(self): + dml_stats = self._make_one() + assert dml_stats.inserted_row_count == 0 + assert dml_stats.deleted_row_count == 0 + assert dml_stats.updated_row_count == 0 + + def test_from_api_repr_partial_stats(self): + klass = self._get_target_class() + result = klass.from_api_repr({"deletedRowCount": "12"}) + + assert isinstance(result, klass) + assert result.inserted_row_count == 0 + assert result.deleted_row_count == 12 + assert result.updated_row_count == 0 + + def test_from_api_repr_full_stats(self): + klass = self._get_target_class() + result = klass.from_api_repr( + {"updatedRowCount": "4", "insertedRowCount": "7", "deletedRowCount": "25"} + ) + + assert isinstance(result, klass) + assert result.inserted_row_count == 7 + assert result.deleted_row_count == 25 + assert result.updated_row_count == 4 + + class TestQueryPlanEntryStep(_Base): KIND = "KIND" SUBSTEPS = ("SUB1", "SUB2")