From 57ffc665319331e0a00583d5d652fd14a510cf2a Mon Sep 17 00:00:00 2001 From: Tim Swast Date: Thu, 3 Dec 2020 09:32:02 -0600 Subject: [PATCH] feat: add support for materialized views (#408) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Thank you for opening a Pull Request! Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [x] Ensure the tests and linter pass - [x] Code coverage does not decrease (if any source code was changed) - [x] Appropriate docs were updated (if necessary) Fixes #407 🦕 --- google/cloud/bigquery/table.py | 95 +++++++++++++++++++--- samples/snippets/materialized_view.py | 86 ++++++++++++++++++++ samples/snippets/materialized_view_test.py | 93 +++++++++++++++++++++ tests/unit/test_table.py | 61 ++++++++++++-- 4 files changed, 317 insertions(+), 18 deletions(-) create mode 100644 samples/snippets/materialized_view.py create mode 100644 samples/snippets/materialized_view_test.py diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index f30c05773..6daccf518 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -293,15 +293,18 @@ class Table(object): """ _PROPERTY_TO_API_FIELD = { - "friendly_name": "friendlyName", + "encryption_configuration": "encryptionConfiguration", "expires": "expirationTime", - "time_partitioning": "timePartitioning", - "partitioning_type": "timePartitioning", + "external_data_configuration": "externalDataConfiguration", + "friendly_name": "friendlyName", + "mview_enable_refresh": "materializedView", + "mview_query": "materializedView", + "mview_refresh_interval": "materializedView", "partition_expiration": "timePartitioning", + "partitioning_type": 
"timePartitioning", + "time_partitioning": "timePartitioning", "view_use_legacy_sql": "view", "view_query": "view", - "external_data_configuration": "externalDataConfiguration", - "encryption_configuration": "encryptionConfiguration", "require_partition_filter": "requirePartitionFilter", } @@ -714,18 +717,14 @@ def view_query(self): Raises: ValueError: For invalid value types. """ - view = self._properties.get("view") - if view is not None: - return view.get("query") + return _helpers._get_sub_prop(self._properties, ["view", "query"]) @view_query.setter def view_query(self, value): if not isinstance(value, six.string_types): raise ValueError("Pass a string") - view = self._properties.get("view") - if view is None: - view = self._properties["view"] = {} - view["query"] = value + _helpers._set_sub_prop(self._properties, ["view", "query"], value) + view = self._properties["view"] # The service defaults useLegacySql to True, but this # client uses Standard SQL by default. if view.get("useLegacySql") is None: @@ -746,6 +745,78 @@ def view_use_legacy_sql(self, value): self._properties["view"] = {} self._properties["view"]["useLegacySql"] = value + @property + def mview_query(self): + """Optional[str]: SQL query defining the table as a materialized + view (defaults to :data:`None`). + """ + return _helpers._get_sub_prop(self._properties, ["materializedView", "query"]) + + @mview_query.setter + def mview_query(self, value): + _helpers._set_sub_prop( + self._properties, ["materializedView", "query"], str(value) + ) + + @mview_query.deleter + def mview_query(self): + """Delete SQL query defining the table as a materialized view.""" + self._properties.pop("materializedView", None) + + @property + def mview_last_refresh_time(self): + """Optional[datetime.datetime]: Datetime at which the materialized view was last + refreshed (:data:`None` until set from the server). 
+ """ + refresh_time = _helpers._get_sub_prop( + self._properties, ["materializedView", "lastRefreshTime"] + ) + if refresh_time is not None: + # refresh_time will be in milliseconds. + return google.cloud._helpers._datetime_from_microseconds( + 1000 * int(refresh_time) + ) + + @property + def mview_enable_refresh(self): + """Optional[bool]: Enable automatic refresh of the materialized view + when the base table is updated. The default value is :data:`True`. + """ + return _helpers._get_sub_prop( + self._properties, ["materializedView", "enableRefresh"] + ) + + @mview_enable_refresh.setter + def mview_enable_refresh(self, value): + return _helpers._set_sub_prop( + self._properties, ["materializedView", "enableRefresh"], value + ) + + @property + def mview_refresh_interval(self): + """Optional[datetime.timedelta]: The maximum frequency at which this + materialized view will be refreshed. The default value is 1800000 + milliseconds (30 minutes). + """ + refresh_interval = _helpers._get_sub_prop( + self._properties, ["materializedView", "refreshIntervalMs"] + ) + if refresh_interval is not None: + return datetime.timedelta(milliseconds=int(refresh_interval)) + + @mview_refresh_interval.setter + def mview_refresh_interval(self, value): + if value is None: + refresh_interval_ms = None + else: + refresh_interval_ms = str(value // datetime.timedelta(milliseconds=1)) + + _helpers._set_sub_prop( + self._properties, + ["materializedView", "refreshIntervalMs"], + refresh_interval_ms, + ) + @property def streaming_buffer(self): """google.cloud.bigquery.StreamingBuffer: Information about a table's diff --git a/samples/snippets/materialized_view.py b/samples/snippets/materialized_view.py new file mode 100644 index 000000000..d925ec230 --- /dev/null +++ b/samples/snippets/materialized_view.py @@ -0,0 +1,86 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +def create_materialized_view(override_values={}): + # [START bigquery_create_materialized_view] + from google.cloud import bigquery + + bigquery_client = bigquery.Client() + + view_id = "my-project.my_dataset.my_materialized_view" + base_table_id = "my-project.my_dataset.my_base_table" + # [END bigquery_create_materialized_view] + # To facilitate testing, we replace values with alternatives + # provided by the testing harness. + view_id = override_values.get("view_id", view_id) + base_table_id = override_values.get("base_table_id", base_table_id) + # [START bigquery_create_materialized_view] + view = bigquery.Table(view_id) + view.mview_query = f""" + SELECT product_id, SUM(clicks) AS sum_clicks + FROM `{base_table_id}` + GROUP BY 1 + """ + + # Make an API request to create the materialized view. + view = bigquery_client.create_table(view) + print(f"Created {view.table_type}: {str(view.reference)}") + # [END bigquery_create_materialized_view] + return view + + +def update_materialized_view(override_values={}): + # [START bigquery_update_materialized_view] + import datetime + from google.cloud import bigquery + + bigquery_client = bigquery.Client() + + view_id = "my-project.my_dataset.my_materialized_view" + # [END bigquery_update_materialized_view] + # To facilitate testing, we replace values with alternatives + # provided by the testing harness.
+ view_id = override_values.get("view_id", view_id) + # [START bigquery_update_materialized_view] + view = bigquery.Table(view_id) + view.mview_enable_refresh = True + view.mview_refresh_interval = datetime.timedelta(hours=1) + + # Make an API request to update the materialized view. + view = bigquery_client.update_table( + view, + # Pass in a list of any fields you need to modify. + ["mview_enable_refresh", "mview_refresh_interval"], + ) + print(f"Updated {view.table_type}: {str(view.reference)}") + # [END bigquery_update_materialized_view] + return view + + +def delete_materialized_view(override_values={}): + # [START bigquery_delete_materialized_view] + from google.cloud import bigquery + + bigquery_client = bigquery.Client() + + view_id = "my-project.my_dataset.my_materialized_view" + # [END bigquery_delete_materialized_view] + # To facilitate testing, we replace values with alternatives + # provided by the testing harness. + view_id = override_values.get("view_id", view_id) + # [START bigquery_delete_materialized_view] + # Make an API request to delete the materialized view. + bigquery_client.delete_table(view_id) + # [END bigquery_delete_materialized_view] diff --git a/samples/snippets/materialized_view_test.py b/samples/snippets/materialized_view_test.py new file mode 100644 index 000000000..fc3db533c --- /dev/null +++ b/samples/snippets/materialized_view_test.py @@ -0,0 +1,93 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import datetime +import uuid + +from google.api_core import exceptions +from google.cloud import bigquery +import pytest + +import materialized_view + + +def temp_suffix(): + return str(uuid.uuid4()).replace("-", "_") + + +@pytest.fixture(scope="module") +def bigquery_client(): + bigquery_client = bigquery.Client() + return bigquery_client + + +@pytest.fixture(autouse=True) +def bigquery_client_patch(monkeypatch, bigquery_client): + monkeypatch.setattr(bigquery, "Client", lambda: bigquery_client) + + +@pytest.fixture(scope="module") +def project_id(bigquery_client): + return bigquery_client.project + + +@pytest.fixture(scope="module") +def dataset_id(bigquery_client): + dataset_id = f"mvdataset_{temp_suffix()}" + bigquery_client.create_dataset(dataset_id) + yield dataset_id + bigquery_client.delete_dataset(dataset_id, delete_contents=True) + + +@pytest.fixture(scope="module") +def base_table_id(bigquery_client, project_id, dataset_id): + base_table_id = f"{project_id}.{dataset_id}.base_{temp_suffix()}" + # Schema from materialized views guide: + # https://cloud.google.com/bigquery/docs/materialized-views#create + base_table = bigquery.Table(base_table_id) + base_table.schema = [ + bigquery.SchemaField("product_id", bigquery.SqlTypeNames.INT64), + bigquery.SchemaField("clicks", bigquery.SqlTypeNames.INT64), + ] + bigquery_client.create_table(base_table) + yield base_table_id + bigquery_client.delete_table(base_table_id) + + +@pytest.fixture(scope="module") +def view_id(bigquery_client, project_id, dataset_id): + view_id = f"{project_id}.{dataset_id}.mview_{temp_suffix()}" + yield view_id + bigquery_client.delete_table(view_id, not_found_ok=True) + + +def test_materialized_view(capsys, bigquery_client, base_table_id, view_id): + override_values = { + "base_table_id": base_table_id, + "view_id": view_id, + } + view = materialized_view.create_materialized_view(override_values) + assert base_table_id in view.mview_query + out, _ = capsys.readouterr() + assert view_id 
in out + + view = materialized_view.update_materialized_view(override_values) + assert view.mview_enable_refresh + assert view.mview_refresh_interval == datetime.timedelta(hours=1) + out, _ = capsys.readouterr() + assert view_id in out + + materialized_view.delete_materialized_view(override_values) + with pytest.raises(exceptions.NotFound): + bigquery_client.get_table(view_id) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 67874ff91..c1876adaa 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -import datetime as dt +import datetime import logging import time import unittest @@ -21,6 +21,7 @@ import mock import pkg_resources import pytest +import pytz import six import google.api_core.exceptions @@ -292,6 +293,13 @@ def _get_target_class(): return Table def _make_one(self, *args, **kw): + from google.cloud.bigquery.dataset import DatasetReference + + if len(args) == 0: + dataset = DatasetReference(self.PROJECT, self.DS_ID) + table_ref = dataset.table(self.TABLE_NAME) + args = (table_ref,) + return self._get_target_class()(*args, **kw) def _setUpConstants(self): @@ -812,6 +820,48 @@ def test_labels_setter_bad_value(self): with self.assertRaises(ValueError): table.labels = 12345 + def test_mview_query(self): + table = self._make_one() + self.assertIsNone(table.mview_query) + table.mview_query = "SELECT name, SUM(number) FROM dset.tbl GROUP BY 1" + self.assertEqual( + table.mview_query, "SELECT name, SUM(number) FROM dset.tbl GROUP BY 1" + ) + del table.mview_query + self.assertIsNone(table.mview_query) + + def test_mview_last_refresh_time(self): + table = self._make_one() + self.assertIsNone(table.mview_last_refresh_time) + table._properties["materializedView"] = { + "lastRefreshTime": "1606751842496", + } + self.assertEqual( + table.mview_last_refresh_time, + datetime.datetime(2020, 11, 30, 15, 57, 22, 
496000, tzinfo=pytz.utc), + ) + + def test_mview_enable_refresh(self): + table = self._make_one() + self.assertIsNone(table.mview_enable_refresh) + table.mview_enable_refresh = True + self.assertTrue(table.mview_enable_refresh) + table.mview_enable_refresh = False + self.assertFalse(table.mview_enable_refresh) + table.mview_enable_refresh = None + self.assertIsNone(table.mview_enable_refresh) + + def test_mview_refresh_interval(self): + table = self._make_one() + self.assertIsNone(table.mview_refresh_interval) + table.mview_refresh_interval = datetime.timedelta(minutes=30) + self.assertEqual(table.mview_refresh_interval, datetime.timedelta(minutes=30)) + self.assertEqual( + table._properties["materializedView"]["refreshIntervalMs"], "1800000" + ) + table.mview_refresh_interval = None + self.assertIsNone(table.mview_refresh_interval) + def test_from_string(self): cls = self._get_target_class() got = cls.from_string("string-project.string_dataset.string_table") @@ -1286,7 +1336,6 @@ def _make_one(self, *args, **kw): return self._get_target_class()(*args, **kw) def _setUpConstants(self): - import datetime from google.cloud._helpers import UTC self.WHEN_TS = 1437767599.125 @@ -2413,7 +2462,7 @@ def test_to_dataframe_timestamp_out_of_pyarrow_bounds(self): tzinfo = None if PYARROW_VERSION >= PYARROW_TIMESTAMP_VERSION: - tzinfo = dt.timezone.utc + tzinfo = datetime.timezone.utc self.assertIsInstance(df, pandas.DataFrame) self.assertEqual(len(df), 2) # verify the number of rows @@ -2421,8 +2470,8 @@ def test_to_dataframe_timestamp_out_of_pyarrow_bounds(self): self.assertEqual( list(df["some_timestamp"]), [ - dt.datetime(4567, 1, 1, tzinfo=tzinfo), - dt.datetime(9999, 12, 31, tzinfo=tzinfo), + datetime.datetime(4567, 1, 1, tzinfo=tzinfo), + datetime.datetime(9999, 12, 31, tzinfo=tzinfo), ], ) @@ -2454,7 +2503,7 @@ def test_to_dataframe_datetime_out_of_pyarrow_bounds(self): self.assertEqual(list(df.columns), ["some_datetime"]) self.assertEqual( list(df["some_datetime"]), - 
[dt.datetime(4567, 1, 1), dt.datetime(9999, 12, 31)], + [datetime.datetime(4567, 1, 1), datetime.datetime(9999, 12, 31)], ) @unittest.skipIf(pandas is None, "Requires `pandas`")