Skip to content

Commit

Permalink
feat: add support for materialized views (#408)
Browse files Browse the repository at this point in the history
Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly:
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)

Fixes #407 🦕
  • Loading branch information
tswast committed Dec 3, 2020
1 parent c384b45 commit 57ffc66
Show file tree
Hide file tree
Showing 4 changed files with 317 additions and 18 deletions.
95 changes: 83 additions & 12 deletions google/cloud/bigquery/table.py
Expand Up @@ -293,15 +293,18 @@ class Table(object):
"""

_PROPERTY_TO_API_FIELD = {
"friendly_name": "friendlyName",
"encryption_configuration": "encryptionConfiguration",
"expires": "expirationTime",
"time_partitioning": "timePartitioning",
"partitioning_type": "timePartitioning",
"external_data_configuration": "externalDataConfiguration",
"friendly_name": "friendlyName",
"mview_enable_refresh": "materializedView",
"mview_query": "materializedView",
"mview_refresh_interval": "materializedView",
"partition_expiration": "timePartitioning",
"partitioning_type": "timePartitioning",
"time_partitioning": "timePartitioning",
"view_use_legacy_sql": "view",
"view_query": "view",
"external_data_configuration": "externalDataConfiguration",
"encryption_configuration": "encryptionConfiguration",
"require_partition_filter": "requirePartitionFilter",
}

Expand Down Expand Up @@ -714,18 +717,14 @@ def view_query(self):
Raises:
ValueError: For invalid value types.
"""
view = self._properties.get("view")
if view is not None:
return view.get("query")
return _helpers._get_sub_prop(self._properties, ["view", "query"])

@view_query.setter
def view_query(self, value):
if not isinstance(value, six.string_types):
raise ValueError("Pass a string")
view = self._properties.get("view")
if view is None:
view = self._properties["view"] = {}
view["query"] = value
_helpers._set_sub_prop(self._properties, ["view", "query"], value)
view = self._properties["view"]
# The service defaults useLegacySql to True, but this
# client uses Standard SQL by default.
if view.get("useLegacySql") is None:
Expand All @@ -746,6 +745,78 @@ def view_use_legacy_sql(self, value):
self._properties["view"] = {}
self._properties["view"]["useLegacySql"] = value

@property
def mview_query(self):
"""Optional[str]: SQL query defining the table as a materialized
view (defaults to :data:`None`).
"""
return _helpers._get_sub_prop(self._properties, ["materializedView", "query"])

@mview_query.setter
def mview_query(self, value):
_helpers._set_sub_prop(
self._properties, ["materializedView", "query"], str(value)
)

@mview_query.deleter
def mview_query(self):
"""Delete SQL query defining the table as a materialized view."""
self._properties.pop("materializedView", None)

@property
def mview_last_refresh_time(self):
"""Optional[datetime.datetime]: Datetime at which the materialized view was last
refreshed (:data:`None` until set from the server).
"""
refresh_time = _helpers._get_sub_prop(
self._properties, ["materializedView", "lastRefreshTime"]
)
if refresh_time is not None:
# refresh_time will be in milliseconds.
return google.cloud._helpers._datetime_from_microseconds(
1000 * int(refresh_time)
)

@property
def mview_enable_refresh(self):
"""Optional[bool]: Enable automatic refresh of the materialized view
when the base table is updated. The default value is :data:`True`.
"""
return _helpers._get_sub_prop(
self._properties, ["materializedView", "enableRefresh"]
)

@mview_enable_refresh.setter
def mview_enable_refresh(self, value):
return _helpers._set_sub_prop(
self._properties, ["materializedView", "enableRefresh"], value
)

@property
def mview_refresh_interval(self):
"""Optional[datetime.timedelta]: The maximum frequency at which this
materialized view will be refreshed. The default value is 1800000
milliseconds (30 minutes).
"""
refresh_interval = _helpers._get_sub_prop(
self._properties, ["materializedView", "refreshIntervalMs"]
)
if refresh_interval is not None:
return datetime.timedelta(milliseconds=int(refresh_interval))

@mview_refresh_interval.setter
def mview_refresh_interval(self, value):
if value is None:
refresh_interval_ms = None
else:
refresh_interval_ms = str(value // datetime.timedelta(milliseconds=1))

_helpers._set_sub_prop(
self._properties,
["materializedView", "refreshIntervalMs"],
refresh_interval_ms,
)

@property
def streaming_buffer(self):
"""google.cloud.bigquery.StreamingBuffer: Information about a table's
Expand Down
86 changes: 86 additions & 0 deletions samples/snippets/materialized_view.py
@@ -0,0 +1,86 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def create_materialized_view(override_values={}):
# [START bigquery_create_materialized_view]
from google.cloud import bigquery

bigquery_client = bigquery.Client()

view_id = "my-project.my_dataset.my_materialized_view"
base_table_id = "my-project.my_dataset.my_base_table"
# [END bigquery_create_materialized_view]
# To facilitate testing, we replace values with alternatives
# provided by the testing harness.
view_id = override_values.get("view_id", view_id)
base_table_id = override_values.get("base_table_id", view_id)
# [START bigquery_create_materialized_view]
view = bigquery.Table(view_id)
view.mview_query = f"""
SELECT product_id, SUM(clicks) AS sum_clicks
FROM `{base_table_id}`
GROUP BY 1
"""

# Make an API request to create the materialized view.
view = bigquery_client.create_table(view)
print(f"Created {view.table_type}: {str(view.reference)}")
# [END bigquery_create_materialized_view]
return view


def update_materialized_view(override_values={}):
# [START bigquery_update_materialized_view]
import datetime
from google.cloud import bigquery

bigquery_client = bigquery.Client()

view_id = "my-project.my_dataset.my_materialized_view"
# [END bigquery_update_materialized_view]
# To facilitate testing, we replace values with alternatives
# provided by the testing harness.
view_id = override_values.get("view_id", view_id)
# [START bigquery_update_materialized_view]
view = bigquery.Table(view_id)
view.mview_enable_refresh = True
view.mview_refresh_interval = datetime.timedelta(hours=1)

# Make an API request to update the materialized view.
view = bigquery_client.update_table(
view,
# Pass in a list of any fields you need to modify.
["mview_enable_refresh", "mview_refresh_interval"],
)
print(f"Updated {view.table_type}: {str(view.reference)}")
# [END bigquery_update_materialized_view]
return view


def delete_materialized_view(override_values={}):
# [START bigquery_delete_materialized_view]
from google.cloud import bigquery

bigquery_client = bigquery.Client()

view_id = "my-project.my_dataset.my_materialized_view"
# [END bigquery_delete_materialized_view]
# To facilitate testing, we replace values with alternatives
# provided by the testing harness.
view_id = override_values.get("view_id", view_id)
# [START bigquery_delete_materialized_view]
# Make an API request to delete the materialized view.
bigquery_client.delete_table(view_id)
# [END bigquery_delete_materialized_view]
93 changes: 93 additions & 0 deletions samples/snippets/materialized_view_test.py
@@ -0,0 +1,93 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import uuid

from google.api_core import exceptions
from google.cloud import bigquery
import pytest

import materialized_view


def temp_suffix():
return str(uuid.uuid4()).replace("-", "_")


@pytest.fixture(scope="module")
def bigquery_client():
bigquery_client = bigquery.Client()
return bigquery_client


@pytest.fixture(autouse=True)
def bigquery_client_patch(monkeypatch, bigquery_client):
monkeypatch.setattr(bigquery, "Client", lambda: bigquery_client)


@pytest.fixture(scope="module")
def project_id(bigquery_client):
return bigquery_client.project


@pytest.fixture(scope="module")
def dataset_id(bigquery_client):
dataset_id = f"mvdataset_{temp_suffix()}"
bigquery_client.create_dataset(dataset_id)
yield dataset_id
bigquery_client.delete_dataset(dataset_id, delete_contents=True)


@pytest.fixture(scope="module")
def base_table_id(bigquery_client, project_id, dataset_id):
base_table_id = f"{project_id}.{dataset_id}.base_{temp_suffix()}"
# Schema from materialized views guide:
# https://cloud.google.com/bigquery/docs/materialized-views#create
base_table = bigquery.Table(base_table_id)
base_table.schema = [
bigquery.SchemaField("product_id", bigquery.SqlTypeNames.INT64),
bigquery.SchemaField("clicks", bigquery.SqlTypeNames.INT64),
]
bigquery_client.create_table(base_table)
yield base_table_id
bigquery_client.delete_table(base_table_id)


@pytest.fixture(scope="module")
def view_id(bigquery_client, project_id, dataset_id):
view_id = f"{project_id}.{dataset_id}.mview_{temp_suffix()}"
yield view_id
bigquery_client.delete_table(view_id, not_found_ok=True)


def test_materialized_view(capsys, bigquery_client, base_table_id, view_id):
override_values = {
"base_table_id": base_table_id,
"view_id": view_id,
}
view = materialized_view.create_materialized_view(override_values)
assert base_table_id in view.mview_query
out, _ = capsys.readouterr()
assert view_id in out

view = materialized_view.update_materialized_view(override_values)
assert view.mview_enable_refresh
assert view.mview_refresh_interval == datetime.timedelta(hours=1)
out, _ = capsys.readouterr()
assert view_id in out

materialized_view.delete_materialized_view(override_values)
with pytest.raises(exceptions.NotFound):
bigquery_client.get_table(view_id)

0 comments on commit 57ffc66

Please sign in to comment.