Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add support for materialized views #408

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
95 changes: 83 additions & 12 deletions google/cloud/bigquery/table.py
Expand Up @@ -293,15 +293,18 @@ class Table(object):
"""

# Maps Python property names on this class to the top-level API resource
# field that stores them. Several properties share one API field because they
# live in the same nested sub-resource (e.g. the three ``mview_*`` properties
# all map to "materializedView"). Each key appears exactly once: duplicate
# keys in a dict literal are silently overridden by the later entry.
_PROPERTY_TO_API_FIELD = {
    "friendly_name": "friendlyName",
    "encryption_configuration": "encryptionConfiguration",
    "expires": "expirationTime",
    "time_partitioning": "timePartitioning",
    "partitioning_type": "timePartitioning",
    "external_data_configuration": "externalDataConfiguration",
    "mview_enable_refresh": "materializedView",
    "mview_query": "materializedView",
    "mview_refresh_interval": "materializedView",
    "partition_expiration": "timePartitioning",
    "view_use_legacy_sql": "view",
    "view_query": "view",
    "require_partition_filter": "requirePartitionFilter",
}

Expand Down Expand Up @@ -714,18 +717,14 @@ def view_query(self):
Raises:
ValueError: For invalid value types.
"""
view = self._properties.get("view")
if view is not None:
return view.get("query")
return _helpers._get_sub_prop(self._properties, ["view", "query"])

@view_query.setter
def view_query(self, value):
if not isinstance(value, six.string_types):
raise ValueError("Pass a string")
view = self._properties.get("view")
if view is None:
view = self._properties["view"] = {}
view["query"] = value
_helpers._set_sub_prop(self._properties, ["view", "query"], value)
view = self._properties["view"]
# The service defaults useLegacySql to True, but this
# client uses Standard SQL by default.
if view.get("useLegacySql") is None:
Expand All @@ -746,6 +745,78 @@ def view_use_legacy_sql(self, value):
self._properties["view"] = {}
self._properties["view"]["useLegacySql"] = value

@property
def mview_query(self):
    """Optional[str]: SQL query that defines this table as a materialized
    view (:data:`None` when no such query is stored).
    """
    query_path = ["materializedView", "query"]
    return _helpers._get_sub_prop(self._properties, query_path)

@mview_query.setter
def mview_query(self, value):
    # The value is coerced with ``str()`` before it is stored.
    query_path = ["materializedView", "query"]
    _helpers._set_sub_prop(self._properties, query_path, str(value))

@mview_query.deleter
def mview_query(self):
    """Remove the materialized view definition from this table.

    The whole ``materializedView`` entry is dropped from the stored
    properties, not only its query. Deleting when no entry exists is a no-op.
    """
    if "materializedView" in self._properties:
        del self._properties["materializedView"]

@property
def mview_last_refresh_time(self):
    """Optional[datetime.datetime]: When the materialized view was last
    refreshed (:data:`None` until the server has provided a value).
    """
    raw_millis = _helpers._get_sub_prop(
        self._properties, ["materializedView", "lastRefreshTime"]
    )
    if raw_millis is None:
        return None

    # The stored value is in milliseconds, while the conversion helper
    # expects microseconds.
    micros = 1000 * int(raw_millis)
    return google.cloud._helpers._datetime_from_microseconds(micros)

@property
def mview_enable_refresh(self):
    """Optional[bool]: Whether the materialized view is refreshed
    automatically when the base table is updated. The default value is
    :data:`True`.
    """
    flag_path = ["materializedView", "enableRefresh"]
    return _helpers._get_sub_prop(self._properties, flag_path)

@mview_enable_refresh.setter
def mview_enable_refresh(self, value):
    # A property setter's return value is discarded by attribute assignment,
    # so nothing is returned here (consistent with the other setters).
    _helpers._set_sub_prop(
        self._properties, ["materializedView", "enableRefresh"], value
    )

@property
def mview_refresh_interval(self):
    """Optional[datetime.timedelta]: The maximum frequency at which this
    materialized view will be refreshed. The default value is 1800000
    milliseconds (30 minutes).
    """
    raw_millis = _helpers._get_sub_prop(
        self._properties, ["materializedView", "refreshIntervalMs"]
    )
    if raw_millis is None:
        return None
    return datetime.timedelta(milliseconds=int(raw_millis))

@mview_refresh_interval.setter
def mview_refresh_interval(self, value):
    # The interval is stored as a string holding whole milliseconds;
    # ``None`` is stored as-is.
    interval_ms = (
        None if value is None else str(value // datetime.timedelta(milliseconds=1))
    )
    _helpers._set_sub_prop(
        self._properties, ["materializedView", "refreshIntervalMs"], interval_ms
    )

@property
def streaming_buffer(self):
"""google.cloud.bigquery.StreamingBuffer: Information about a table's
Expand Down
86 changes: 86 additions & 0 deletions samples/snippets/materialized_view.py
@@ -0,0 +1,86 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


def create_materialized_view(override_values=None):
    """Create a materialized view over a base table and return it.

    Args:
        override_values: Optional mapping used by the test harness to replace
            the placeholder IDs. Recognized keys are ``"view_id"`` and
            ``"base_table_id"``; a missing key keeps its placeholder.

    Returns:
        The table object returned by ``create_table``.
    """
    # Avoid a shared mutable default argument.
    if override_values is None:
        override_values = {}

    # [START bigquery_create_materialized_view]
    from google.cloud import bigquery

    bigquery_client = bigquery.Client()

    view_id = "my-project.my_dataset.my_materialized_view"
    base_table_id = "my-project.my_dataset.my_base_table"
    # [END bigquery_create_materialized_view]
    # To facilitate testing, we replace values with alternatives
    # provided by the testing harness.
    view_id = override_values.get("view_id", view_id)
    # Fall back to the base table placeholder, not the view ID; otherwise
    # the view's query would select from the view itself.
    base_table_id = override_values.get("base_table_id", base_table_id)
    # [START bigquery_create_materialized_view]
    view = bigquery.Table(view_id)
    view.mview_query = f"""
    SELECT product_id, SUM(clicks) AS sum_clicks
    FROM `{base_table_id}`
    GROUP BY 1
    """

    # Make an API request to create the materialized view.
    view = bigquery_client.create_table(view)
    print(f"Created {view.table_type}: {str(view.reference)}")
    # [END bigquery_create_materialized_view]
    return view


def update_materialized_view(override_values=None):
    """Enable refresh and set a one-hour refresh interval on a materialized view.

    Args:
        override_values: Optional mapping used by the test harness to replace
            the placeholder ID. The recognized key is ``"view_id"``.

    Returns:
        The table object returned by ``update_table``.
    """
    # Avoid a shared mutable default argument.
    if override_values is None:
        override_values = {}

    # [START bigquery_update_materialized_view]
    import datetime
    from google.cloud import bigquery

    bigquery_client = bigquery.Client()

    view_id = "my-project.my_dataset.my_materialized_view"
    # [END bigquery_update_materialized_view]
    # To facilitate testing, we replace values with alternatives
    # provided by the testing harness.
    view_id = override_values.get("view_id", view_id)
    # [START bigquery_update_materialized_view]
    view = bigquery.Table(view_id)
    view.mview_enable_refresh = True
    view.mview_refresh_interval = datetime.timedelta(hours=1)

    # Make an API request to update the materialized view.
    view = bigquery_client.update_table(
        view,
        # Pass in a list of any fields you need to modify.
        ["mview_enable_refresh", "mview_refresh_interval"],
    )
    print(f"Updated {view.table_type}: {str(view.reference)}")
    # [END bigquery_update_materialized_view]
    return view


def delete_materialized_view(override_values=None):
    """Delete a materialized view.

    Args:
        override_values: Optional mapping used by the test harness to replace
            the placeholder ID. The recognized key is ``"view_id"``.
    """
    # Avoid a shared mutable default argument.
    if override_values is None:
        override_values = {}

    # [START bigquery_delete_materialized_view]
    from google.cloud import bigquery

    bigquery_client = bigquery.Client()

    view_id = "my-project.my_dataset.my_materialized_view"
    # [END bigquery_delete_materialized_view]
    # To facilitate testing, we replace values with alternatives
    # provided by the testing harness.
    view_id = override_values.get("view_id", view_id)
    # [START bigquery_delete_materialized_view]
    # Make an API request to delete the materialized view.
    bigquery_client.delete_table(view_id)
    # [END bigquery_delete_materialized_view]
93 changes: 93 additions & 0 deletions samples/snippets/materialized_view_test.py
@@ -0,0 +1,93 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import datetime
import uuid

from google.api_core import exceptions
from google.cloud import bigquery
import pytest

import materialized_view


def temp_suffix():
    """Return a random UUID4 string with its hyphens replaced by underscores."""
    uuid_parts = str(uuid.uuid4()).split("-")
    return "_".join(uuid_parts)


@pytest.fixture(scope="module")
def bigquery_client():
    """Provide one BigQuery client shared by the tests in this module."""
    return bigquery.Client()


@pytest.fixture(autouse=True)
def bigquery_client_patch(monkeypatch, bigquery_client):
    """Make ``bigquery.Client()`` return the shared client during each test."""

    def shared_client_factory():
        return bigquery_client

    monkeypatch.setattr(bigquery, "Client", shared_client_factory)


@pytest.fixture(scope="module")
def project_id(bigquery_client):
    """Expose the ``project`` attribute of the shared client."""
    project = bigquery_client.project
    return project


@pytest.fixture(scope="module")
def dataset_id(bigquery_client):
    """Create a uniquely named dataset, yield its ID, then delete it.

    Teardown deletes the dataset together with its contents.
    """
    temp_dataset = "mvdataset_" + temp_suffix()
    bigquery_client.create_dataset(temp_dataset)
    yield temp_dataset
    bigquery_client.delete_dataset(temp_dataset, delete_contents=True)


@pytest.fixture(scope="module")
def base_table_id(bigquery_client, project_id, dataset_id):
    """Create the base table queried by the materialized view and yield its ID.

    The table is deleted on teardown.
    """
    table_name = "{}.{}.base_{}".format(project_id, dataset_id, temp_suffix())
    # Schema from materialized views guide:
    # https://cloud.google.com/bigquery/docs/materialized-views#create
    table = bigquery.Table(table_name)
    table.schema = [
        bigquery.SchemaField(column, bigquery.SqlTypeNames.INT64)
        for column in ("product_id", "clicks")
    ]
    bigquery_client.create_table(table)
    yield table_name
    bigquery_client.delete_table(table_name)


@pytest.fixture(scope="module")
def view_id(bigquery_client, project_id, dataset_id):
    """Yield a unique materialized view ID and clean it up afterwards.

    The fixture does not create the view. Teardown deletes it and tolerates
    the view already being gone.
    """
    mview_name = "{}.{}.mview_{}".format(project_id, dataset_id, temp_suffix())
    yield mview_name
    bigquery_client.delete_table(mview_name, not_found_ok=True)


def test_materialized_view(capsys, bigquery_client, base_table_id, view_id):
    """Run the create, update and delete samples in order against one view."""
    overrides = dict(base_table_id=base_table_id, view_id=view_id)

    # Create: the query must reference the base table and the ID is printed.
    created = materialized_view.create_materialized_view(overrides)
    assert base_table_id in created.mview_query
    stdout, _stderr = capsys.readouterr()
    assert view_id in stdout

    # Update: refresh is enabled with a one-hour interval and the ID is printed.
    updated = materialized_view.update_materialized_view(overrides)
    assert updated.mview_enable_refresh
    assert updated.mview_refresh_interval == datetime.timedelta(hours=1)
    stdout, _stderr = capsys.readouterr()
    assert view_id in stdout

    # Delete: the view can no longer be fetched.
    materialized_view.delete_materialized_view(overrides)
    with pytest.raises(exceptions.NotFound):
        bigquery_client.get_table(view_id)