diff --git a/docs/bigquery_storage_v1beta2/library.rst b/docs/bigquery_storage_v1beta2/library.rst index 4e25d9d8..fd7dcb36 100644 --- a/docs/bigquery_storage_v1beta2/library.rst +++ b/docs/bigquery_storage_v1beta2/library.rst @@ -4,3 +4,7 @@ Bigquery Storage v1beta2 API Library .. automodule:: google.cloud.bigquery_storage_v1beta2.client :members: :inherited-members: + +.. automodule:: google.cloud.bigquery_storage_v1beta2.writer + :members: + :inherited-members: diff --git a/google/cloud/bigquery_storage_v1beta2/__init__.py b/google/cloud/bigquery_storage_v1beta2/__init__.py index 6d0b34e1..8efde268 100644 --- a/google/cloud/bigquery_storage_v1beta2/__init__.py +++ b/google/cloud/bigquery_storage_v1beta2/__init__.py @@ -30,10 +30,15 @@ class BigQueryReadClient(client.BigQueryReadClient): __doc__ = client.BigQueryReadClient.__doc__ +class BigQueryWriteClient(client.BigQueryWriteClient): + __doc__ = client.BigQueryWriteClient.__doc__ + + __all__ = ( # google.cloud.bigquery_storage_v1beta2 "__version__", "types", # google.cloud.bigquery_storage_v1beta2.client "BigQueryReadClient", + "BigQueryWriteClient", ) diff --git a/google/cloud/bigquery_storage_v1beta2/client.py b/google/cloud/bigquery_storage_v1beta2/client.py index f2776a20..00bff3ff 100644 --- a/google/cloud/bigquery_storage_v1beta2/client.py +++ b/google/cloud/bigquery_storage_v1beta2/client.py @@ -19,12 +19,14 @@ This is the base from which all interactions with the API occur. """ -from __future__ import absolute_import - import google.api_core.gapic_v1.method +import google.api_core.retry from google.cloud.bigquery_storage_v1 import reader -from google.cloud.bigquery_storage_v1beta2.services import big_query_read +from google.cloud.bigquery_storage_v1beta2.services import ( + big_query_read, + big_query_write, +) _SCOPES = ( @@ -135,3 +137,7 @@ def read_rows( offset, {"retry": retry, "timeout": timeout, "metadata": metadata}, ) + + +class BigQueryWriteClient(big_query_write.BigQueryWriteClient): + __doc__ = big_query_write.BigQueryWriteClient.__doc__ diff --git a/google/cloud/bigquery_storage_v1beta2/exceptions.py b/google/cloud/bigquery_storage_v1beta2/exceptions.py new file mode 100644 index 00000000..0887071d --- /dev/null +++ b/google/cloud/bigquery_storage_v1beta2/exceptions.py @@ -0,0 +1,17 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class StreamClosedError(Exception): + """Operation not supported while stream is closed.""" diff --git a/google/cloud/bigquery_storage_v1beta2/writer.py b/google/cloud/bigquery_storage_v1beta2/writer.py new file mode 100644 index 00000000..3a77a07e --- /dev/null +++ b/google/cloud/bigquery_storage_v1beta2/writer.py @@ -0,0 +1,412 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from __future__ import division + +import itertools +import logging +import queue +import time +import threading +from typing import Callable, Optional, Sequence, Tuple + +from google.api_core import bidi +from google.api_core.future import polling as polling_future +from google.api_core import exceptions +import google.api_core.retry +import grpc + +from google.cloud.bigquery_storage_v1beta2 import exceptions as bqstorage_exceptions +from google.cloud.bigquery_storage_v1beta2 import types as gapic_types +from google.cloud.bigquery_storage_v1beta2.services import big_query_write + +_LOGGER = logging.getLogger(__name__) +_REGULAR_SHUTDOWN_THREAD_NAME = "Thread-RegularStreamShutdown" +_RPC_ERROR_THREAD_NAME = "Thread-OnRpcTerminated" + +# open() takes between 0.25 and 0.4 seconds to be ready. Wait each loop before +# checking again. This interval was chosen to result in about 3 loops. +_WRITE_OPEN_INTERVAL = 0.08 + +# Use a default timeout that is quite long to avoid potential infinite loops, +# but still work for all expected requests +_DEFAULT_TIMEOUT = 600 + + +def _wrap_as_exception(maybe_exception) -> Exception: + """Wrap an object as a Python exception, if needed. + Args: + maybe_exception (Any): The object to wrap, usually a gRPC exception class. + Returns: + The argument itself if an instance of ``BaseException``, otherwise + the argument represented as an instance of ``Exception`` (sub)class. + """ + if isinstance(maybe_exception, grpc.RpcError): + return exceptions.from_grpc_error(maybe_exception) + elif isinstance(maybe_exception, BaseException): + return maybe_exception + + return Exception(maybe_exception) + + +class AppendRowsStream(object): + """A manager object which can append rows to a stream.""" + + def __init__( + self, + client: big_query_write.BigQueryWriteClient, + initial_request_template: gapic_types.AppendRowsRequest, + metadata: Sequence[Tuple[str, str]] = (), + ): + """Construct a stream manager. + + Args: + client: + Client responsible for making requests. + initial_request_template: + Data to include in the first request sent to the stream. This + must contain + :attr:`google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest.write_stream` + and + :attr:`google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest.ProtoData.writer_schema`. + metadata: + Extra headers to include when sending the streaming request. + """ + self._client = client + self._closing = threading.Lock() + self._closed = False + self._close_callbacks = [] + self._futures_queue = queue.Queue() + self._inital_request_template = initial_request_template + self._metadata = metadata + self._rpc = None + self._stream_name = None + + # The threads created in ``.open()``. + self._consumer = None + + @property + def is_active(self) -> bool: + """bool: True if this manager is actively streaming. + + Note that ``False`` does not indicate this is complete shut down, + just that it stopped getting new messages. + """ + return self._consumer is not None and self._consumer.is_active + + def add_close_callback(self, callback: Callable): + """Schedules a callable when the manager closes. 
+ Args: + callback (Callable): The method to call. + """ + self._close_callbacks.append(callback) + + def open( + self, + initial_request: gapic_types.AppendRowsRequest, + timeout: float = _DEFAULT_TIMEOUT, + ) -> "AppendRowsFuture": + """Open an append rows stream. + + This is automatically called by the first call to the + :attr:`google.cloud.bigquery_storage_v1beta2.writer.AppendRowsStream.send` + method. + + Args: + initial_request: + The initial request to start the stream. Must have + :attr:`google.cloud.bigquery_storage_v1beta2.types.AppendRowsRequest.write_stream` + and ``proto_rows.writer_schema.proto_descriptor`` and + properties populated. + timeout: + How long (in seconds) to wait for the stream to be ready. + + Returns: + A future, which can be used to process the response to the initial + request when it arrives. + """ + if self.is_active: + raise ValueError("This manager is already open.") + + if self._closed: + raise bqstorage_exceptions.StreamClosedError( + "This manager has been closed and can not be re-used." + ) + + start_time = time.monotonic() + request = gapic_types.AppendRowsRequest() + gapic_types.AppendRowsRequest.copy_from(request, self._inital_request_template) + request._pb.MergeFrom(initial_request._pb) + self._stream_name = request.write_stream + + inital_response_future = AppendRowsFuture(self) + self._futures_queue.put(inital_response_future) + + self._rpc = bidi.BidiRpc( + self._client.append_rows, + initial_request=request, + # TODO: pass in retry and timeout. Blocked by + # https://github.com/googleapis/python-api-core/issues/262 + metadata=tuple( + itertools.chain( + self._metadata, + # This header is required so that the BigQuery Storage API + # knows which region to route the request to. + (("x-goog-request-params", f"write_stream={self._stream_name}"),), + ) + ), + ) + self._rpc.add_done_callback(self._on_rpc_done) + + self._consumer = bidi.BackgroundConsumer(self._rpc, self._on_response) + self._consumer.start() + + # Make sure RPC has started before returning. + # Without this, consumers may get: + # + # ValueError: Can not send() on an RPC that has never been open()ed. + # + # when they try to send a request. + while not self._rpc.is_active and self._consumer.is_active: + # Avoid 100% CPU while waiting for RPC to be ready. + time.sleep(_WRITE_OPEN_INTERVAL) + + # TODO: Check retry.deadline instead of (per-request) timeout. + # Blocked by + # https://github.com/googleapis/python-api-core/issues/262 + if timeout is None: + continue + current_time = time.monotonic() + if current_time - start_time > timeout: + break + + # Something went wrong when opening the RPC. + if not self._consumer.is_active: + # TODO: Share the exception from _rpc.open(). Blocked by + # https://github.com/googleapis/python-api-core/issues/268 + request_exception = exceptions.Unknown( + "There was a problem opening the stream. " + "Try turning on DEBUG level logs to see the error." + ) + self.close(reason=request_exception) + raise request_exception + + return inital_response_future + + def send(self, request: gapic_types.AppendRowsRequest) -> "AppendRowsFuture": + """Send an append rows request to the open stream. + + Args: + request: + The request to add to the stream. + + Returns: + A future, which can be used to process the response when it + arrives. + """ + if self._closed: + raise bqstorage_exceptions.StreamClosedError( + "This manager has been closed and can not be used." + ) + + # If the manager hasn't been openned yet, automatically open it. 
+ if not self.is_active: + return self.open(request) + + # For each request, we expect exactly one response (in order). Add a + # future to the queue so that when the response comes, the callback can + # pull it off and notify completion. + future = AppendRowsFuture(self) + self._futures_queue.put(future) + self._rpc.send(request) + return future + + def _on_response(self, response: gapic_types.AppendRowsResponse): + """Process a response from a consumer callback.""" + # If the stream has closed, but somehow we still got a response message + # back, discard it. The response futures queue has been drained, with + # an exception reported. + if self._closed: + raise bqstorage_exceptions.StreamClosedError( + f"Stream closed before receiving response: {response}" + ) + + # Since we have 1 response per request, if we get here from a response + # callback, the queue should never be empty. + future: AppendRowsFuture = self._futures_queue.get_nowait() + if response.error.code: + exc = exceptions.from_grpc_status( + response.error.code, response.error.message + ) + future.set_exception(exc) + else: + future.set_result(response) + + def close(self, reason: Optional[Exception] = None): + """Stop consuming messages and shutdown all helper threads. + + This method is idempotent. Additional calls will have no effect. + + The method does not block, it delegates the shutdown operations to a background + thread. + + Args: + reason: The reason to close this. If ``None``, this is considered + an "intentional" shutdown. This is passed to the callbacks + specified via :meth:`add_close_callback`. + """ + self._regular_shutdown_thread = threading.Thread( + name=_REGULAR_SHUTDOWN_THREAD_NAME, + daemon=True, + target=self._shutdown, + kwargs={"reason": reason}, + ) + self._regular_shutdown_thread.start() + + def _shutdown(self, reason: Optional[Exception] = None): + """Run the actual shutdown sequence (stop the stream and all helper threads). + + Args: + reason: + The reason to close the stream. If ``None``, this is + considered an "intentional" shutdown. + """ + with self._closing: + if self._closed: + return + + # Stop consuming messages. + if self.is_active: + _LOGGER.debug("Stopping consumer.") + self._consumer.stop() + self._consumer = None + + self._rpc.close() + self._rpc = None + self._closed = True + _LOGGER.debug("Finished stopping manager.") + + # We know that no new items will be added to the queue because + # we've marked the stream as closed. + while not self._futures_queue.empty(): + # Mark each future as failed. Since the consumer thread has + # stopped (or at least is attempting to stop), we won't get + # response callbacks to populate the remaining futures. + future = self._futures_queue.get_nowait() + if reason is None: + exc = bqstorage_exceptions.StreamClosedError( + "Stream closed before receiving a response." + ) + else: + exc = reason + future.set_exception(exc) + + for callback in self._close_callbacks: + callback(self, reason) + + def _on_rpc_done(self, future): + """Triggered whenever the underlying RPC terminates without recovery. + + This is typically triggered from one of two threads: the background + consumer thread (when calling ``recv()`` produces a non-recoverable + error) or the grpc management thread (when cancelling the RPC). + + This method is *non-blocking*. It will start another thread to deal + with shutting everything down. This is to prevent blocking in the + background consumer and preventing it from being ``joined()``. 
+ """ + _LOGGER.info("RPC termination has signaled streaming pull manager shutdown.") + error = _wrap_as_exception(future) + thread = threading.Thread( + name=_RPC_ERROR_THREAD_NAME, target=self._shutdown, kwargs={"reason": error} + ) + thread.daemon = True + thread.start() + + +class AppendRowsFuture(polling_future.PollingFuture): + """Encapsulation of the asynchronous execution of an action. + + This object is returned from long-running BigQuery Storage API calls, and + is the interface to determine the status of those calls. + + This object should not be created directly, but is returned by other + methods in this library. + """ + + def __init__(self, manager: AppendRowsStream): + super().__init__() + self.__manager = manager + self.__cancelled = False + self._is_done = False + + def cancel(self): + """Stops pulling messages and shutdowns the background thread consuming + messages. + + The method does not block, it just triggers the shutdown and returns + immediately. To block until the background stream is terminated, call + :meth:`result()` after cancelling the future. + """ + # NOTE: We circumvent the base future's self._state to track the cancellation + # state, as this state has different meaning with streaming pull futures. + # See: https://github.com/googleapis/python-pubsub/pull/397 + self.__cancelled = True + return self.__manager.close() + + def cancelled(self): + """ + returns: + bool: ``True`` if the write stream has been cancelled. + """ + return self.__cancelled + + def done(self, retry: Optional[google.api_core.retry.Retry] = None) -> bool: + """Check the status of the future. + + Args: + retry: + Not used. Included for compatibility with base clase. Future + status is updated by a background thread. + + Returns: + ``True`` if the request has finished, otherwise ``False``. + """ + # Consumer should call set_result or set_exception method, where this + # gets set to True *after* first setting _result. + # + # Consumer runs in a background thread, but this access is thread-safe: + # https://docs.python.org/3/faq/library.html#what-kinds-of-global-value-mutation-are-thread-safe + return self._is_done + + def set_exception(self, exception): + """Set the result of the future as being the given exception. + + Do not use this method, it should only be used internally by the library and its + unit tests. + """ + return_value = super().set_exception(exception=exception) + self._is_done = True + return return_value + + def set_result(self, result): + """Set the return value of work associated with the future. + + Do not use this method, it should only be used internally by the library and its + unit tests. + """ + return_value = super().set_result(result=result) + self._is_done = True + return return_value diff --git a/noxfile.py b/noxfile.py index 3493eb27..98cd608b 100644 --- a/noxfile.py +++ b/noxfile.py @@ -94,7 +94,7 @@ def default(session): constraints_path, ) - session.install("-e", ".[fastavro,pandas,pyarrow]", "-c", constraints_path) + session.install("-e", ".[tests,fastavro,pandas,pyarrow]", "-c", constraints_path) # Run py.test against the unit tests. session.run( diff --git a/owlbot.py b/owlbot.py index b96b16a4..3d6771db 100644 --- a/owlbot.py +++ b/owlbot.py @@ -14,8 +14,6 @@ """This script is used to synthesize generated parts of this library.""" -import re - import synthtool as s from synthtool import gcp from synthtool.languages import python @@ -39,7 +37,7 @@ # wraps it. 
s.replace( library / "google/cloud/bigquery_storage/__init__.py", - f"from google\.cloud\.bigquery_storage_{library.name}\.services.big_query_read.client import", + f"from google\\.cloud\\.bigquery_storage_{library.name}\\.services.big_query_read.client import", f"from google.cloud.bigquery_storage_{library.name} import", ) @@ -48,7 +46,7 @@ s.replace( library / "google/cloud/bigquery_storage/__init__.py", ( - f"from google\.cloud\.bigquery_storage_{library.name}\.services.big_query_read.async_client " + f"from google\\.cloud\\.bigquery_storage_{library.name}\\.services.big_query_read.async_client " r"import BigQueryReadAsyncClient\n" ), "", @@ -63,17 +61,17 @@ # entry point. s.replace( library / "google/cloud/bigquery_storage/__init__.py", - f"from google\.cloud\.bigquery_storage_{library.name}\.types\.arrow import ArrowRecordBatch", + f"from google\\.cloud\\.bigquery_storage_{library.name}\\.types\\.arrow import ArrowRecordBatch", ( f"from google.cloud.bigquery_storage_{library.name} import types\n" f"from google.cloud.bigquery_storage_{library.name} import __version__\n" - "\g<0>" + "\\g<0>" ), ) s.replace( library / "google/cloud/bigquery_storage/__init__.py", r"""["']ArrowRecordBatch["']""", - ('"__version__",\n' ' "types",\n' " \g<0>"), + ('"__version__",\n' ' "types",\n' " \\g<0>"), ) # We want to expose all types through "google.cloud.bigquery_storage.types", @@ -89,12 +87,12 @@ s.replace( library / f"google/cloud/bigquery_storage_{library.name}*/types/__init__.py", r"from \.stream import \(", - "\g<0>\n DataFormat,", + "\\g<0>\n DataFormat,", ) s.replace( library / f"google/cloud/bigquery_storage_{library.name}*/types/__init__.py", r"""["']ReadSession["']""", - '"DataFormat",\n \g<0>', + '"DataFormat",\n \\g<0>', ) # The append_rows method doesn't contain keyword arguments that build request @@ -130,11 +128,12 @@ # Add templated files # ---------------------------------------------------------------------------- extras = ["fastavro", "pandas", "pyarrow"] +unit_test_extras = ["tests"] + extras templated_files = common.py_library( microgenerator=True, samples=True, - unit_test_extras=extras, + unit_test_extras=unit_test_extras, system_test_extras=extras, system_test_external_dependencies=["google-cloud-bigquery"], cov_level=95, @@ -178,7 +177,7 @@ ) s.replace( - "CONTRIBUTING.rst", "remote \(``master``\)", "remote (``main``)", + "CONTRIBUTING.rst", "remote \\(``master``\\)", "remote (``main``)", ) s.replace( diff --git a/samples/snippets/__init__.py b/samples/snippets/__init__.py new file mode 100644 index 00000000..0098709d --- /dev/null +++ b/samples/snippets/__init__.py @@ -0,0 +1,15 @@ +# -*- coding: utf-8 -*- +# +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
diff --git a/samples/snippets/append_rows_proto2.py b/samples/snippets/append_rows_proto2.py new file mode 100644 index 00000000..d0390993 --- /dev/null +++ b/samples/snippets/append_rows_proto2.py @@ -0,0 +1,249 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# [START bigquerystorage_append_rows_raw_proto2] +""" +This code sample demonstrates using the low-level generated client for Python. +""" + +import datetime +import decimal + +from google.cloud import bigquery_storage_v1beta2 +from google.cloud.bigquery_storage_v1beta2 import types +from google.cloud.bigquery_storage_v1beta2 import writer +from google.protobuf import descriptor_pb2 + +# If you make updates to the sample_data.proto protocol buffers definition, +# run: +# +# protoc --python_out=. sample_data.proto +# +# from the samples/snippets directory to generate the sample_data_pb2 module. +from . import sample_data_pb2 + + +def append_rows_proto2(project_id: str, dataset_id: str, table_id: str): + """Create a write stream, write some sample data, and commit the stream.""" + write_client = bigquery_storage_v1beta2.BigQueryWriteClient() + parent = write_client.table_path(project_id, dataset_id, table_id) + write_stream = types.WriteStream() + + # When creating the stream, choose the type. Use the PENDING type to wait + # until the stream is committed before it is visible. See: + # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1beta2#google.cloud.bigquery.storage.v1beta2.WriteStream.Type + write_stream.type_ = types.WriteStream.Type.PENDING + write_stream = write_client.create_write_stream( + parent=parent, write_stream=write_stream + ) + stream_name = write_stream.name + + # Create a template with fields needed for the first request. + request_template = types.AppendRowsRequest() + + # The initial request must contain the stream name. + request_template.write_stream = stream_name + + # So that BigQuery knows how to parse the serialized_rows, generate a + # protocol buffer representation of your message descriptor. + proto_schema = types.ProtoSchema() + proto_descriptor = descriptor_pb2.DescriptorProto() + sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor) + proto_schema.proto_descriptor = proto_descriptor + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.writer_schema = proto_schema + request_template.proto_rows = proto_data + + # Some stream types support an unbounded number of requests. Construct an + # AppendRowsStream to send an arbitrary number of requests to a stream. + append_rows_stream = writer.AppendRowsStream(write_client, request_template) + + # Create a batch of row data by appending proto2 serialized bytes to the + # serialized_rows repeated field. + proto_rows = types.ProtoRows() + + row = sample_data_pb2.SampleData() + row.row_num = 1 + row.bool_col = True + row.bytes_col = b"Hello, World!" + row.float64_col = float("+inf") + row.int64_col = 123 + row.string_col = "Howdy!" 
+ proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 2 + row.bool_col = False + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 3 + row.bytes_col = b"See you later!" + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 4 + row.float64_col = 1000000.125 + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 5 + row.int64_col = 67000 + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 6 + row.string_col = "Auf Wiedersehen!" + proto_rows.serialized_rows.append(row.SerializeToString()) + + # Set an offset to allow resuming this stream if the connection breaks. + # Keep track of which requests the server has acknowledged and resume the + # stream at the first non-acknowledged message. If the server has already + # processed a message with that offset, it will return an ALREADY_EXISTS + # error, which can be safely ignored. + # + # The first request must always have an offset of 0. + request = types.AppendRowsRequest() + request.offset = 0 + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.rows = proto_rows + request.proto_rows = proto_data + + response_future_1 = append_rows_stream.send(request) + + # Create a batch of rows containing scalar values that don't directly + # correspond to a protocol buffers scalar type. See the documentation for + # the expected data formats: + # https://cloud.google.com/bigquery/docs/write-api#data_type_conversions + proto_rows = types.ProtoRows() + + row = sample_data_pb2.SampleData() + row.row_num = 7 + date_value = datetime.date(2021, 8, 12) + epoch_value = datetime.date(1970, 1, 1) + delta = date_value - epoch_value + row.date_col = delta.days + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 8 + datetime_value = datetime.datetime(2021, 8, 12, 9, 46, 23, 987456) + row.datetime_col = datetime_value.strftime("%Y-%m-%d %H:%M:%S.%f") + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 9 + row.geography_col = "POINT(-122.347222 47.651111)" + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 10 + numeric_value = decimal.Decimal("1.23456789101112e+6") + row.numeric_col = str(numeric_value) + bignumeric_value = decimal.Decimal("-1.234567891011121314151617181920e+16") + row.bignumeric_col = str(bignumeric_value) + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 11 + time_value = datetime.time(11, 7, 48, 123456) + row.time_col = time_value.strftime("%H:%M:%S.%f") + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 12 + timestamp_value = datetime.datetime( + 2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc + ) + epoch_value = datetime.datetime(1970, 1, 1, tzinfo=datetime.timezone.utc) + delta = timestamp_value - epoch_value + row.timestamp_col = int(delta.total_seconds()) * 1000000 + int(delta.microseconds) + proto_rows.serialized_rows.append(row.SerializeToString()) + + # Since this is the second request, you only need to include the row data. 
+ # The name of the stream and protocol buffers DESCRIPTOR is only needed in + # the first request. + request = types.AppendRowsRequest() + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.rows = proto_rows + request.proto_rows = proto_data + + # Offset must equal the number of rows that were previously sent. + request.offset = 6 + + response_future_2 = append_rows_stream.send(request) + + # Create a batch of rows with STRUCT and ARRAY BigQuery data types. In + # protocol buffers, these correspond to nested messages and repeated + # fields, respectively. + proto_rows = types.ProtoRows() + + row = sample_data_pb2.SampleData() + row.row_num = 13 + row.int64_list.append(1) + row.int64_list.append(2) + row.int64_list.append(3) + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 14 + row.struct_col.sub_int_col = 7 + proto_rows.serialized_rows.append(row.SerializeToString()) + + row = sample_data_pb2.SampleData() + row.row_num = 15 + sub_message = sample_data_pb2.SampleData.SampleStruct() + sub_message.sub_int_col = -1 + row.struct_list.append(sub_message) + sub_message = sample_data_pb2.SampleData.SampleStruct() + sub_message.sub_int_col = -2 + row.struct_list.append(sub_message) + sub_message = sample_data_pb2.SampleData.SampleStruct() + sub_message.sub_int_col = -3 + row.struct_list.append(sub_message) + proto_rows.serialized_rows.append(row.SerializeToString()) + + request = types.AppendRowsRequest() + request.offset = 12 + proto_data = types.AppendRowsRequest.ProtoData() + proto_data.rows = proto_rows + request.proto_rows = proto_data + + # For each request sent, a message is expected in the responses iterable. + # This sample sends 3 requests, therefore expect exactly 3 responses. + response_future_3 = append_rows_stream.send(request) + + # All three requests are in-flight, wait for them to finish being processed + # before finalizing the stream. + print(response_future_1.result()) + print(response_future_2.result()) + print(response_future_3.result()) + + # Shutdown background threads and close the streaming connection. + append_rows_stream.close() + + # A PENDING type stream must be "finalized" before being committed. No new + # records can be written to the stream after this method has been called. + write_client.finalize_write_stream(name=write_stream.name) + + # Commit the stream you created earlier. + batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest() + batch_commit_write_streams_request.parent = parent + batch_commit_write_streams_request.write_streams = [write_stream.name] + write_client.batch_commit_write_streams(batch_commit_write_streams_request) + + print(f"Writes to stream: '{write_stream.name}' have been committed.") + + +# [END bigquerystorage_append_rows_raw_proto2] diff --git a/samples/snippets/append_rows_proto2_test.py b/samples/snippets/append_rows_proto2_test.py new file mode 100644 index 00000000..dddda301 --- /dev/null +++ b/samples/snippets/append_rows_proto2_test.py @@ -0,0 +1,126 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import decimal +import pathlib +import random + +from google.cloud import bigquery +import pytest + +from . import append_rows_proto2 + + +DIR = pathlib.Path(__file__).parent + + +regions = ["US", "non-US"] + + +@pytest.fixture(params=regions) +def sample_data_table( + request: pytest.FixtureRequest, + bigquery_client: bigquery.Client, + project_id: str, + dataset_id: str, + dataset_id_non_us: str, +) -> str: + dataset = dataset_id + if request.param != "US": + dataset = dataset_id_non_us + schema = bigquery_client.schema_from_json(str(DIR / "sample_data_schema.json")) + table_id = f"append_rows_proto2_{random.randrange(10000)}" + full_table_id = f"{project_id}.{dataset}.{table_id}" + table = bigquery.Table(full_table_id, schema=schema) + table = bigquery_client.create_table(table, exists_ok=True) + yield full_table_id + bigquery_client.delete_table(table, not_found_ok=True) + + +def test_append_rows_proto2( + capsys: pytest.CaptureFixture, + bigquery_client: bigquery.Client, + sample_data_table: str, +): + project_id, dataset_id, table_id = sample_data_table.split(".") + append_rows_proto2.append_rows_proto2( + project_id=project_id, dataset_id=dataset_id, table_id=table_id + ) + out, _ = capsys.readouterr() + assert "have been committed" in out + + rows = bigquery_client.query( + f"SELECT * FROM `{project_id}.{dataset_id}.{table_id}`" + ).result() + row_items = [ + # Convert to sorted tuple of items, omitting NULL values, to make + # searching for expected rows easier. + tuple( + sorted( + item for item in row.items() if item[1] is not None and item[1] != [] + ) + ) + for row in rows + ] + + assert ( + ("bool_col", True), + ("bytes_col", b"Hello, World!"), + ("float64_col", float("+inf")), + ("int64_col", 123), + ("row_num", 1), + ("string_col", "Howdy!"), + ) in row_items + assert (("bool_col", False), ("row_num", 2)) in row_items + assert (("bytes_col", b"See you later!"), ("row_num", 3)) in row_items + assert (("float64_col", 1000000.125), ("row_num", 4)) in row_items + assert (("int64_col", 67000), ("row_num", 5)) in row_items + assert (("row_num", 6), ("string_col", "Auf Wiedersehen!")) in row_items + assert (("date_col", datetime.date(2021, 8, 12)), ("row_num", 7)) in row_items + assert ( + ("datetime_col", datetime.datetime(2021, 8, 12, 9, 46, 23, 987456)), + ("row_num", 8), + ) in row_items + assert ( + ("geography_col", "POINT(-122.347222 47.651111)"), + ("row_num", 9), + ) in row_items + assert ( + ("bignumeric_col", decimal.Decimal("-1.234567891011121314151617181920e+16")), + ("numeric_col", decimal.Decimal("1.23456789101112e+6")), + ("row_num", 10), + ) in row_items + assert ( + ("row_num", 11), + ("time_col", datetime.time(11, 7, 48, 123456)), + ) in row_items + assert ( + ("row_num", 12), + ( + "timestamp_col", + datetime.datetime( + 2021, 8, 12, 16, 11, 22, 987654, tzinfo=datetime.timezone.utc + ), + ), + ) in row_items + assert (("int64_list", [1, 2, 3]), ("row_num", 13)) in row_items + assert (("row_num", 14), ("struct_col", {"sub_int_col": 7}),) in row_items + assert ( + ("row_num", 15), + ( + "struct_list", + [{"sub_int_col": -1}, {"sub_int_col": -2}, {"sub_int_col": -3}], + ), + ) in row_items diff --git a/samples/snippets/conftest.py b/samples/snippets/conftest.py new file mode 100644 index 00000000..531f0b9d --- /dev/null +++ b/samples/snippets/conftest.py @@ -0,0 +1,60 @@ +# Copyright 2021 Google LLC +# +# Licensed under the Apache License, 
Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from google.cloud import bigquery +import pytest +import test_utils.prefixer + + +prefixer = test_utils.prefixer.Prefixer("python-bigquery-storage", "samples/snippets") + + +@pytest.fixture(scope="session", autouse=True) +def cleanup_datasets(bigquery_client: bigquery.Client): + for dataset in bigquery_client.list_datasets(): + if prefixer.should_cleanup(dataset.dataset_id): + bigquery_client.delete_dataset( + dataset, delete_contents=True, not_found_ok=True + ) + + +@pytest.fixture(scope="session") +def bigquery_client(): + return bigquery.Client() + + +@pytest.fixture(scope="session") +def project_id(bigquery_client): + return bigquery_client.project + + +@pytest.fixture(scope="session") +def dataset_id(bigquery_client: bigquery.Client, project_id: str): + dataset_id = prefixer.create_prefix() + full_dataset_id = f"{project_id}.{dataset_id}" + dataset = bigquery.Dataset(full_dataset_id) + bigquery_client.create_dataset(dataset) + yield dataset_id + bigquery_client.delete_dataset(dataset, delete_contents=True, not_found_ok=True) + + +@pytest.fixture(scope="session") +def dataset_id_non_us(bigquery_client: bigquery.Client, project_id: str): + dataset_id = prefixer.create_prefix() + full_dataset_id = f"{project_id}.{dataset_id}" + dataset = bigquery.Dataset(full_dataset_id) + dataset.location = "asia-northeast1" + bigquery_client.create_dataset(dataset) + yield dataset_id + bigquery_client.delete_dataset(dataset, delete_contents=True, not_found_ok=True) diff --git a/samples/snippets/noxfile.py b/samples/snippets/noxfile.py new file mode 100644 index 00000000..b008613f --- /dev/null +++ b/samples/snippets/noxfile.py @@ -0,0 +1,266 @@ +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import os +from pathlib import Path +import sys +from typing import Callable, Dict, List, Optional + +import nox + + +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING +# DO NOT EDIT THIS FILE EVER! +# WARNING - WARNING - WARNING - WARNING - WARNING +# WARNING - WARNING - WARNING - WARNING - WARNING + +BLACK_VERSION = "black==19.10b0" + +# Copy `noxfile_config.py` to your directory and modify it instead. + +# `TEST_CONFIG` dict is a configuration hook that allows users to +# modify the test configurations. The values here should be in sync +# with `noxfile_config.py`. Users will copy `noxfile_config.py` into +# their directory and modify it. 
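As the comment above notes, individual samples customize these defaults by dropping a ``noxfile_config.py`` next to this generated noxfile; its ``TEST_CONFIG_OVERRIDE`` dict is merged over the ``TEST_CONFIG`` defaults that follow. A hypothetical override (not part of this change, values illustrative only) might look like:

```python
# samples/snippets/noxfile_config.py (hypothetical example; keys mirror the
# TEST_CONFIG defaults defined in the generated noxfile.py)
TEST_CONFIG_OVERRIDE = {
    # Test against a build-specific project rather than GOOGLE_CLOUD_PROJECT.
    "gcloud_project_env": "BUILD_SPECIFIC_GCLOUD_PROJECT",
    # Skip Python versions that this sample does not support.
    "ignored_versions": ["3.6"],
    # Extra, non-secret environment variables exposed to the tests.
    "envs": {"EXAMPLE_FLAG": "1"},
}
```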
+ +TEST_CONFIG = { + # You can opt out from the test for specific Python versions. + "ignored_versions": [], + # Old samples are opted out of enforcing Python type hints + # All new samples should feature them + "enforce_type_hints": False, + # An envvar key for determining the project id to use. Change it + # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a + # build specific Cloud project. You can also use your own string + # to use your own Cloud project. + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", + # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', + # If you need to use a specific version of pip, + # change pip_version_override to the string representation + # of the version number, for example, "20.2.4" + "pip_version_override": None, + # A dictionary you want to inject into your test. Don't put any + # secrets here. These values will override predefined values. + "envs": {}, +} + + +try: + # Ensure we can import noxfile_config in the project's directory. + sys.path.append(".") + from noxfile_config import TEST_CONFIG_OVERRIDE +except ImportError as e: + print("No user noxfile_config found: detail: {}".format(e)) + TEST_CONFIG_OVERRIDE = {} + +# Update the TEST_CONFIG with the user supplied values. +TEST_CONFIG.update(TEST_CONFIG_OVERRIDE) + + +def get_pytest_env_vars() -> Dict[str, str]: + """Returns a dict for pytest invocation.""" + ret = {} + + # Override the GCLOUD_PROJECT and the alias. + env_key = TEST_CONFIG["gcloud_project_env"] + # This should error out if not set. + ret["GOOGLE_CLOUD_PROJECT"] = os.environ[env_key] + + # Apply user supplied envs. + ret.update(TEST_CONFIG["envs"]) + return ret + + +# DO NOT EDIT - automatically generated. +# All versions used to test samples. +ALL_VERSIONS = ["3.6", "3.7", "3.8", "3.9"] + +# Any default versions that should be ignored. +IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"] + +TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) + +INSTALL_LIBRARY_FROM_SOURCE = os.environ.get("INSTALL_LIBRARY_FROM_SOURCE", False) in ( + "True", + "true", +) +# +# Style Checks +# + + +def _determine_local_import_names(start_dir: str) -> List[str]: + """Determines all import names that should be considered "local". + + This is used when running the linter to insure that import order is + properly checked. + """ + file_ext_pairs = [os.path.splitext(path) for path in os.listdir(start_dir)] + return [ + basename + for basename, extension in file_ext_pairs + if extension == ".py" + or os.path.isdir(os.path.join(start_dir, basename)) + and basename not in ("__pycache__") + ] + + +# Linting with flake8. 
+# +# We ignore the following rules: +# E203: whitespace before ‘:’ +# E266: too many leading ‘#’ for block comment +# E501: line too long +# I202: Additional newline in a section of imports +# +# We also need to specify the rules which are ignored by default: +# ['E226', 'W504', 'E126', 'E123', 'W503', 'E24', 'E704', 'E121'] +FLAKE8_COMMON_ARGS = [ + "--show-source", + "--builtin=gettext", + "--max-complexity=20", + "--import-order-style=google", + "--exclude=.nox,.cache,env,lib,generated_pb2,*_pb2.py,*_pb2_grpc.py", + "--ignore=E121,E123,E126,E203,E226,E24,E266,E501,E704,W503,W504,I202", + "--max-line-length=88", +] + + +@nox.session +def lint(session: nox.sessions.Session) -> None: + if not TEST_CONFIG["enforce_type_hints"]: + session.install("flake8", "flake8-import-order") + else: + session.install("flake8", "flake8-import-order", "flake8-annotations") + + local_names = _determine_local_import_names(".") + args = FLAKE8_COMMON_ARGS + [ + "--application-import-names", + ",".join(local_names), + ".", + ] + session.run("flake8", *args) + + +# +# Black +# + + +@nox.session +def blacken(session: nox.sessions.Session) -> None: + session.install(BLACK_VERSION) + python_files = [path for path in os.listdir(".") if path.endswith(".py")] + + session.run("black", *python_files) + + +# +# Sample Tests +# + + +PYTEST_COMMON_ARGS = ["--junitxml=sponge_log.xml"] + + +def _session_tests( + session: nox.sessions.Session, post_install: Callable = None +) -> None: + if TEST_CONFIG["pip_version_override"]: + pip_version = TEST_CONFIG["pip_version_override"] + session.install(f"pip=={pip_version}") + """Runs py.test for a particular project.""" + if os.path.exists("requirements.txt"): + if os.path.exists("constraints.txt"): + session.install("-r", "requirements.txt", "-c", "constraints.txt") + else: + session.install("-r", "requirements.txt") + + if os.path.exists("requirements-test.txt"): + if os.path.exists("constraints-test.txt"): + session.install("-r", "requirements-test.txt", "-c", "constraints-test.txt") + else: + session.install("-r", "requirements-test.txt") + + if INSTALL_LIBRARY_FROM_SOURCE: + session.install("-e", _get_repo_root()) + + if post_install: + post_install(session) + + session.run( + "pytest", + *(PYTEST_COMMON_ARGS + session.posargs), + # Pytest will return 5 when no tests are collected. This can happen + # on travis where slow and flaky tests are excluded. + # See http://doc.pytest.org/en/latest/_modules/_pytest/main.html + success_codes=[0, 5], + env=get_pytest_env_vars(), + ) + + +@nox.session(python=ALL_VERSIONS) +def py(session: nox.sessions.Session) -> None: + """Runs py.test for a sample using the specified version of Python.""" + if session.python in TESTED_VERSIONS: + _session_tests(session) + else: + session.skip( + "SKIPPED: {} tests are disabled for this sample.".format(session.python) + ) + + +# +# Readmegen +# + + +def _get_repo_root() -> Optional[str]: + """ Returns the root folder of the project. """ + # Get root of this repository. Assume we don't have directories nested deeper than 10 items. 
+ p = Path(os.getcwd()) + for i in range(10): + if p is None: + break + if Path(p / ".git").exists(): + return str(p) + # .git is not available in repos cloned via Cloud Build + # setup.py is always in the library's root, so use that instead + # https://github.com/googleapis/synthtool/issues/792 + if Path(p / "setup.py").exists(): + return str(p) + p = p.parent + raise Exception("Unable to detect repository root.") + + +GENERATED_READMES = sorted([x for x in Path(".").rglob("*.rst.in")]) + + +@nox.session +@nox.parametrize("path", GENERATED_READMES) +def readmegen(session: nox.sessions.Session, path: str) -> None: + """(Re-)generates the readme for a sample.""" + session.install("jinja2", "pyyaml") + dir_ = os.path.dirname(path) + + if os.path.exists(os.path.join(dir_, "requirements.txt")): + session.install("-r", os.path.join(dir_, "requirements.txt")) + + in_file = os.path.join(dir_, "README.rst.in") + session.run( + "python", _get_repo_root() + "/scripts/readme-gen/readme_gen.py", in_file + ) diff --git a/samples/snippets/requirements-test.txt b/samples/snippets/requirements-test.txt new file mode 100644 index 00000000..85597665 --- /dev/null +++ b/samples/snippets/requirements-test.txt @@ -0,0 +1,2 @@ +google-cloud-testutils==1.0.0 +pytest==6.2.4 diff --git a/samples/snippets/requirements.txt b/samples/snippets/requirements.txt new file mode 100644 index 00000000..8df538bb --- /dev/null +++ b/samples/snippets/requirements.txt @@ -0,0 +1,3 @@ +google-cloud-bigquery-storage==2.6.2 +google-cloud-bigquery==2.24.1 +protobuf==3.17.3 diff --git a/samples/snippets/sample_data.proto b/samples/snippets/sample_data.proto new file mode 100644 index 00000000..3e9f19ce --- /dev/null +++ b/samples/snippets/sample_data.proto @@ -0,0 +1,61 @@ +// Copyright 2021 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// [START bigquerystorage_append_rows_raw_proto2_definition] +// The BigQuery Storage API expects protocol buffer data to be encoded in the +// proto2 wire format. This allows it to disambiguate missing optional fields +// from default values without the need for wrapper types. +syntax = "proto2"; + +// Define a message type representing the rows in your table. The message +// cannot contain fields which are not present in the table. +message SampleData { + // Use a nested message to encode STRUCT column values. + // + // References to external messages are not allowed. Any message definitions + // must be nested within the root message representing row data. + message SampleStruct { + optional int64 sub_int_col = 1; + } + + // The following types map directly between protocol buffers and their + // corresponding BigQuery data types. + optional bool bool_col = 1; + optional bytes bytes_col = 2; + optional double float64_col = 3; + optional int64 int64_col = 4; + optional string string_col = 5; + + // The following data types require some encoding to use. 
See the + // documentation for the expected data formats: + // https://cloud.google.com/bigquery/docs/write-api#data_type_conversion + optional int32 date_col = 6; + optional string datetime_col = 7; + optional string geography_col = 8; + optional string numeric_col = 9; + optional string bignumeric_col = 10; + optional string time_col = 11; + optional int64 timestamp_col = 12; + + // Use a repeated field to represent a BigQuery ARRAY value. + repeated int64 int64_list = 13; + + // Use a nested message to encode STRUCT and ARRAY values. + optional SampleStruct struct_col = 14; + repeated SampleStruct struct_list = 15; + + // Use the required keyword for client-side validation of required fields. + required int64 row_num = 16; +} +// [END bigquerystorage_append_rows_raw_proto2_definition] diff --git a/samples/snippets/sample_data_pb2.py b/samples/snippets/sample_data_pb2.py new file mode 100644 index 00000000..ba524988 --- /dev/null +++ b/samples/snippets/sample_data_pb2.py @@ -0,0 +1,418 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# source: sample_data.proto +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import message as _message +from google.protobuf import reflection as _reflection +from google.protobuf import symbol_database as _symbol_database + +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor.FileDescriptor( + name="sample_data.proto", + package="", + syntax="proto2", + serialized_options=None, + create_key=_descriptor._internal_create_key, + serialized_pb=b'\n\x11sample_data.proto"\xa9\x03\n\nSampleData\x12\x10\n\x08\x62ool_col\x18\x01 \x01(\x08\x12\x11\n\tbytes_col\x18\x02 \x01(\x0c\x12\x13\n\x0b\x66loat64_col\x18\x03 \x01(\x01\x12\x11\n\tint64_col\x18\x04 \x01(\x03\x12\x12\n\nstring_col\x18\x05 \x01(\t\x12\x10\n\x08\x64\x61te_col\x18\x06 \x01(\x05\x12\x14\n\x0c\x64\x61tetime_col\x18\x07 \x01(\t\x12\x15\n\rgeography_col\x18\x08 \x01(\t\x12\x13\n\x0bnumeric_col\x18\t \x01(\t\x12\x16\n\x0e\x62ignumeric_col\x18\n \x01(\t\x12\x10\n\x08time_col\x18\x0b \x01(\t\x12\x15\n\rtimestamp_col\x18\x0c \x01(\x03\x12\x12\n\nint64_list\x18\r \x03(\x03\x12,\n\nstruct_col\x18\x0e \x01(\x0b\x32\x18.SampleData.SampleStruct\x12-\n\x0bstruct_list\x18\x0f \x03(\x0b\x32\x18.SampleData.SampleStruct\x12\x0f\n\x07row_num\x18\x10 \x02(\x03\x1a#\n\x0cSampleStruct\x12\x13\n\x0bsub_int_col\x18\x01 \x01(\x03', +) + + +_SAMPLEDATA_SAMPLESTRUCT = _descriptor.Descriptor( + name="SampleStruct", + full_name="SampleData.SampleStruct", + filename=None, + file=DESCRIPTOR, + containing_type=None, + create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name="sub_int_col", + full_name="SampleData.SampleStruct.sub_int_col", + index=0, + number=1, + type=3, + cpp_type=2, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + ], + extensions=[], + nested_types=[], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto2", + extension_ranges=[], + oneofs=[], + serialized_start=412, + serialized_end=447, +) + +_SAMPLEDATA = _descriptor.Descriptor( + name="SampleData", + full_name="SampleData", + filename=None, + file=DESCRIPTOR, + containing_type=None, + 
create_key=_descriptor._internal_create_key, + fields=[ + _descriptor.FieldDescriptor( + name="bool_col", + full_name="SampleData.bool_col", + index=0, + number=1, + type=8, + cpp_type=7, + label=1, + has_default_value=False, + default_value=False, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="bytes_col", + full_name="SampleData.bytes_col", + index=1, + number=2, + type=12, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"", + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="float64_col", + full_name="SampleData.float64_col", + index=2, + number=3, + type=1, + cpp_type=5, + label=1, + has_default_value=False, + default_value=float(0), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="int64_col", + full_name="SampleData.int64_col", + index=3, + number=4, + type=3, + cpp_type=2, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="string_col", + full_name="SampleData.string_col", + index=4, + number=5, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="date_col", + full_name="SampleData.date_col", + index=5, + number=6, + type=5, + cpp_type=1, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="datetime_col", + full_name="SampleData.datetime_col", + index=6, + number=7, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="geography_col", + full_name="SampleData.geography_col", + index=7, + number=8, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="numeric_col", + full_name="SampleData.numeric_col", + index=8, + number=9, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + 
message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="bignumeric_col", + full_name="SampleData.bignumeric_col", + index=9, + number=10, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="time_col", + full_name="SampleData.time_col", + index=10, + number=11, + type=9, + cpp_type=9, + label=1, + has_default_value=False, + default_value=b"".decode("utf-8"), + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="timestamp_col", + full_name="SampleData.timestamp_col", + index=11, + number=12, + type=3, + cpp_type=2, + label=1, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="int64_list", + full_name="SampleData.int64_list", + index=12, + number=13, + type=3, + cpp_type=2, + label=3, + has_default_value=False, + default_value=[], + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="struct_col", + full_name="SampleData.struct_col", + index=13, + number=14, + type=11, + cpp_type=10, + label=1, + has_default_value=False, + default_value=None, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="struct_list", + full_name="SampleData.struct_list", + index=14, + number=15, + type=11, + cpp_type=10, + label=3, + has_default_value=False, + default_value=[], + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + _descriptor.FieldDescriptor( + name="row_num", + full_name="SampleData.row_num", + index=15, + number=16, + type=3, + cpp_type=2, + label=2, + has_default_value=False, + default_value=0, + message_type=None, + enum_type=None, + containing_type=None, + is_extension=False, + extension_scope=None, + serialized_options=None, + file=DESCRIPTOR, + create_key=_descriptor._internal_create_key, + ), + ], + extensions=[], + nested_types=[_SAMPLEDATA_SAMPLESTRUCT,], + enum_types=[], + serialized_options=None, + is_extendable=False, + syntax="proto2", + extension_ranges=[], + oneofs=[], + serialized_start=22, + serialized_end=447, +) + +_SAMPLEDATA_SAMPLESTRUCT.containing_type = _SAMPLEDATA +_SAMPLEDATA.fields_by_name["struct_col"].message_type = _SAMPLEDATA_SAMPLESTRUCT +_SAMPLEDATA.fields_by_name["struct_list"].message_type = _SAMPLEDATA_SAMPLESTRUCT 
+DESCRIPTOR.message_types_by_name["SampleData"] = _SAMPLEDATA
+_sym_db.RegisterFileDescriptor(DESCRIPTOR)
+
+SampleData = _reflection.GeneratedProtocolMessageType(
+    "SampleData",
+    (_message.Message,),
+    {
+        "SampleStruct": _reflection.GeneratedProtocolMessageType(
+            "SampleStruct",
+            (_message.Message,),
+            {
+                "DESCRIPTOR": _SAMPLEDATA_SAMPLESTRUCT,
+                "__module__": "sample_data_pb2"
+                # @@protoc_insertion_point(class_scope:SampleData.SampleStruct)
+            },
+        ),
+        "DESCRIPTOR": _SAMPLEDATA,
+        "__module__": "sample_data_pb2"
+        # @@protoc_insertion_point(class_scope:SampleData)
+    },
+)
+_sym_db.RegisterMessage(SampleData)
+_sym_db.RegisterMessage(SampleData.SampleStruct)
+
+
+# @@protoc_insertion_point(module_scope)
diff --git a/samples/snippets/sample_data_schema.json b/samples/snippets/sample_data_schema.json
new file mode 100644
index 00000000..ba6ba102
--- /dev/null
+++ b/samples/snippets/sample_data_schema.json
@@ -0,0 +1,76 @@
+
+[
+  {
+    "name": "bool_col",
+    "type": "BOOLEAN"
+  },
+  {
+    "name": "bytes_col",
+    "type": "BYTES"
+  },
+  {
+    "name": "date_col",
+    "type": "DATE"
+  },
+  {
+    "name": "datetime_col",
+    "type": "DATETIME"
+  },
+  {
+    "name": "float64_col",
+    "type": "FLOAT"
+  },
+  {
+    "name": "geography_col",
+    "type": "GEOGRAPHY"
+  },
+  {
+    "name": "int64_col",
+    "type": "INTEGER"
+  },
+  {
+    "name": "numeric_col",
+    "type": "NUMERIC"
+  },
+  {
+    "name": "bignumeric_col",
+    "type": "BIGNUMERIC"
+  },
+  {
+    "name": "row_num",
+    "type": "INTEGER",
+    "mode": "REQUIRED"
+  },
+  {
+    "name": "string_col",
+    "type": "STRING"
+  },
+  {
+    "name": "time_col",
+    "type": "TIME"
+  },
+  {
+    "name": "timestamp_col",
+    "type": "TIMESTAMP"
+  },
+  {
+    "name": "int64_list",
+    "type": "INTEGER",
+    "mode": "REPEATED"
+  },
+  {
+    "name": "struct_col",
+    "type": "RECORD",
+    "fields": [
+      {"name": "sub_int_col", "type": "INTEGER"}
+    ]
+  },
+  {
+    "name": "struct_list",
+    "type": "RECORD",
+    "fields": [
+      {"name": "sub_int_col", "type": "INTEGER"}
+    ],
+    "mode": "REPEATED"
+  }
+]
diff --git a/samples/to_dataframe/requirements.txt b/samples/to_dataframe/requirements.txt
index 85338b62..063a02b2 100644
--- a/samples/to_dataframe/requirements.txt
+++ b/samples/to_dataframe/requirements.txt
@@ -6,3 +6,4 @@ ipython==7.24.0; python_version > '3.6'
 ipython==7.16.1; python_version <= '3.6'
 pandas==1.2.5; python_version > '3.6'
 pandas==1.1.5; python_version <= '3.6'
+tqdm==4.62.1
diff --git a/setup.py b/setup.py
index 9775f76f..194d05a9 100644
--- a/setup.py
+++ b/setup.py
@@ -28,14 +28,15 @@
     # Until this issue is closed
     # https://github.com/googleapis/google-cloud-python/issues/10566
     "google-api-core[grpc] >= 1.26.0, <3.0.0dev",
-    "proto-plus >= 1.4.0",
+    "proto-plus >= 1.18.0",
     "packaging >= 14.3",
     "libcst >= 0.2.5",
 ]
 extras = {
-    "pandas": "pandas>=0.21.1",
-    "fastavro": "fastavro>=0.21.2",
-    "pyarrow": "pyarrow>=0.15.0",
+    "pandas": ["pandas>=0.21.1"],
+    "fastavro": ["fastavro>=0.21.2"],
+    "pyarrow": ["pyarrow>=0.15.0"],
+    "tests": ["freezegun"],
 }
 
 package_root = os.path.abspath(os.path.dirname(__file__))
diff --git a/testing/constraints-3.6.txt b/testing/constraints-3.6.txt
index f9186709..eb1840b7 100644
--- a/testing/constraints-3.6.txt
+++ b/testing/constraints-3.6.txt
@@ -6,7 +6,7 @@
 # e.g., if setup.py has "foo >= 1.14.0, < 2.0.0dev",
 # Then this file should have foo==1.14.0
 google-api-core==1.26.0
-proto-plus==1.4.0
+proto-plus==1.18.0
 libcst==0.2.5
 fastavro==0.21.2
 pandas==0.21.1
diff --git a/tests/system/conftest.py b/tests/system/conftest.py
index 3a89097a..800da736 100644
--- a/tests/system/conftest.py
+++ b/tests/system/conftest.py
@@ -108,27 +108,6 @@ def dataset(project_id, bq_client):
     bq_client.delete_dataset(dataset, delete_contents=True)
 
 
-@pytest.fixture
-def table(project_id, dataset, bq_client):
-    from google.cloud import bigquery
-
-    schema = [
-        bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
-        bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
-        bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
-    ]
-
-    unique_suffix = str(uuid.uuid4()).replace("-", "_")
-    table_id = "users_" + unique_suffix
-    table_id_full = f"{project_id}.{dataset.dataset_id}.{table_id}"
-    bq_table = bigquery.Table(table_id_full, schema=schema)
-    created_table = bq_client.create_table(bq_table)
-
-    yield created_table
-
-    bq_client.delete_table(created_table)
-
-
 @pytest.fixture(scope="session")
 def bq_client(credentials, use_mtls):
     if use_mtls:
diff --git a/tests/system/reader/test_reader.py b/tests/system/reader/test_reader.py
index aae3f6ec..24905c59 100644
--- a/tests/system/reader/test_reader.py
+++ b/tests/system/reader/test_reader.py
@@ -19,6 +19,7 @@
 import datetime as dt
 import decimal
 import re
+import uuid
 
 from google.cloud import bigquery
 import pytest
@@ -30,6 +31,27 @@
 _TABLE_FORMAT = "projects/{}/datasets/{}/tables/{}"
 
 
+@pytest.fixture
+def table(project_id, dataset, bq_client):
+    from google.cloud import bigquery
+
+    schema = [
+        bigquery.SchemaField("first_name", "STRING", mode="NULLABLE"),
+        bigquery.SchemaField("last_name", "STRING", mode="NULLABLE"),
+        bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
+    ]
+
+    unique_suffix = str(uuid.uuid4()).replace("-", "_")
+    table_id = "users_" + unique_suffix
+    table_id_full = f"{project_id}.{dataset.dataset_id}.{table_id}"
+    bq_table = bigquery.Table(table_id_full, schema=schema)
+    created_table = bq_client.create_table(bq_table)
+
+    yield created_table
+
+    bq_client.delete_table(created_table)
+
+
 def _to_bq_table_ref(table_name_string, partition_suffix=""):
     """Converts protobuf table reference to bigquery table reference.
 
diff --git a/tests/system/test_writer.py b/tests/system/test_writer.py
new file mode 100644
index 00000000..a8d119d0
--- /dev/null
+++ b/tests/system/test_writer.py
@@ -0,0 +1,33 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from google.api_core import exceptions
+import pytest
+
+from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
+
+
+@pytest.fixture(scope="session")
+def bqstorage_write_client(credentials):
+    from google.cloud import bigquery_storage_v1beta2
+
+    return bigquery_storage_v1beta2.BigQueryWriteClient(credentials=credentials)
+
+
+def test_append_rows_with_invalid_stream_name_fails_fast(bqstorage_write_client):
+    bad_request = gapic_types.AppendRowsRequest()
+    bad_request.write_stream = "this-is-an-invalid-stream-resource-path"
+
+    with pytest.raises(exceptions.GoogleAPICallError):
+        bqstorage_write_client.append_rows(bad_request)
diff --git a/tests/unit/test_writer_v1beta2.py b/tests/unit/test_writer_v1beta2.py
new file mode 100644
index 00000000..7da7c66a
--- /dev/null
+++ b/tests/unit/test_writer_v1beta2.py
@@ -0,0 +1,147 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from unittest import mock
+
+import freezegun
+import pytest
+
+from google.api_core import exceptions
+from google.cloud.bigquery_storage_v1beta2.services import big_query_write
+from google.cloud.bigquery_storage_v1beta2 import types as gapic_types
+from google.protobuf import descriptor_pb2
+
+
+REQUEST_TEMPLATE = gapic_types.AppendRowsRequest()
+
+
+@pytest.fixture(scope="module")
+def module_under_test():
+    from google.cloud.bigquery_storage_v1beta2 import writer
+
+    return writer
+
+
+def test_constructor_and_default_state(module_under_test):
+    mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+    manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+
+    # Public state
+    assert manager.is_active is False
+
+    # Private state
+    assert manager._client is mock_client
+
+
+@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
+@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
+def test_open(background_consumer, bidi_rpc, module_under_test):
+    mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+    request_template = gapic_types.AppendRowsRequest(
+        write_stream="stream-name-from-REQUEST_TEMPLATE",
+        offset=0,
+        proto_rows=gapic_types.AppendRowsRequest.ProtoData(
+            writer_schema=gapic_types.ProtoSchema(
+                proto_descriptor=descriptor_pb2.DescriptorProto()
+            )
+        ),
+    )
+    manager = module_under_test.AppendRowsStream(mock_client, request_template)
+    type(bidi_rpc.return_value).is_active = mock.PropertyMock(
+        return_value=(False, True)
+    )
+    proto_rows = gapic_types.ProtoRows()
+    proto_rows.serialized_rows.append(b"hello, world")
+    initial_request = gapic_types.AppendRowsRequest(
+        write_stream="this-is-a-stream-resource-path",
+        offset=42,
+        proto_rows=gapic_types.AppendRowsRequest.ProtoData(rows=proto_rows),
+    )
+
+    future = manager.open(initial_request)
+
+    assert isinstance(future, module_under_test.AppendRowsFuture)
+    background_consumer.assert_called_once_with(manager._rpc, manager._on_response)
+    background_consumer.return_value.start.assert_called_once()
+    assert manager._consumer == background_consumer.return_value
+
+    # Make sure the request template and the first request are merged as
+    # expected. Be especially careful that nested properties such as
+    # writer_schema and rows are merged as expected.
+    expected_request = gapic_types.AppendRowsRequest(
+        write_stream="this-is-a-stream-resource-path",
+        offset=42,
+        proto_rows=gapic_types.AppendRowsRequest.ProtoData(
+            writer_schema=gapic_types.ProtoSchema(
+                proto_descriptor=descriptor_pb2.DescriptorProto()
+            ),
+            rows=proto_rows,
+        ),
+    )
+    bidi_rpc.assert_called_once_with(
+        start_rpc=mock_client.append_rows,
+        initial_request=expected_request,
+        # Extra header is required to route requests to the correct location.
+        metadata=(
+            ("x-goog-request-params", "write_stream=this-is-a-stream-resource-path"),
+        ),
+    )
+
+    bidi_rpc.return_value.add_done_callback.assert_called_once_with(
+        manager._on_rpc_done
+    )
+    assert manager._rpc == bidi_rpc.return_value
+
+    manager._consumer.is_active = True
+    assert manager.is_active is True
+
+
+@mock.patch("google.api_core.bidi.BidiRpc", autospec=True)
+@mock.patch("google.api_core.bidi.BackgroundConsumer", autospec=True)
+def test_open_with_timeout(background_consumer, bidi_rpc, module_under_test):
+    mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+    manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+    type(bidi_rpc.return_value).is_active = mock.PropertyMock(return_value=False)
+    type(background_consumer.return_value).is_active = mock.PropertyMock(
+        return_value=False
+    )
+    initial_request = gapic_types.AppendRowsRequest(
+        write_stream="this-is-a-stream-resource-path"
+    )
+
+    with pytest.raises(exceptions.Unknown), freezegun.freeze_time(auto_tick_seconds=1):
+        manager.open(initial_request, timeout=0.5)
+
+
+def test_future_done_false(module_under_test):
+    mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+    manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+    future = module_under_test.AppendRowsFuture(manager)
+    assert not future.done()
+
+
+def test_future_done_true_with_result(module_under_test):
+    mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+    manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+    future = module_under_test.AppendRowsFuture(manager)
+    future.set_result(object())
+    assert future.done()
+
+
+def test_future_done_true_with_exception(module_under_test):
+    mock_client = mock.create_autospec(big_query_write.BigQueryWriteClient)
+    manager = module_under_test.AppendRowsStream(mock_client, REQUEST_TEMPLATE)
+    future = module_under_test.AppendRowsFuture(manager)
+    future.set_exception(ValueError())
+    assert future.done()
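
The following usage sketch is not part of the patch; it shows how the pieces added in this change are expected to fit together: the new BigQueryWriteClient, the AppendRowsStream manager from the writer module, and the generated SampleData message from sample_data_pb2.py. The project/dataset/table path, the use of the "_default" stream, and the result() call on the returned future are assumptions inferred from the unit tests above rather than a definitive walkthrough of the accompanying samples.

# Usage sketch (reviewer aid, not part of the patch). Assumes a BigQuery table
# created from sample_data_schema.json and the generated sample_data_pb2 module.
from google.cloud import bigquery_storage_v1beta2
from google.cloud.bigquery_storage_v1beta2 import types, writer
from google.protobuf import descriptor_pb2

import sample_data_pb2  # generated from the sample .proto in samples/snippets

write_client = bigquery_storage_v1beta2.BigQueryWriteClient()
# Placeholder resource path; writing to the table's "_default" stream is assumed.
stream_name = "projects/PROJECT/datasets/DATASET/tables/TABLE/streams/_default"

# The request template carries the stream name and the writer schema, as the
# AppendRowsStream constructor requires.
proto_descriptor = descriptor_pb2.DescriptorProto()
sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
request_template = types.AppendRowsRequest(
    write_stream=stream_name,
    proto_rows=types.AppendRowsRequest.ProtoData(
        writer_schema=types.ProtoSchema(proto_descriptor=proto_descriptor)
    ),
)
append_rows_stream = writer.AppendRowsStream(write_client, request_template)

# The first request opens the bidirectional RPC; it is merged with the
# template, so only the serialized rows need to be supplied here.
row = sample_data_pb2.SampleData(row_num=1, bool_col=True)
proto_rows = types.ProtoRows()
proto_rows.serialized_rows.append(row.SerializeToString())
first_request = types.AppendRowsRequest(
    proto_rows=types.AppendRowsRequest.ProtoData(rows=proto_rows)
)

response_future = append_rows_stream.open(first_request)
print(response_future.result())  # blocks until the server acknowledges the append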