feat: add support for table snapshots #740

Merged: 7 commits, Jul 10, 2021
Changes from 3 commits
4 changes: 4 additions & 0 deletions google/cloud/bigquery/__init__.py
@@ -60,6 +60,7 @@
from google.cloud.bigquery.job import ExtractJobConfig
from google.cloud.bigquery.job import LoadJob
from google.cloud.bigquery.job import LoadJobConfig
from google.cloud.bigquery.job import OperationType
from google.cloud.bigquery.job import QueryJob
from google.cloud.bigquery.job import QueryJobConfig
from google.cloud.bigquery.job import QueryPriority
@@ -86,6 +87,7 @@
from google.cloud.bigquery.table import PartitionRange
from google.cloud.bigquery.table import RangePartitioning
from google.cloud.bigquery.table import Row
from google.cloud.bigquery.table import SnapshotDefinition
from google.cloud.bigquery.table import Table
from google.cloud.bigquery.table import TableReference
from google.cloud.bigquery.table import TimePartitioningType
@@ -114,6 +116,7 @@
"PartitionRange",
"RangePartitioning",
"Row",
"SnapshotDefinition",
"TimePartitioning",
"TimePartitioningType",
# Jobs
@@ -153,6 +156,7 @@
"ExternalSourceFormat",
"Encoding",
"KeyResultStatementKind",
"OperationType",
"QueryPriority",
"SchemaUpdateOption",
"SourceFormat",
2 changes: 2 additions & 0 deletions google/cloud/bigquery/job/__init__.py
@@ -25,6 +25,7 @@
from google.cloud.bigquery.job.base import UnknownJob
from google.cloud.bigquery.job.copy_ import CopyJob
from google.cloud.bigquery.job.copy_ import CopyJobConfig
from google.cloud.bigquery.job.copy_ import OperationType
from google.cloud.bigquery.job.extract import ExtractJob
from google.cloud.bigquery.job.extract import ExtractJobConfig
from google.cloud.bigquery.job.load import LoadJob
@@ -59,6 +60,7 @@
"UnknownJob",
"CopyJob",
"CopyJobConfig",
"OperationType",
"ExtractJob",
"ExtractJobConfig",
"LoadJob",
38 changes: 38 additions & 0 deletions google/cloud/bigquery/job/copy_.py
@@ -14,6 +14,8 @@

"""Classes for copy jobs."""

from typing import Optional

from google.cloud.bigquery.encryption_configuration import EncryptionConfiguration
from google.cloud.bigquery import _helpers
from google.cloud.bigquery.table import TableReference
@@ -23,6 +25,25 @@
from google.cloud.bigquery.job.base import _JobReference


class OperationType:
"""Different operation types supported in table copy job.

https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#operationtype
"""

OPERATION_TYPE_UNSPECIFIED = "OPERATION_TYPE_UNSPECIFIED"
"""Unspecified operation type."""

COPY = "COPY"
"""The source and destination table have the same table type."""

SNAPSHOT = "SNAPSHOT"
"""The source table type is TABLE and the destination table type is SNAPSHOT."""

RESTORE = "RESTORE"
"""The source table type is SNAPSHOT and the destination table type is TABLE."""


class CopyJobConfig(_JobConfig):
"""Configuration options for copy jobs.

@@ -85,6 +106,23 @@ def destination_encryption_configuration(self, value):
api_repr = value.to_api_repr()
self._set_sub_prop("destinationEncryptionConfiguration", api_repr)

@property
def operation_type(self) -> str:
"""The operation to perform with this copy job.

See
https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationTableCopy.FIELDS.operation_type
"""
return self._get_sub_prop(
"operationType", OperationType.OPERATION_TYPE_UNSPECIFIED
)

@operation_type.setter
def operation_type(self, value: Optional[str]):
if value is None:
value = OperationType.OPERATION_TYPE_UNSPECIFIED
self._set_sub_prop("operationType", value)


class CopyJob(_AsyncJob):
"""Asynchronous job: copy data into a table from other tables.
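
With OperationType exported and CopyJobConfig.operation_type in place, creating a table snapshot is just a copy job whose operation type is SNAPSHOT. A minimal usage sketch, assuming hypothetical table IDs (the system test further down exercises the same flow):

    from google.cloud import bigquery
    from google.cloud.bigquery import CopyJobConfig, OperationType

    client = bigquery.Client()

    # Hypothetical table IDs; substitute your own project/dataset/table.
    source = "my-project.my_dataset.my_table"
    snapshot = "my-project.my_dataset.my_table_snapshot"

    config = CopyJobConfig()
    config.operation_type = OperationType.SNAPSHOT

    # A snapshot is created via a regular copy job with this config.
    job = client.copy_table(source, snapshot, job_config=config)
    job.result()  # Block until the snapshot has been created.
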
38 changes: 38 additions & 0 deletions google/cloud/bigquery/table.py
@@ -321,6 +321,7 @@ class Table(object):
"range_partitioning": "rangePartitioning",
"time_partitioning": "timePartitioning",
"schema": "schema",
"snapshot_definition": "snapshotDefinition",
"streaming_buffer": "streamingBuffer",
"self_link": "selfLink",
"table_id": ["tableReference", "tableId"],
@@ -910,6 +911,19 @@ def external_data_configuration(self, value):
self._PROPERTY_TO_API_FIELD["external_data_configuration"]
] = api_repr

@property
def snapshot_definition(self) -> Optional["SnapshotDefinition"]:
"""Information about the snapshot. This value is set via snapshot creation.

See: https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#Table.FIELDS.snapshot_definition
"""
snapshot_info = self._properties.get(
self._PROPERTY_TO_API_FIELD["snapshot_definition"]
)
if snapshot_info is not None:
snapshot_info = SnapshotDefinition(snapshot_info)
return snapshot_info

@classmethod
def from_string(cls, full_table_id: str) -> "Table":
"""Construct a table from fully-qualified table ID.
@@ -1274,6 +1288,30 @@ def __init__(self, resource):
)


class SnapshotDefinition:
"""Information about base table and snapshot time of the snapshot.

See https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#snapshotdefinition

Args:
resource: Snapshot definition representation returned from the API.
"""

def __init__(self, resource: Dict[str, Any]):
self.base_table_reference = None
if "baseTableReference" in resource:
self.base_table_reference = TableReference.from_api_repr(
resource["baseTableReference"]
)

self.snapshot_time = None
if "snapshotTime" in resource:
dt = google.cloud._helpers._rfc3339_to_datetime(resource["snapshotTime"])
# The helper returns a timezone *aware* datetime object (with UTC tzinfo);
# we need to make it naive.
self.snapshot_time = dt.replace(tzinfo=None)


class Row(object):
"""A BigQuery row.

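
Once a snapshot table has been fetched, the new Table.snapshot_definition property surfaces the parsed metadata. A minimal sketch, reusing the hypothetical client and snapshot ID from the sketch above; for ordinary tables the property is simply None:

    table = client.get_table(snapshot)

    definition = table.snapshot_definition
    if definition is not None:
        print(definition.base_table_reference)  # TableReference to the base table
        print(definition.snapshot_time)  # naive datetime (UTC tzinfo stripped, per the diff above)
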
71 changes: 70 additions & 1 deletion tests/system/test_client.py
@@ -394,7 +394,7 @@ def test_create_table_with_real_custom_policy(self):
taxonomy_parent = f"projects/{Config.CLIENT.project}/locations/us"

new_taxonomy = datacatalog_types.Taxonomy(
display_name="Custom test taxonomy",
display_name=unique_resource_id("Custom test taxonomy"),
description="This taxonomy is ony used for a test.",
activated_policy_types=[
datacatalog_types.Taxonomy.PolicyType.FINE_GRAINED_ACCESS_CONTROL
@@ -2316,6 +2316,75 @@

self.assertEqual(tuple(s._key()[:2] for s in table2.schema), fields)

def test_table_snapshots(self):
from google.cloud.bigquery import CopyJobConfig
from google.cloud.bigquery import OperationType

client = Config.CLIENT

source_table_path = f"{client.project}.{Config.DATASET}.test_table"
snapshot_table_path = f"{source_table_path}_snapshot"

# Create the table before loading so that the column order is predictable.
schema = [
bigquery.SchemaField("foo", "INTEGER"),
bigquery.SchemaField("bar", "STRING"),
]
source_table = helpers.retry_403(Config.CLIENT.create_table)(
Table(source_table_path, schema=schema)
)
self.to_delete.insert(0, source_table)

# Populate the table with initial data.
rows = [{"foo": 1, "bar": "one"}, {"foo": 2, "bar": "two"}]
load_job = Config.CLIENT.load_table_from_json(rows, source_table)
load_job.result()

# Now create a snapshot before modifying the original table data.
copy_config = CopyJobConfig()
copy_config.operation_type = OperationType.SNAPSHOT

copy_job = client.copy_table(
sources=source_table_path,
destination=snapshot_table_path,
job_config=copy_config,
)
copy_job.result()

snapshot_table = client.get_table(snapshot_table_path)
self.to_delete.insert(0, snapshot_table)

# Modify data in original table.
sql = f'INSERT INTO `{source_table_path}`(foo, bar) VALUES (3, "three")'
query_job = client.query(sql)
query_job.result()

# List rows from the source table and compare them to rows from the snapshot.
rows_iter = client.list_rows(source_table_path)
rows = sorted(row.values() for row in rows_iter)
assert rows == [(1, "one"), (2, "two"), (3, "three")]

rows_iter = client.list_rows(snapshot_table_path)
rows = sorted(row.values() for row in rows_iter)
assert rows == [(1, "one"), (2, "two")]

# Now restore the table from the snapshot and it should again contain the old
# set of rows.
copy_config = CopyJobConfig()
copy_config.operation_type = OperationType.RESTORE
copy_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

copy_job = client.copy_table(
sources=snapshot_table_path,
destination=source_table_path,
job_config=copy_config,
)
copy_job.result()

rows_iter = client.list_rows(source_table_path)
rows = sorted(row.values() for row in rows_iter)
assert rows == [(1, "one"), (2, "two")]

def temp_dataset(self, dataset_id, location=None):
project = Config.CLIENT.project
dataset_ref = bigquery.DatasetReference(project, dataset_id)
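
The restore half of the round trip in this test reduces to one more copy job; the test sets WRITE_TRUNCATE so the job may overwrite the existing base table. A minimal sketch with the same hypothetical IDs as above:

    restore_config = CopyJobConfig()
    restore_config.operation_type = OperationType.RESTORE
    restore_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

    # Copy from the snapshot back onto the base table.
    job = client.copy_table(snapshot, source, job_config=restore_config)
    job.result()  # The base table again matches the snapshot's contents.
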
22 changes: 21 additions & 1 deletion tests/unit/job/test_copy.py
@@ -30,16 +30,22 @@ def _get_target_class():

def test_ctor_w_properties(self):
from google.cloud.bigquery.job import CreateDisposition
from google.cloud.bigquery.job import OperationType
from google.cloud.bigquery.job import WriteDisposition

create_disposition = CreateDisposition.CREATE_NEVER
write_disposition = WriteDisposition.WRITE_TRUNCATE
snapshot_operation = OperationType.SNAPSHOT

config = self._get_target_class()(
create_disposition=create_disposition, write_disposition=write_disposition
create_disposition=create_disposition,
write_disposition=write_disposition,
operation_type=snapshot_operation,
)

self.assertEqual(config.create_disposition, create_disposition)
self.assertEqual(config.write_disposition, write_disposition)
self.assertEqual(config.operation_type, snapshot_operation)

def test_to_api_repr_with_encryption(self):
from google.cloud.bigquery.encryption_configuration import (
@@ -70,6 +76,20 @@ def test_to_api_repr_with_encryption_none(self):
resource, {"copy": {"destinationEncryptionConfiguration": None}}
)

def test_operation_type_unspecified(self):
from google.cloud.bigquery.job import OperationType

config = self._make_one()
self.assertEqual(
config.operation_type, OperationType.OPERATION_TYPE_UNSPECIFIED
)

# Setting it to None is the same as setting it to OPERATION_TYPE_UNSPECIFIED.
config.operation_type = None
self.assertEqual(
config.operation_type, OperationType.OPERATION_TYPE_UNSPECIFIED
)


class TestCopyJob(_Base):
JOB_TYPE = "copy"
72 changes: 72 additions & 0 deletions tests/unit/test_table.py
@@ -684,6 +684,39 @@ def test_props_set_by_server(self):
self.assertEqual(table.full_table_id, TABLE_FULL_ID)
self.assertEqual(table.table_type, "TABLE")

def test_snapshot_definition_not_set(self):
dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)

assert table.snapshot_definition is None

def test_snapshot_definition_set(self):
from google.cloud.bigquery.table import SnapshotDefinition

dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
table = self._make_one(table_ref)

table._properties["snapshotDefinition"] = {
"baseTableReference": {
"projectId": "project_x",
"datasetId": "dataset_y",
"tableId": "table_z",
},
"snapshotTime": "2010-09-28T10:20:30.123Z",
}

snapshot = table.snapshot_definition

assert isinstance(snapshot, SnapshotDefinition)
assert snapshot.base_table_reference.path == (
"/projects/project_x/datasets/dataset_y/tables/table_z"
)
assert snapshot.snapshot_time == datetime.datetime(
2010, 9, 28, 10, 20, 30, 123000
)

def test_description_setter_bad_value(self):
dataset = DatasetReference(self.PROJECT, self.DS_ID)
table_ref = dataset.table(self.TABLE_NAME)
@@ -1509,6 +1542,45 @@ def test_to_api_repr(self):
self.assertEqual(table.to_api_repr(), resource)


class TestSnapshotDefinition:
@staticmethod
def _get_target_class():
from google.cloud.bigquery.table import SnapshotDefinition

return SnapshotDefinition

@classmethod
def _make_one(cls, *args, **kwargs):
klass = cls._get_target_class()
return klass(*args, **kwargs)

def test_ctor_empty_resource(self):
instance = self._make_one(resource={})
assert instance.base_table_reference is None
assert instance.snapshot_time is None

def test_ctor_full_resource(self):
from google.cloud.bigquery.table import TableReference

resource = {
"baseTableReference": {
"projectId": "my-project",
"datasetId": "your-dataset",
"tableId": "our-table",
},
"snapshotTime": "2005-06-07T19:35:02.123Z",
}
instance = self._make_one(resource)

expected_table_ref = TableReference.from_string(
"my-project.your-dataset.our-table"
)
assert instance.base_table_reference == expected_table_ref

expected_time = datetime.datetime(2005, 6, 7, 19, 35, 2, 123000)
assert instance.snapshot_time == expected_time


class TestRow(unittest.TestCase):
def test_row(self):
from google.cloud.bigquery.table import Row