From ba86b2a6300ae5a9f3c803beeb42bda4c522e34c Mon Sep 17 00:00:00 2001
From: Peter Lamut
Date: Sat, 10 Jul 2021 11:29:00 +0200
Subject: [PATCH] feat: add support for table snapshots (#740)

* feat: add support for table snapshots

* Add system test for table snapshots

* Make test taxonomy resource name unique

* Store timezone aware snapshot time on snapshots

* Make copy config tests more detailed

* Use unique resource ID differently for display name

* Add new classes to docs
---
 docs/reference.rst                    |  2 +
 google/cloud/bigquery/__init__.py     |  4 ++
 google/cloud/bigquery/job/__init__.py |  2 +
 google/cloud/bigquery/job/copy_.py    | 38 ++++++++++++++
 google/cloud/bigquery/table.py        | 37 ++++++++++++++
 tests/system/test_client.py           | 71 ++++++++++++++++++++++++-
 tests/unit/job/test_copy.py           | 34 +++++++++++-
 tests/unit/test_table.py              | 74 +++++++++++++++++++++++++++
 8 files changed, 260 insertions(+), 2 deletions(-)

diff --git a/docs/reference.rst b/docs/reference.rst
index 694379cd2..cb2faa5ec 100644
--- a/docs/reference.rst
+++ b/docs/reference.rst
@@ -59,6 +59,7 @@ Job-Related Types
     job.CreateDisposition
     job.DestinationFormat
     job.Encoding
+    job.OperationType
     job.QueryPlanEntry
     job.QueryPlanEntryStep
     job.QueryPriority
@@ -90,6 +91,7 @@ Table
     table.RangePartitioning
     table.Row
     table.RowIterator
+    table.SnapshotDefinition
     table.Table
     table.TableListItem
     table.TableReference
diff --git a/google/cloud/bigquery/__init__.py b/google/cloud/bigquery/__init__.py
index b97224176..65dde5d94 100644
--- a/google/cloud/bigquery/__init__.py
+++ b/google/cloud/bigquery/__init__.py
@@ -61,6 +61,7 @@
 from google.cloud.bigquery.job import ExtractJobConfig
 from google.cloud.bigquery.job import LoadJob
 from google.cloud.bigquery.job import LoadJobConfig
+from google.cloud.bigquery.job import OperationType
 from google.cloud.bigquery.job import QueryJob
 from google.cloud.bigquery.job import QueryJobConfig
 from google.cloud.bigquery.job import QueryPriority
@@ -87,6 +88,7 @@
 from google.cloud.bigquery.table import PartitionRange
 from google.cloud.bigquery.table import RangePartitioning
 from google.cloud.bigquery.table import Row
+from google.cloud.bigquery.table import SnapshotDefinition
 from google.cloud.bigquery.table import Table
 from google.cloud.bigquery.table import TableReference
 from google.cloud.bigquery.table import TimePartitioningType
@@ -115,6 +117,7 @@
     "PartitionRange",
     "RangePartitioning",
     "Row",
+    "SnapshotDefinition",
     "TimePartitioning",
     "TimePartitioningType",
     # Jobs
@@ -155,6 +158,7 @@
     "ExternalSourceFormat",
     "Encoding",
     "KeyResultStatementKind",
+    "OperationType",
     "QueryPriority",
     "SchemaUpdateOption",
     "SourceFormat",
diff --git a/google/cloud/bigquery/job/__init__.py b/google/cloud/bigquery/job/__init__.py
index cdab92e05..6bdfa09be 100644
--- a/google/cloud/bigquery/job/__init__.py
+++ b/google/cloud/bigquery/job/__init__.py
@@ -25,6 +25,7 @@
 from google.cloud.bigquery.job.base import UnknownJob
 from google.cloud.bigquery.job.copy_ import CopyJob
 from google.cloud.bigquery.job.copy_ import CopyJobConfig
+from google.cloud.bigquery.job.copy_ import OperationType
 from google.cloud.bigquery.job.extract import ExtractJob
 from google.cloud.bigquery.job.extract import ExtractJobConfig
 from google.cloud.bigquery.job.load import LoadJob
@@ -59,6 +60,7 @@
     "UnknownJob",
     "CopyJob",
     "CopyJobConfig",
+    "OperationType",
     "ExtractJob",
     "ExtractJobConfig",
     "LoadJob",
diff --git a/google/cloud/bigquery/job/copy_.py b/google/cloud/bigquery/job/copy_.py
index 95f4b613b..c6ee98944 100644
--- a/google/cloud/bigquery/job/copy_.py
+++ b/google/cloud/bigquery/job/copy_.py
@@ -14,6 +14,8 @@
 """Classes for copy jobs."""
 
+from typing import Optional
+
 from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
 from google.cloud.bigquery import _helpers
 from google.cloud.bigquery.table import TableReference
 
@@ -23,6 +25,25 @@
 from google.cloud.bigquery.job.base import _JobReference
 
 
+class OperationType:
+    """Different operation types supported in table copy job.
+
+    https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#operationtype
+    """
+
+    OPERATION_TYPE_UNSPECIFIED = "OPERATION_TYPE_UNSPECIFIED"
+    """Unspecified operation type."""
+
+    COPY = "COPY"
+    """The source and destination table have the same table type."""
+
+    SNAPSHOT = "SNAPSHOT"
+    """The source table type is TABLE and the destination table type is SNAPSHOT."""
+
+    RESTORE = "RESTORE"
+    """The source table type is SNAPSHOT and the destination table type is TABLE."""
+
+
 class CopyJobConfig(_JobConfig):
     """Configuration options for copy jobs.
 
@@ -85,6 +106,23 @@ def destination_encryption_configuration(self, value):
             api_repr = value.to_api_repr()
         self._set_sub_prop("destinationEncryptionConfiguration", api_repr)
 
+    @property
+    def operation_type(self) -> str:
+        """The operation to perform with this copy job.
+
+        See
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationTableCopy.FIELDS.operation_type
+        """
+        return self._get_sub_prop(
+            "operationType", OperationType.OPERATION_TYPE_UNSPECIFIED
+        )
+
+    @operation_type.setter
+    def operation_type(self, value: Optional[str]):
+        if value is None:
+            value = OperationType.OPERATION_TYPE_UNSPECIFIED
+        self._set_sub_prop("operationType", value)
+
 
 class CopyJob(_AsyncJob):
     """Asynchronous job: copy data into a table from other tables.
diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py
index a1c13c85d..765110ae6 100644
--- a/google/cloud/bigquery/table.py
+++ b/google/cloud/bigquery/table.py
@@ -321,6 +321,7 @@ class Table(object):
         "range_partitioning": "rangePartitioning",
         "time_partitioning": "timePartitioning",
         "schema": "schema",
+        "snapshot_definition": "snapshotDefinition",
         "streaming_buffer": "streamingBuffer",
         "self_link": "selfLink",
         "table_id": ["tableReference", "tableId"],
@@ -910,6 +911,19 @@ def external_data_configuration(self, value):
             self._PROPERTY_TO_API_FIELD["external_data_configuration"]
         ] = api_repr
 
+    @property
+    def snapshot_definition(self) -> Optional["SnapshotDefinition"]:
+        """Information about the snapshot. This value is set via snapshot creation.
+
+        See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table.FIELDS.snapshot_definition
+        """
+        snapshot_info = self._properties.get(
+            self._PROPERTY_TO_API_FIELD["snapshot_definition"]
+        )
+        if snapshot_info is not None:
+            snapshot_info = SnapshotDefinition(snapshot_info)
+        return snapshot_info
+
     @classmethod
     def from_string(cls, full_table_id: str) -> "Table":
         """Construct a table from fully-qualified table ID.
@@ -1274,6 +1288,29 @@ def __init__(self, resource):
         )
 
 
+class SnapshotDefinition:
+    """Information about base table and snapshot time of the snapshot.
+
+    See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#snapshotdefinition
+
+    Args:
+        resource: Snapshot definition representation returned from the API.
+    """
+
+    def __init__(self, resource: Dict[str, Any]):
+        self.base_table_reference = None
+        if "baseTableReference" in resource:
+            self.base_table_reference = TableReference.from_api_repr(
+                resource["baseTableReference"]
+            )
+
+        self.snapshot_time = None
+        if "snapshotTime" in resource:
+            self.snapshot_time = google.cloud._helpers._rfc3339_to_datetime(
+                resource["snapshotTime"]
+            )
+
+
 class Row(object):
     """A BigQuery row.
diff --git a/tests/system/test_client.py b/tests/system/test_client.py
index 460296b2f..7234333a2 100644
--- a/tests/system/test_client.py
+++ b/tests/system/test_client.py
@@ -394,7 +394,7 @@ def test_create_table_with_real_custom_policy(self):
         taxonomy_parent = f"projects/{Config.CLIENT.project}/locations/us"
 
         new_taxonomy = datacatalog_types.Taxonomy(
-            display_name="Custom test taxonomy",
+            display_name="Custom test taxonomy" + unique_resource_id(),
             description="This taxonomy is ony used for a test.",
             activated_policy_types=[
                 datacatalog_types.Taxonomy.PolicyType.FINE_GRAINED_ACCESS_CONTROL
@@ -2370,6 +2370,75 @@ def test_parameterized_types_round_trip(self):
 
         self.assertEqual(tuple(s._key()[:2] for s in table2.schema), fields)
 
+    def test_table_snapshots(self):
+        from google.cloud.bigquery import CopyJobConfig
+        from google.cloud.bigquery import OperationType
+
+        client = Config.CLIENT
+
+        source_table_path = f"{client.project}.{Config.DATASET}.test_table"
+        snapshot_table_path = f"{source_table_path}_snapshot"
+
+        # Create the table before loading so that the column order is predictable.
+        schema = [
+            bigquery.SchemaField("foo", "INTEGER"),
+            bigquery.SchemaField("bar", "STRING"),
+        ]
+        source_table = helpers.retry_403(Config.CLIENT.create_table)(
+            Table(source_table_path, schema=schema)
+        )
+        self.to_delete.insert(0, source_table)
+
+        # Populate the table with initial data.
+        rows = [{"foo": 1, "bar": "one"}, {"foo": 2, "bar": "two"}]
+        load_job = Config.CLIENT.load_table_from_json(rows, source_table)
+        load_job.result()
+
+        # Now create a snapshot before modifying the original table data.
+        copy_config = CopyJobConfig()
+        copy_config.operation_type = OperationType.SNAPSHOT
+
+        copy_job = client.copy_table(
+            sources=source_table_path,
+            destination=snapshot_table_path,
+            job_config=copy_config,
+        )
+        copy_job.result()
+
+        snapshot_table = client.get_table(snapshot_table_path)
+        self.to_delete.insert(0, snapshot_table)
+
+        # Modify data in original table.
+        sql = f'INSERT INTO `{source_table_path}`(foo, bar) VALUES (3, "three")'
+        query_job = client.query(sql)
+        query_job.result()
+
+        # List rows from the source table and compare them to rows from the snapshot.
+        rows_iter = client.list_rows(source_table_path)
+        rows = sorted(row.values() for row in rows_iter)
+        assert rows == [(1, "one"), (2, "two"), (3, "three")]
+
+        rows_iter = client.list_rows(snapshot_table_path)
+        rows = sorted(row.values() for row in rows_iter)
+        assert rows == [(1, "one"), (2, "two")]
+
+        # Now restore the table from the snapshot and it should again contain the old
+        # set of rows.
+        copy_config = CopyJobConfig()
+        copy_config.operation_type = OperationType.RESTORE
+        copy_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
+
+        copy_job = client.copy_table(
+            sources=snapshot_table_path,
+            destination=source_table_path,
+            job_config=copy_config,
+        )
+        copy_job.result()
+
+        rows_iter = client.list_rows(source_table_path)
+        rows = sorted(row.values() for row in rows_iter)
+        assert rows == [(1, "one"), (2, "two")]
+
     def temp_dataset(self, dataset_id, location=None):
         project = Config.CLIENT.project
         dataset_ref = bigquery.DatasetReference(project, dataset_id)
diff --git a/tests/unit/job/test_copy.py b/tests/unit/job/test_copy.py
index fb0c87391..992efcf6b 100644
--- a/tests/unit/job/test_copy.py
+++ b/tests/unit/job/test_copy.py
@@ -28,18 +28,34 @@ def _get_target_class():
         return CopyJobConfig
 
+    def test_ctor_defaults(self):
+        from google.cloud.bigquery.job import OperationType
+
+        config = self._make_one()
+
+        assert config.create_disposition is None
+        assert config.write_disposition is None
+        assert config.destination_encryption_configuration is None
+        assert config.operation_type == OperationType.OPERATION_TYPE_UNSPECIFIED
+
     def test_ctor_w_properties(self):
         from google.cloud.bigquery.job import CreateDisposition
+        from google.cloud.bigquery.job import OperationType
         from google.cloud.bigquery.job import WriteDisposition
 
         create_disposition = CreateDisposition.CREATE_NEVER
         write_disposition = WriteDisposition.WRITE_TRUNCATE
+        snapshot_operation = OperationType.SNAPSHOT
 
         config = self._get_target_class()(
-            create_disposition=create_disposition, write_disposition=write_disposition
+            create_disposition=create_disposition,
+            write_disposition=write_disposition,
+            operation_type=snapshot_operation,
         )
         self.assertEqual(config.create_disposition, create_disposition)
         self.assertEqual(config.write_disposition, write_disposition)
+        self.assertEqual(config.operation_type, snapshot_operation)
 
     def test_to_api_repr_with_encryption(self):
         from google.cloud.bigquery.encryption_configuration import (
@@ -70,6 +86,22 @@ def test_to_api_repr_with_encryption_none(self):
             resource, {"copy": {"destinationEncryptionConfiguration": None}}
         )
 
+    def test_operation_type_setting_none(self):
+        from google.cloud.bigquery.job import OperationType
+
+        config = self._make_one(operation_type=OperationType.SNAPSHOT)
+
+        # Setting it to None is the same as setting it to OPERATION_TYPE_UNSPECIFIED.
+        config.operation_type = None
+        assert config.operation_type == OperationType.OPERATION_TYPE_UNSPECIFIED
+
+    def test_operation_type_setting_non_none(self):
+        from google.cloud.bigquery.job import OperationType
+
+        config = self._make_one(operation_type=None)
+        config.operation_type = OperationType.RESTORE
+        assert config.operation_type == OperationType.RESTORE
+
 
 class TestCopyJob(_Base):
     JOB_TYPE = "copy"
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py
index f4038835c..b30f16fe0 100644
--- a/tests/unit/test_table.py
+++ b/tests/unit/test_table.py
@@ -684,6 +684,40 @@ def test_props_set_by_server(self):
         self.assertEqual(table.full_table_id, TABLE_FULL_ID)
         self.assertEqual(table.table_type, "TABLE")
 
+    def test_snapshot_definition_not_set(self):
+        dataset = DatasetReference(self.PROJECT, self.DS_ID)
+        table_ref = dataset.table(self.TABLE_NAME)
+        table = self._make_one(table_ref)
+
+        assert table.snapshot_definition is None
+
+    def test_snapshot_definition_set(self):
+        from google.cloud._helpers import UTC
+        from google.cloud.bigquery.table import SnapshotDefinition
+
+        dataset = DatasetReference(self.PROJECT, self.DS_ID)
+        table_ref = dataset.table(self.TABLE_NAME)
+        table = self._make_one(table_ref)
+
+        table._properties["snapshotDefinition"] = {
+            "baseTableReference": {
+                "projectId": "project_x",
+                "datasetId": "dataset_y",
+                "tableId": "table_z",
+            },
+            "snapshotTime": "2010-09-28T10:20:30.123Z",
+        }
+
+        snapshot = table.snapshot_definition
+
+        assert isinstance(snapshot, SnapshotDefinition)
+        assert snapshot.base_table_reference.path == (
+            "/projects/project_x/datasets/dataset_y/tables/table_z"
+        )
+        assert snapshot.snapshot_time == datetime.datetime(
+            2010, 9, 28, 10, 20, 30, 123000, tzinfo=UTC
+        )
+
     def test_description_setter_bad_value(self):
         dataset = DatasetReference(self.PROJECT, self.DS_ID)
         table_ref = dataset.table(self.TABLE_NAME)
@@ -1509,6 +1543,46 @@ def test_to_api_repr(self):
         self.assertEqual(table.to_api_repr(), resource)
 
 
+class TestSnapshotDefinition:
+    @staticmethod
+    def _get_target_class():
+        from google.cloud.bigquery.table import SnapshotDefinition
+
+        return SnapshotDefinition
+
+    @classmethod
+    def _make_one(cls, *args, **kwargs):
+        klass = cls._get_target_class()
+        return klass(*args, **kwargs)
+
+    def test_ctor_empty_resource(self):
+        instance = self._make_one(resource={})
+        assert instance.base_table_reference is None
+        assert instance.snapshot_time is None
+
+    def test_ctor_full_resource(self):
+        from google.cloud._helpers import UTC
+        from google.cloud.bigquery.table import TableReference
+
+        resource = {
+            "baseTableReference": {
+                "projectId": "my-project",
+                "datasetId": "your-dataset",
+                "tableId": "our-table",
+            },
+            "snapshotTime": "2005-06-07T19:35:02.123Z",
+        }
+        instance = self._make_one(resource)
+
+        expected_table_ref = TableReference.from_string(
+            "my-project.your-dataset.our-table"
+        )
+        assert instance.base_table_reference == expected_table_ref
+
+        expected_time = datetime.datetime(2005, 6, 7, 19, 35, 2, 123000, tzinfo=UTC)
+        assert instance.snapshot_time == expected_time
+
+
 class TestRow(unittest.TestCase):
     def test_row(self):
         from google.cloud.bigquery.table import Row
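
For reference, the new operation types introduced by this patch are used roughly as follows (a minimal sketch distilled from the system test above; the project, dataset, and table names are hypothetical placeholders):

    from google.cloud import bigquery

    client = bigquery.Client()

    # Snapshot an existing table by running a copy job whose operation type
    # is SNAPSHOT.
    config = bigquery.CopyJobConfig()
    config.operation_type = bigquery.OperationType.SNAPSHOT
    client.copy_table(
        "my-project.my_dataset.my_table",
        "my-project.my_dataset.my_table_snapshot",
        job_config=config,
    ).result()

    # Restore the table from the snapshot, overwriting its current contents.
    config = bigquery.CopyJobConfig()
    config.operation_type = bigquery.OperationType.RESTORE
    config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
    client.copy_table(
        "my-project.my_dataset.my_table_snapshot",
        "my-project.my_dataset.my_table",
        job_config=config,
    ).result()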