feat: add support for table snapshots (#740)
* feat: add support for table snapshots

* Add system test for table snapshots

* Make test taxonomy resource name unique

* Store timezone aware snapshot time on snapshots

* Make copy config tests more detailed

* Use unique resource ID differently for display name

* Add new classes to docs
plamut committed Jul 10, 2021
1 parent 7d2d3e9 commit ba86b2a
Showing 8 changed files with 260 additions and 2 deletions.
2 changes: 2 additions & 0 deletions docs/reference.rst
@@ -59,6 +59,7 @@ Job-Related Types
job.CreateDisposition
job.DestinationFormat
job.Encoding
job.OperationType
job.QueryPlanEntry
job.QueryPlanEntryStep
job.QueryPriority
@@ -90,6 +91,7 @@ Table
table.RangePartitioning
table.Row
table.RowIterator
table.SnapshotDefinition
table.Table
table.TableListItem
table.TableReference
4 changes: 4 additions & 0 deletions google/cloud/bigquery/__init__.py
@@ -61,6 +61,7 @@
from google.cloud.bigquery.job import ExtractJobConfig
from google.cloud.bigquery.job import LoadJob
from google.cloud.bigquery.job import LoadJobConfig
from google.cloud.bigquery.job import OperationType
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.job import QueryJobConfig
from google.cloud.bigquery.job import QueryPriority
@@ -87,6 +88,7 @@
from google.cloud.bigquery.table import PartitionRange
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import Row
from google.cloud.bigquery.table import SnapshotDefinition
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import TimePartitioningType
@@ -115,6 +117,7 @@
"PartitionRange",
"RangePartitioning",
"Row",
"SnapshotDefinition",
"TimePartitioning",
"TimePartitioningType",
# Jobs
@@ -155,6 +158,7 @@
"ExternalSourceFormat",
"Encoding",
"KeyResultStatementKind",
"OperationType",
"QueryPriority",
"SchemaUpdateOption",
"SourceFormat",
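With OperationType and SnapshotDefinition added to the package's __all__, both names become importable straight from the top level. A quick sanity check, assuming an installed build that includes this commit:

# Both names are exported at the package root after this change.
from google.cloud import bigquery

assert bigquery.OperationType.SNAPSHOT == "SNAPSHOT"
assert bigquery.SnapshotDefinition.__name__ == "SnapshotDefinition"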
2 changes: 2 additions & 0 deletions google/cloud/bigquery/job/__init__.py
@@ -25,6 +25,7 @@
from google.cloud.bigquery.job.base import UnknownJob
from google.cloud.bigquery.job.copy_ import CopyJob
from google.cloud.bigquery.job.copy_ import CopyJobConfig
from google.cloud.bigquery.job.copy_ import OperationType
from google.cloud.bigquery.job.extract import ExtractJob
from google.cloud.bigquery.job.extract import ExtractJobConfig
from google.cloud.bigquery.job.load import LoadJob
@@ -59,6 +60,7 @@
"UnknownJob",
"CopyJob",
"CopyJobConfig",
"OperationType",
"ExtractJob",
"ExtractJobConfig",
"LoadJob",
38 changes: 38 additions & 0 deletions google/cloud/bigquery/job/copy_.py
@@ -14,6 +14,8 @@

"""Classes for copy jobs."""

from typing import Optional

from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.table import TableReference
@@ -23,6 +25,7 @@
from google.cloud.bigquery.job.base import _JobReference


class OperationType:
"""Different operation types supported in table copy job.
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#operationtype
"""

OPERATION_TYPE_UNSPECIFIED = "OPERATION_TYPE_UNSPECIFIED"
"""Unspecified operation type."""

COPY = "COPY"
"""The source and destination table have the same table type."""

SNAPSHOT = "SNAPSHOT"
"""The source table type is TABLE and the destination table type is SNAPSHOT."""

RESTORE = "RESTORE"
"""The source table type is SNAPSHOT and the destination table type is TABLE."""


class CopyJobConfig(_JobConfig):
"""Configuration options for copy jobs.
@@ -85,6 +106,23 @@ def destination_encryption_configuration(self, value):
api_repr = value.to_api_repr()
self._set_sub_prop("destinationEncryptionConfiguration", api_repr)

@property
def operation_type(self) -> str:
"""The operation to perform with this copy job.
See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationTableCopy.FIELDS.operation_type
"""
return self._get_sub_prop(
"operationType", OperationType.OPERATION_TYPE_UNSPECIFIED
)

@operation_type.setter
def operation_type(self, value: Optional[str]):
if value is None:
value = OperationType.OPERATION_TYPE_UNSPECIFIED
self._set_sub_prop("operationType", value)


class CopyJob(_AsyncJob):
"""Asynchronous job: copy data into a table from other tables.
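Taken together, the new enum and property let callers request a snapshot through the existing copy-job path. A minimal sketch, assuming real project, dataset, and table IDs in place of the placeholders below:

# Hypothetical IDs; sketch of creating a table snapshot via a copy job.
from google.cloud import bigquery

client = bigquery.Client()
config = bigquery.CopyJobConfig()
config.operation_type = bigquery.OperationType.SNAPSHOT

job = client.copy_table(
    "my-project.my_dataset.source_table",       # source must be a regular TABLE
    "my-project.my_dataset.source_table_snap",  # destination becomes a SNAPSHOT
    job_config=config,
)
job.result()  # block until the snapshot exists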
37 changes: 37 additions & 0 deletions google/cloud/bigquery/table.py
@@ -321,6 +321,7 @@ class Table(object):
"range_partitioning": "rangePartitioning",
"time_partitioning": "timePartitioning",
"schema": "schema",
"snapshot_definition": "snapshotDefinition",
"streaming_buffer": "streamingBuffer",
"self_link": "selfLink",
"table_id": ["tableReference", "tableId"],
@@ -910,6 +911,19 @@ def external_data_configuration(self, value):
self._PROPERTY_TO_API_FIELD["external_data_configuration"]
] = api_repr

@property
def snapshot_definition(self) -> Optional["SnapshotDefinition"]:
"""Information about the snapshot. This value is set via snapshot creation.
See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table.FIELDS.snapshot_definition
"""
snapshot_info = self._properties.get(
self._PROPERTY_TO_API_FIELD["snapshot_definition"]
)
if snapshot_info is not None:
snapshot_info = SnapshotDefinition(snapshot_info)
return snapshot_info

@classmethod
def from_string(cls, full_table_id: str) -> "Table":
"""Construct a table from fully-qualified table ID.
@@ -1274,6 +1288,29 @@ def __init__(self, resource):
)


class SnapshotDefinition:
"""Information about base table and snapshot time of the snapshot.
See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#snapshotdefinition
Args:
resource: Snapshot definition representation returned from the API.
"""

def __init__(self, resource: Dict[str, Any]):
self.base_table_reference = None
if "baseTableReference" in resource:
self.base_table_reference = TableReference.from_api_repr(
resource["baseTableReference"]
)

self.snapshot_time = None
if "snapshotTime" in resource:
self.snapshot_time = google.cloud._helpers._rfc3339_to_datetime(
resource["snapshotTime"]
)


class Row(object):
"""A BigQuery row.
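Once a snapshot exists, the new Table.snapshot_definition property surfaces its metadata on a fetched table. A short sketch, with a placeholder table path:

# Reading snapshot metadata back; the table path is hypothetical.
from google.cloud import bigquery

client = bigquery.Client()
table = client.get_table("my-project.my_dataset.source_table_snap")

if table.snapshot_definition is not None:
    print(table.snapshot_definition.base_table_reference)  # TableReference to the base table
    print(table.snapshot_definition.snapshot_time)         # timezone-aware datetime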
71 changes: 70 additions & 1 deletion tests/system/test_client.py
@@ -394,7 +394,7 @@ def test_create_table_with_real_custom_policy(self):
taxonomy_parent = f"projects/{Config.CLIENT.project}/locations/us"

new_taxonomy = datacatalog_types.Taxonomy(
display_name="Custom test taxonomy",
display_name="Custom test taxonomy" + unique_resource_id(),
description="This taxonomy is ony used for a test.",
activated_policy_types=[
datacatalog_types.Taxonomy.PolicyType.FINE_GRAINED_ACCESS_CONTROL
@@ -2370,6 +2370,75 @@ def test_parameterized_types_round_trip(self):

self.assertEqual(tuple(s._key()[:2] for s in table2.schema), fields)

def test_table_snapshots(self):
from google.cloud.bigquery import CopyJobConfig
from google.cloud.bigquery import OperationType

client = Config.CLIENT

source_table_path = f"{client.project}.{Config.DATASET}.test_table"
snapshot_table_path = f"{source_table_path}_snapshot"

# Create the table before loading so that the column order is predictable.
schema = [
bigquery.SchemaField("foo", "INTEGER"),
bigquery.SchemaField("bar", "STRING"),
]
source_table = helpers.retry_403(Config.CLIENT.create_table)(
Table(source_table_path, schema=schema)
)
self.to_delete.insert(0, source_table)

# Populate the table with initial data.
rows = [{"foo": 1, "bar": "one"}, {"foo": 2, "bar": "two"}]
load_job = Config.CLIENT.load_table_from_json(rows, source_table)
load_job.result()

# Now create a snapshot before modifying the original table data.
copy_config = CopyJobConfig()
copy_config.operation_type = OperationType.SNAPSHOT

copy_job = client.copy_table(
sources=source_table_path,
destination=snapshot_table_path,
job_config=copy_config,
)
copy_job.result()

snapshot_table = client.get_table(snapshot_table_path)
self.to_delete.insert(0, snapshot_table)

# Modify data in original table.
sql = f'INSERT INTO `{source_table_path}`(foo, bar) VALUES (3, "three")'
query_job = client.query(sql)
query_job.result()

# List rows from the source table and compare them to rows from the snapshot.
rows_iter = client.list_rows(source_table_path)
rows = sorted(row.values() for row in rows_iter)
assert rows == [(1, "one"), (2, "two"), (3, "three")]

rows_iter = client.list_rows(snapshot_table_path)
rows = sorted(row.values() for row in rows_iter)
assert rows == [(1, "one"), (2, "two")]

# Now restore the table from the snapshot and it should again contain the old
# set of rows.
copy_config = CopyJobConfig()
copy_config.operation_type = OperationType.RESTORE
copy_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

copy_job = client.copy_table(
sources=snapshot_table_path,
destination=source_table_path,
job_config=copy_config,
)
copy_job.result()

rows_iter = client.list_rows(source_table_path)
rows = sorted(row.values() for row in rows_iter)
assert rows == [(1, "one"), (2, "two")]

def temp_dataset(self, dataset_id, location=None):
project = Config.CLIENT.project
dataset_ref = bigquery.DatasetReference(project, dataset_id)
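The restore step exercised in the test above also works as a standalone recipe: copy the snapshot back over the live table with operation_type RESTORE and WRITE_TRUNCATE so the snapshot's rows replace the current ones. A condensed sketch with placeholder paths:

# Hypothetical IDs; restoring a table from its snapshot.
from google.cloud import bigquery

client = bigquery.Client()
config = bigquery.CopyJobConfig()
config.operation_type = bigquery.OperationType.RESTORE
config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

job = client.copy_table(
    "my-project.my_dataset.source_table_snap",  # source must be a SNAPSHOT
    "my-project.my_dataset.source_table",       # destination becomes a regular TABLE
    job_config=config,
)
job.result()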
34 changes: 33 additions & 1 deletion tests/unit/job/test_copy.py
@@ -28,18 +28,34 @@ def _get_target_class():

return CopyJobConfig

def test_ctor_defaults(self):
from google.cloud.bigquery.job import OperationType

config = self._make_one()

assert config.create_disposition is None
assert config.write_disposition is None
assert config.destination_encryption_configuration is None
assert config.operation_type == OperationType.OPERATION_TYPE_UNSPECIFIED

def test_ctor_w_properties(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import OperationType
from google.cloud.bigquery.job import WriteDisposition

create_disposition = CreateDisposition.CREATE_NEVER
write_disposition = WriteDisposition.WRITE_TRUNCATE
snapshot_operation = OperationType.SNAPSHOT

config = self._get_target_class()(
create_disposition=create_disposition, write_disposition=write_disposition
create_disposition=create_disposition,
write_disposition=write_disposition,
operation_type=snapshot_operation,
)

self.assertEqual(config.create_disposition, create_disposition)
self.assertEqual(config.write_disposition, write_disposition)
self.assertEqual(config.operation_type, snapshot_operation)

def test_to_api_repr_with_encryption(self):
from google.cloud.bigquery.encryption_configuration import (
@@ -70,6 +86,22 @@ def test_to_api_repr_with_encryption_none(self):
resource, {"copy": {"destinationEncryptionConfiguration": None}}
)

def test_operation_type_setting_none(self):
from google.cloud.bigquery.job import OperationType

config = self._make_one(operation_type=OperationType.SNAPSHOT)

# Setting it to None is the same as setting it to OPERATION_TYPE_UNSPECIFIED.
config.operation_type = None
assert config.operation_type == OperationType.OPERATION_TYPE_UNSPECIFIED

def test_operation_type_setting_non_none(self):
from google.cloud.bigquery.job import OperationType

config = self._make_one(operation_type=None)
config.operation_type = OperationType.RESTORE
assert config.operation_type == OperationType.RESTORE


class TestCopyJob(_Base):
JOB_TYPE = "copy"
