diff --git a/google/api_core/gapic_v1/__init__.py b/google/api_core/gapic_v1/__init__.py index e47d2cb5..ed95da13 100644 --- a/google/api_core/gapic_v1/__init__.py +++ b/google/api_core/gapic_v1/__init__.py @@ -23,4 +23,6 @@ if sys.version_info >= (3, 6): from google.api_core.gapic_v1 import config_async # noqa: F401 + from google.api_core.gapic_v1 import method_async # noqa: F401 __all__.append("config_async") + __all__.append("method_async") diff --git a/google/api_core/gapic_v1/method_async.py b/google/api_core/gapic_v1/method_async.py new file mode 100644 index 00000000..5210b2b7 --- /dev/null +++ b/google/api_core/gapic_v1/method_async.py @@ -0,0 +1,45 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""AsyncIO helpers for wrapping gRPC methods with common functionality. + +This is used by gapic clients to provide common error mapping, retry, timeout, +pagination, and long-running operations to gRPC methods. +""" + +from google.api_core import general_helpers, grpc_helpers_async +from google.api_core.gapic_v1 import client_info +from google.api_core.gapic_v1.method import (_GapicCallable, # noqa: F401 + DEFAULT, + USE_DEFAULT_METADATA) + + +def wrap_method( + func, + default_retry=None, + default_timeout=None, + client_info=client_info.DEFAULT_CLIENT_INFO, +): + """Wrap an async RPC method with common behavior. + + Returns: + Callable: A new callable that takes optional ``retry`` and ``timeout`` + arguments and applies the common error mapping, retry, timeout, + and metadata behavior to the low-level RPC method. + """ + func = grpc_helpers_async.wrap_errors(func) + + metadata = [client_info.to_grpc_metadata()] if client_info is not None else None + + return general_helpers.wraps(func)(_GapicCallable( + func, default_retry, default_timeout, metadata=metadata)) diff --git a/google/api_core/grpc_helpers.py b/google/api_core/grpc_helpers.py index c47b09fd..fde6c337 100644 --- a/google/api_core/grpc_helpers.py +++ b/google/api_core/grpc_helpers.py @@ -170,13 +170,10 @@ def wrap_errors(callable_): return _wrap_unary_errors(callable_) -def create_channel( - target, credentials=None, scopes=None, ssl_credentials=None, **kwargs -): - """Create a secure channel with credentials. +def _create_composite_credentials(credentials=None, scopes=None, ssl_credentials=None): + """Create the composite credentials for secure channels. Args: - target (str): The target service address in the format 'hostname:port'. credentials (google.auth.credentials.Credentials): The credentials. If not specified, then this function will attempt to ascertain the credentials from the environment using :func:`google.auth.default`. @@ -185,11 +182,9 @@ def create_channel( are passed to :func:`google.auth.default`. ssl_credentials (grpc.ChannelCredentials): Optional SSL channel credentials. This can be used to specify different certificates. - kwargs: Additional key-word args passed to - :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`. Returns: - grpc.Channel: The created channel. + grpc.ChannelCredentials: The composed channel credentials object. """ if credentials is None: credentials, _ = google.auth.default(scopes=scopes) @@ -212,10 +207,34 @@ def create_channel( ssl_credentials = grpc.ssl_channel_credentials() # Combine the ssl credentials and the authorization credentials. - composite_credentials = grpc.composite_channel_credentials( + return grpc.composite_channel_credentials( ssl_credentials, google_auth_credentials ) + +def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, **kwargs): + """Create a secure channel with credentials. + + Args: + target (str): The target service address in the format 'hostname:port'. + credentials (google.auth.credentials.Credentials): The credentials. If + not specified, then this function will attempt to ascertain the + credentials from the environment using :func:`google.auth.default`. + scopes (Sequence[str]): A optional list of scopes needed for this + service. These are only used when credentials are not specified and + are passed to :func:`google.auth.default`. + ssl_credentials (grpc.ChannelCredentials): Optional SSL channel + credentials. This can be used to specify different certificates. + kwargs: Additional key-word args passed to + :func:`grpc_gcp.secure_channel` or :func:`grpc.secure_channel`. + + Returns: + grpc.Channel: The created channel. + """ + composite_credentials = _create_composite_credentials( + credentials, scopes, ssl_credentials + ) + if HAS_GRPC_GCP: # If grpc_gcp module is available use grpc_gcp.secure_channel, # otherwise, use grpc.secure_channel to create grpc channel. diff --git a/google/api_core/grpc_helpers_async.py b/google/api_core/grpc_helpers_async.py new file mode 100644 index 00000000..9ded803c --- /dev/null +++ b/google/api_core/grpc_helpers_async.py @@ -0,0 +1,270 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""AsyncIO helpers for :mod:`grpc` supporting 3.6+. + +Please combine more detailed docstring in grpc_helpers.py to use following +functions. This module is implementing the same surface with AsyncIO semantics. +""" + +import asyncio +import functools + +import grpc +from grpc.experimental import aio + +from google.api_core import exceptions, grpc_helpers + + +# TODO(lidiz) Support gRPC GCP wrapper +HAS_GRPC_GCP = False + +# NOTE(lidiz) Alternatively, we can hack "__getattribute__" to perform +# automatic patching for us. But that means the overhead of creating an +# extra Python function spreads to every single send and receive. + + +class _WrappedCall(aio.Call): + + def __init__(self): + self._call = None + + def with_call(self, call): + """Supplies the call object separately to keep __init__ clean.""" + self._call = call + return self + + async def initial_metadata(self): + return await self._call.initial_metadata() + + async def trailing_metadata(self): + return await self._call.trailing_metadata() + + async def code(self): + return await self._call.code() + + async def details(self): + return await self._call.details() + + def cancelled(self): + return self._call.cancelled() + + def done(self): + return self._call.done() + + def time_remaining(self): + return self._call.time_remaining() + + def cancel(self): + return self._call.cancel() + + def add_done_callback(self, callback): + self._call.add_done_callback(callback) + + async def wait_for_connection(self): + try: + await self._call.wait_for_connection() + except grpc.RpcError as rpc_error: + raise exceptions.from_grpc_error(rpc_error) from rpc_error + + +class _WrappedUnaryResponseMixin(_WrappedCall): + + def __await__(self): + try: + response = yield from self._call.__await__() + return response + except grpc.RpcError as rpc_error: + raise exceptions.from_grpc_error(rpc_error) from rpc_error + + +class _WrappedStreamResponseMixin(_WrappedCall): + + def __init__(self): + self._wrapped_async_generator = None + + async def read(self): + try: + return await self._call.read() + except grpc.RpcError as rpc_error: + raise exceptions.from_grpc_error(rpc_error) from rpc_error + + async def _wrapped_aiter(self): + try: + # NOTE(lidiz) coverage doesn't understand the exception raised from + # __anext__ method. It is covered by test case: + # test_wrap_stream_errors_aiter_non_rpc_error + async for response in self._call: # pragma: no branch + yield response + except grpc.RpcError as rpc_error: + raise exceptions.from_grpc_error(rpc_error) from rpc_error + + def __aiter__(self): + if not self._wrapped_async_generator: + self._wrapped_async_generator = self._wrapped_aiter() + return self._wrapped_async_generator + + +class _WrappedStreamRequestMixin(_WrappedCall): + + async def write(self, request): + try: + await self._call.write(request) + except grpc.RpcError as rpc_error: + raise exceptions.from_grpc_error(rpc_error) from rpc_error + + async def done_writing(self): + try: + await self._call.done_writing() + except grpc.RpcError as rpc_error: + raise exceptions.from_grpc_error(rpc_error) from rpc_error + + +# NOTE(lidiz) Implementing each individual class separately, so we don't +# expose any API that should not be seen. E.g., __aiter__ in unary-unary +# RPC, or __await__ in stream-stream RPC. +class _WrappedUnaryUnaryCall(_WrappedUnaryResponseMixin, aio.UnaryUnaryCall): + """Wrapped UnaryUnaryCall to map exceptions.""" + + +class _WrappedUnaryStreamCall(_WrappedStreamResponseMixin, aio.UnaryStreamCall): + """Wrapped UnaryStreamCall to map exceptions.""" + + +class _WrappedStreamUnaryCall(_WrappedUnaryResponseMixin, _WrappedStreamRequestMixin, aio.StreamUnaryCall): + """Wrapped StreamUnaryCall to map exceptions.""" + + +class _WrappedStreamStreamCall(_WrappedStreamRequestMixin, _WrappedStreamResponseMixin, aio.StreamStreamCall): + """Wrapped StreamStreamCall to map exceptions.""" + + +def _wrap_unary_errors(callable_): + """Map errors for Unary-Unary async callables.""" + grpc_helpers._patch_callable_name(callable_) + + @functools.wraps(callable_) + def error_remapped_callable(*args, **kwargs): + call = callable_(*args, **kwargs) + return _WrappedUnaryUnaryCall().with_call(call) + + return error_remapped_callable + + +def _wrap_stream_errors(callable_): + """Map errors for streaming RPC async callables.""" + grpc_helpers._patch_callable_name(callable_) + + @functools.wraps(callable_) + async def error_remapped_callable(*args, **kwargs): + call = callable_(*args, **kwargs) + + if isinstance(call, aio.UnaryStreamCall): + call = _WrappedUnaryStreamCall().with_call(call) + elif isinstance(call, aio.StreamUnaryCall): + call = _WrappedStreamUnaryCall().with_call(call) + elif isinstance(call, aio.StreamStreamCall): + call = _WrappedStreamStreamCall().with_call(call) + else: + raise TypeError('Unexpected type of call %s' % type(call)) + + await call.wait_for_connection() + return call + + return error_remapped_callable + + +def wrap_errors(callable_): + """Wrap a gRPC async callable and map :class:`grpc.RpcErrors` to + friendly error classes. + + Errors raised by the gRPC callable are mapped to the appropriate + :class:`google.api_core.exceptions.GoogleAPICallError` subclasses. The + original `grpc.RpcError` (which is usually also a `grpc.Call`) is + available from the ``response`` property on the mapped exception. This + is useful for extracting metadata from the original error. + + Args: + callable_ (Callable): A gRPC callable. + + Returns: Callable: The wrapped gRPC callable. + """ + if isinstance(callable_, aio.UnaryUnaryMultiCallable): + return _wrap_unary_errors(callable_) + else: + return _wrap_stream_errors(callable_) + + +def create_channel(target, credentials=None, scopes=None, ssl_credentials=None, **kwargs): + """Create an AsyncIO secure channel with credentials. + + Args: + target (str): The target service address in the format 'hostname:port'. + credentials (google.auth.credentials.Credentials): The credentials. If + not specified, then this function will attempt to ascertain the + credentials from the environment using :func:`google.auth.default`. + scopes (Sequence[str]): A optional list of scopes needed for this + service. These are only used when credentials are not specified and + are passed to :func:`google.auth.default`. + ssl_credentials (grpc.ChannelCredentials): Optional SSL channel + credentials. This can be used to specify different certificates. + kwargs: Additional key-word args passed to :func:`aio.secure_channel`. + + Returns: + aio.Channel: The created channel. + """ + composite_credentials = grpc_helpers._create_composite_credentials( + credentials, scopes, ssl_credentials + ) + + return aio.secure_channel(target, composite_credentials, **kwargs) + + +class FakeUnaryUnaryCall(_WrappedUnaryUnaryCall): + """Fake implementation for unary-unary RPCs. + + It is a dummy object for response message. Supply the intended response + upon the initialization, and the coroutine will return the exact response + message. + """ + + def __init__(self, response=object()): + self.response = response + self._future = asyncio.get_event_loop().create_future() + self._future.set_result(self.response) + + def __await__(self): + response = yield from self._future.__await__() + return response + + +class FakeStreamUnaryCall(_WrappedStreamUnaryCall): + """Fake implementation for stream-unary RPCs. + + It is a dummy object for response message. Supply the intended response + upon the initialization, and the coroutine will return the exact response + message. + """ + + def __init__(self, response=object()): + self.response = response + self._future = asyncio.get_event_loop().create_future() + self._future.set_result(self.response) + + def __await__(self): + response = yield from self._future.__await__() + return response + + async def wait_for_connection(self): + pass diff --git a/google/api_core/operations_v1/__init__.py b/google/api_core/operations_v1/__init__.py index f0549561..bc9befcb 100644 --- a/google/api_core/operations_v1/__init__.py +++ b/google/api_core/operations_v1/__init__.py @@ -14,6 +14,11 @@ """Package for interacting with the google.longrunning.operations meta-API.""" +import sys + from google.api_core.operations_v1.operations_client import OperationsClient __all__ = ["OperationsClient"] +if sys.version_info >= (3, 6, 0): + from google.api_core.operations_v1.operations_async_client import OperationsAsyncClient # noqa: F401 + __all__.append("OperationsAsyncClient") diff --git a/google/api_core/operations_v1/operations_async_client.py b/google/api_core/operations_v1/operations_async_client.py new file mode 100644 index 00000000..039bec1b --- /dev/null +++ b/google/api_core/operations_v1/operations_async_client.py @@ -0,0 +1,274 @@ +# Copyright 2020 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""An async client for the google.longrunning.operations meta-API. + +.. _Google API Style Guide: + https://cloud.google.com/apis/design/design_pattern + s#long_running_operations +.. _google/longrunning/operations.proto: + https://github.com/googleapis/googleapis/blob/master/google/longrunning + /operations.proto +""" + +import functools + +from google.api_core import gapic_v1, page_iterator_async +from google.api_core.operations_v1 import operations_client_config +from google.longrunning import operations_pb2 + + +class OperationsAsyncClient: + """Async client for interacting with long-running operations. + + Args: + channel (aio.Channel): The gRPC AsyncIO channel associated with the + service that implements the ``google.longrunning.operations`` + interface. + client_config (dict): + A dictionary of call options for each method. If not specified + the default configuration is used. + """ + + def __init__(self, channel, client_config=operations_client_config.config): + # Create the gRPC client stub with gRPC AsyncIO channel. + self.operations_stub = operations_pb2.OperationsStub(channel) + + # Create all wrapped methods using the interface configuration. + # The interface config contains all of the default settings for retry + # and timeout for each RPC method. + interfaces = client_config["interfaces"] + interface_config = interfaces["google.longrunning.Operations"] + method_configs = gapic_v1.config_async.parse_method_configs(interface_config) + + self._get_operation = gapic_v1.method_async.wrap_method( + self.operations_stub.GetOperation, + default_retry=method_configs["GetOperation"].retry, + default_timeout=method_configs["GetOperation"].timeout, + ) + + self._list_operations = gapic_v1.method_async.wrap_method( + self.operations_stub.ListOperations, + default_retry=method_configs["ListOperations"].retry, + default_timeout=method_configs["ListOperations"].timeout, + ) + + self._cancel_operation = gapic_v1.method_async.wrap_method( + self.operations_stub.CancelOperation, + default_retry=method_configs["CancelOperation"].retry, + default_timeout=method_configs["CancelOperation"].timeout, + ) + + self._delete_operation = gapic_v1.method_async.wrap_method( + self.operations_stub.DeleteOperation, + default_retry=method_configs["DeleteOperation"].retry, + default_timeout=method_configs["DeleteOperation"].timeout, + ) + + async def get_operation( + self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT + ): + """Gets the latest state of a long-running operation. + + Clients can use this method to poll the operation result at intervals + as recommended by the API service. + + Example: + >>> from google.api_core import operations_v1 + >>> api = operations_v1.OperationsClient() + >>> name = '' + >>> response = await api.get_operation(name) + + Args: + name (str): The name of the operation resource. + retry (google.api_core.retry.Retry): The retry strategy to use + when invoking the RPC. If unspecified, the default retry from + the client configuration will be used. If ``None``, then this + method will not retry the RPC at all. + timeout (float): The amount of time in seconds to wait for the RPC + to complete. Note that if ``retry`` is used, this timeout + applies to each individual attempt and the overall time it + takes for this method to complete may be longer. If + unspecified, the the default timeout in the client + configuration is used. If ``None``, then the RPC method will + not time out. + + Returns: + google.longrunning.operations_pb2.Operation: The state of the + operation. + + Raises: + google.api_core.exceptions.GoogleAPICallError: If an error occurred + while invoking the RPC, the appropriate ``GoogleAPICallError`` + subclass will be raised. + """ + request = operations_pb2.GetOperationRequest(name=name) + return await self._get_operation(request, retry=retry, timeout=timeout) + + async def list_operations( + self, + name, + filter_, + retry=gapic_v1.method_async.DEFAULT, + timeout=gapic_v1.method_async.DEFAULT, + ): + """ + Lists operations that match the specified filter in the request. + + Example: + >>> from google.api_core import operations_v1 + >>> api = operations_v1.OperationsClient() + >>> name = '' + >>> + >>> # Iterate over all results + >>> for operation in await api.list_operations(name): + >>> # process operation + >>> pass + >>> + >>> # Or iterate over results one page at a time + >>> iter = await api.list_operations(name) + >>> for page in iter.pages: + >>> for operation in page: + >>> # process operation + >>> pass + + Args: + name (str): The name of the operation collection. + filter_ (str): The standard list filter. + retry (google.api_core.retry.Retry): The retry strategy to use + when invoking the RPC. If unspecified, the default retry from + the client configuration will be used. If ``None``, then this + method will not retry the RPC at all. + timeout (float): The amount of time in seconds to wait for the RPC + to complete. Note that if ``retry`` is used, this timeout + applies to each individual attempt and the overall time it + takes for this method to complete may be longer. If + unspecified, the the default timeout in the client + configuration is used. If ``None``, then the RPC method will + not time out. + + Returns: + google.api_core.page_iterator.Iterator: An iterator that yields + :class:`google.longrunning.operations_pb2.Operation` instances. + + Raises: + google.api_core.exceptions.MethodNotImplemented: If the server + does not support this method. Services are not required to + implement this method. + google.api_core.exceptions.GoogleAPICallError: If an error occurred + while invoking the RPC, the appropriate ``GoogleAPICallError`` + subclass will be raised. + """ + # Create the request object. + request = operations_pb2.ListOperationsRequest(name=name, filter=filter_) + + # Create the method used to fetch pages + method = functools.partial(self._list_operations, retry=retry, timeout=timeout) + + iterator = page_iterator_async.AsyncGRPCIterator( + client=None, + method=method, + request=request, + items_field="operations", + request_token_field="page_token", + response_token_field="next_page_token", + ) + + return iterator + + async def cancel_operation( + self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT + ): + """Starts asynchronous cancellation on a long-running operation. + + The server makes a best effort to cancel the operation, but success is + not guaranteed. Clients can use :meth:`get_operation` or service- + specific methods to check whether the cancellation succeeded or whether + the operation completed despite cancellation. On successful + cancellation, the operation is not deleted; instead, it becomes an + operation with an ``Operation.error`` value with a + ``google.rpc.Status.code`` of ``1``, corresponding to + ``Code.CANCELLED``. + + Example: + >>> from google.api_core import operations_v1 + >>> api = operations_v1.OperationsClient() + >>> name = '' + >>> api.cancel_operation(name) + + Args: + name (str): The name of the operation resource to be cancelled. + retry (google.api_core.retry.Retry): The retry strategy to use + when invoking the RPC. If unspecified, the default retry from + the client configuration will be used. If ``None``, then this + method will not retry the RPC at all. + timeout (float): The amount of time in seconds to wait for the RPC + to complete. Note that if ``retry`` is used, this timeout + applies to each individual attempt and the overall time it + takes for this method to complete may be longer. If + unspecified, the the default timeout in the client + configuration is used. If ``None``, then the RPC method will + not time out. + + Raises: + google.api_core.exceptions.MethodNotImplemented: If the server + does not support this method. Services are not required to + implement this method. + google.api_core.exceptions.GoogleAPICallError: If an error occurred + while invoking the RPC, the appropriate ``GoogleAPICallError`` + subclass will be raised. + """ + # Create the request object. + request = operations_pb2.CancelOperationRequest(name=name) + await self._cancel_operation(request, retry=retry, timeout=timeout) + + async def delete_operation( + self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT + ): + """Deletes a long-running operation. + + This method indicates that the client is no longer interested in the + operation result. It does not cancel the operation. + + Example: + >>> from google.api_core import operations_v1 + >>> api = operations_v1.OperationsClient() + >>> name = '' + >>> api.delete_operation(name) + + Args: + name (str): The name of the operation resource to be deleted. + retry (google.api_core.retry.Retry): The retry strategy to use + when invoking the RPC. If unspecified, the default retry from + the client configuration will be used. If ``None``, then this + method will not retry the RPC at all. + timeout (float): The amount of time in seconds to wait for the RPC + to complete. Note that if ``retry`` is used, this timeout + applies to each individual attempt and the overall time it + takes for this method to complete may be longer. If + unspecified, the the default timeout in the client + configuration is used. If ``None``, then the RPC method will + not time out. + + Raises: + google.api_core.exceptions.MethodNotImplemented: If the server + does not support this method. Services are not required to + implement this method. + google.api_core.exceptions.GoogleAPICallError: If an error occurred + while invoking the RPC, the appropriate ``GoogleAPICallError`` + subclass will be raised. + """ + # Create the request object. + request = operations_pb2.DeleteOperationRequest(name=name) + await self._delete_operation(request, retry=retry, timeout=timeout) diff --git a/tests/asyncio/gapic/test_method_async.py b/tests/asyncio/gapic/test_method_async.py new file mode 100644 index 00000000..7318362b --- /dev/null +++ b/tests/asyncio/gapic/test_method_async.py @@ -0,0 +1,243 @@ +# Copyright 2017 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime + +from grpc.experimental import aio +import mock +import pytest + +from google.api_core import (exceptions, gapic_v1, grpc_helpers_async, + retry_async, timeout) + + +def _utcnow_monotonic(): + current_time = datetime.datetime.min + delta = datetime.timedelta(seconds=0.5) + while True: + yield current_time + current_time += delta + + +@pytest.mark.asyncio +async def test_wrap_method_basic(): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42) + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call) + + wrapped_method = gapic_v1.method_async.wrap_method(method) + + result = await wrapped_method(1, 2, meep="moop") + + assert result == 42 + method.assert_called_once_with(1, 2, meep="moop", metadata=mock.ANY) + + # Check that the default client info was specified in the metadata. + metadata = method.call_args[1]["metadata"] + assert len(metadata) == 1 + client_info = gapic_v1.client_info.DEFAULT_CLIENT_INFO + user_agent_metadata = client_info.to_grpc_metadata() + assert user_agent_metadata in metadata + + +@pytest.mark.asyncio +async def test_wrap_method_with_no_client_info(): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall() + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call) + + wrapped_method = gapic_v1.method_async.wrap_method( + method, client_info=None + ) + + await wrapped_method(1, 2, meep="moop") + + method.assert_called_once_with(1, 2, meep="moop") + + +@pytest.mark.asyncio +async def test_wrap_method_with_custom_client_info(): + client_info = gapic_v1.client_info.ClientInfo( + python_version=1, + grpc_version=2, + api_core_version=3, + gapic_version=4, + client_library_version=5, + ) + fake_call = grpc_helpers_async.FakeUnaryUnaryCall() + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call) + + wrapped_method = gapic_v1.method_async.wrap_method( + method, client_info=client_info + ) + + await wrapped_method(1, 2, meep="moop") + + method.assert_called_once_with(1, 2, meep="moop", metadata=mock.ANY) + + # Check that the custom client info was specified in the metadata. + metadata = method.call_args[1]["metadata"] + assert client_info.to_grpc_metadata() in metadata + + +@pytest.mark.asyncio +async def test_invoke_wrapped_method_with_metadata(): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall() + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call) + + wrapped_method = gapic_v1.method_async.wrap_method(method) + + await wrapped_method(mock.sentinel.request, metadata=[("a", "b")]) + + method.assert_called_once_with(mock.sentinel.request, metadata=mock.ANY) + metadata = method.call_args[1]["metadata"] + # Metadata should have two items: the client info metadata and our custom + # metadata. + assert len(metadata) == 2 + assert ("a", "b") in metadata + + +@pytest.mark.asyncio +async def test_invoke_wrapped_method_with_metadata_as_none(): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall() + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call) + + wrapped_method = gapic_v1.method_async.wrap_method(method) + + await wrapped_method(mock.sentinel.request, metadata=None) + + method.assert_called_once_with(mock.sentinel.request, metadata=mock.ANY) + metadata = method.call_args[1]["metadata"] + # Metadata should have just one items: the client info metadata. + assert len(metadata) == 1 + + +@mock.patch("asyncio.sleep") +@pytest.mark.asyncio +async def test_wrap_method_with_default_retry_and_timeout(unused_sleep): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42) + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, side_effect=[ + exceptions.InternalServerError(None), + fake_call, + ]) + + default_retry = retry_async.AsyncRetry() + default_timeout = timeout.ConstantTimeout(60) + wrapped_method = gapic_v1.method_async.wrap_method( + method, default_retry, default_timeout + ) + + result = await wrapped_method() + + assert result == 42 + assert method.call_count == 2 + method.assert_called_with(timeout=60, metadata=mock.ANY) + + +@mock.patch("asyncio.sleep") +@pytest.mark.asyncio +async def test_wrap_method_with_default_retry_and_timeout_using_sentinel(unused_sleep): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42) + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, side_effect=[ + exceptions.InternalServerError(None), + fake_call, + ]) + + default_retry = retry_async.AsyncRetry() + default_timeout = timeout.ConstantTimeout(60) + wrapped_method = gapic_v1.method_async.wrap_method( + method, default_retry, default_timeout + ) + + result = await wrapped_method( + retry=gapic_v1.method_async.DEFAULT, + timeout=gapic_v1.method_async.DEFAULT, + ) + + assert result == 42 + assert method.call_count == 2 + method.assert_called_with(timeout=60, metadata=mock.ANY) + + +@mock.patch("asyncio.sleep") +@pytest.mark.asyncio +async def test_wrap_method_with_overriding_retry_and_timeout(unused_sleep): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42) + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, side_effect=[ + exceptions.NotFound(None), + fake_call, + ]) + + default_retry = retry_async.AsyncRetry() + default_timeout = timeout.ConstantTimeout(60) + wrapped_method = gapic_v1.method_async.wrap_method( + method, default_retry, default_timeout + ) + + result = await wrapped_method( + retry=retry_async.AsyncRetry(retry_async.if_exception_type(exceptions.NotFound)), + timeout=timeout.ConstantTimeout(22), + ) + + assert result == 42 + assert method.call_count == 2 + method.assert_called_with(timeout=22, metadata=mock.ANY) + + +@mock.patch("asyncio.sleep") +@mock.patch( + "google.api_core.datetime_helpers.utcnow", + side_effect=_utcnow_monotonic(), + autospec=True, +) +@pytest.mark.asyncio +async def test_wrap_method_with_overriding_retry_deadline(utcnow, unused_sleep): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42) + method = mock.Mock( + spec=aio.UnaryUnaryMultiCallable, + side_effect=([exceptions.InternalServerError(None)] * 4) + [fake_call]) + + default_retry = retry_async.AsyncRetry() + default_timeout = timeout.ExponentialTimeout(deadline=60) + wrapped_method = gapic_v1.method_async.wrap_method( + method, default_retry, default_timeout + ) + + # Overriding only the retry's deadline should also override the timeout's + # deadline. + result = await wrapped_method(retry=default_retry.with_deadline(30)) + + assert result == 42 + timeout_args = [call[1]["timeout"] for call in method.call_args_list] + assert timeout_args == [5.0, 10.0, 20.0, 26.0, 25.0] + assert utcnow.call_count == ( + 1 + + 1 # Compute wait_for timeout in retry_async + + 5 # First to set the deadline. + + 5 # One for each min(timeout, maximum, (DEADLINE - NOW).seconds) + ) + + +@pytest.mark.asyncio +async def test_wrap_method_with_overriding_timeout_as_a_number(): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall(42) + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call) + default_retry = retry_async.AsyncRetry() + default_timeout = timeout.ConstantTimeout(60) + wrapped_method = gapic_v1.method_async.wrap_method( + method, default_retry, default_timeout + ) + + result = await wrapped_method(timeout=22) + + assert result == 42 + method.assert_called_once_with(timeout=22, metadata=mock.ANY) diff --git a/tests/asyncio/operations_v1/__init__.py b/tests/asyncio/operations_v1/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/asyncio/operations_v1/test_operations_async_client.py b/tests/asyncio/operations_v1/test_operations_async_client.py new file mode 100644 index 00000000..0f9363ff --- /dev/null +++ b/tests/asyncio/operations_v1/test_operations_async_client.py @@ -0,0 +1,93 @@ +# Copyright 2017 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from grpc.experimental import aio +import mock +import pytest + +from google.api_core import (grpc_helpers_async, operations_v1, + page_iterator_async) +from google.longrunning import operations_pb2 +from google.protobuf import empty_pb2 + + +def _mock_grpc_objects(response): + fake_call = grpc_helpers_async.FakeUnaryUnaryCall(response) + method = mock.Mock(spec=aio.UnaryUnaryMultiCallable, return_value=fake_call) + mocked_channel = mock.Mock() + mocked_channel.unary_unary = mock.Mock(return_value=method) + return mocked_channel, method, fake_call + + +@pytest.mark.asyncio +async def test_get_operation(): + mocked_channel, method, fake_call = _mock_grpc_objects( + operations_pb2.Operation(name="meep")) + client = operations_v1.OperationsAsyncClient(mocked_channel) + + response = await client.get_operation("name") + assert method.call_count == 1 + assert tuple(method.call_args_list[0])[0][0].name == "name" + assert response == fake_call.response + + +@pytest.mark.asyncio +async def test_list_operations(): + operations = [ + operations_pb2.Operation(name="1"), + operations_pb2.Operation(name="2"), + ] + list_response = operations_pb2.ListOperationsResponse(operations=operations) + + mocked_channel, method, fake_call = _mock_grpc_objects(list_response) + client = operations_v1.OperationsAsyncClient(mocked_channel) + + pager = await client.list_operations("name", "filter") + + assert isinstance(pager, page_iterator_async.AsyncIterator) + responses = [] + async for response in pager: + responses.append(response) + + assert responses == operations + + assert method.call_count == 1 + request = tuple(method.call_args_list[0])[0][0] + assert isinstance(request, operations_pb2.ListOperationsRequest) + assert request.name == "name" + assert request.filter == "filter" + + +@pytest.mark.asyncio +async def test_delete_operation(): + mocked_channel, method, fake_call = _mock_grpc_objects( + empty_pb2.Empty()) + client = operations_v1.OperationsAsyncClient(mocked_channel) + + await client.delete_operation("name") + + assert method.call_count == 1 + assert tuple(method.call_args_list[0])[0][0].name == "name" + + +@pytest.mark.asyncio +async def test_cancel_operation(): + mocked_channel, method, fake_call = _mock_grpc_objects( + empty_pb2.Empty()) + client = operations_v1.OperationsAsyncClient(mocked_channel) + + await client.cancel_operation("name") + + assert method.call_count == 1 + assert tuple(method.call_args_list[0])[0][0].name == "name" diff --git a/tests/asyncio/test_grpc_helpers_async.py b/tests/asyncio/test_grpc_helpers_async.py new file mode 100644 index 00000000..00539521 --- /dev/null +++ b/tests/asyncio/test_grpc_helpers_async.py @@ -0,0 +1,372 @@ +# Copyright 2017 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import grpc +from grpc.experimental import aio +import mock +import pytest + +from google.api_core import exceptions +from google.api_core import grpc_helpers_async +import google.auth.credentials + + +class RpcErrorImpl(grpc.RpcError, grpc.Call): + def __init__(self, code): + super(RpcErrorImpl, self).__init__() + self._code = code + + def code(self): + return self._code + + def details(self): + return None + + +@pytest.mark.asyncio +async def test_wrap_unary_errors(): + grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) + callable_ = mock.AsyncMock(spec=["__call__"], side_effect=grpc_error) + + wrapped_callable = grpc_helpers_async._wrap_unary_errors(callable_) + + with pytest.raises(exceptions.InvalidArgument) as exc_info: + await wrapped_callable(1, 2, three="four") + + callable_.assert_called_once_with(1, 2, three="four") + assert exc_info.value.response == grpc_error + + +@pytest.mark.asyncio +async def test_common_methods_in_wrapped_call(): + mock_call = mock.Mock(aio.UnaryUnaryCall, autospec=True) + wrapped_call = grpc_helpers_async._WrappedUnaryUnaryCall().with_call(mock_call) + + await wrapped_call.initial_metadata() + assert mock_call.initial_metadata.call_count == 1 + + await wrapped_call.trailing_metadata() + assert mock_call.trailing_metadata.call_count == 1 + + await wrapped_call.code() + assert mock_call.code.call_count == 1 + + await wrapped_call.details() + assert mock_call.details.call_count == 1 + + wrapped_call.cancelled() + assert mock_call.cancelled.call_count == 1 + + wrapped_call.done() + assert mock_call.done.call_count == 1 + + wrapped_call.time_remaining() + assert mock_call.time_remaining.call_count == 1 + + wrapped_call.cancel() + assert mock_call.cancel.call_count == 1 + + callback = mock.sentinel.callback + wrapped_call.add_done_callback(callback) + mock_call.add_done_callback.assert_called_once_with(callback) + + await wrapped_call.wait_for_connection() + assert mock_call.wait_for_connection.call_count == 1 + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_unary_stream(): + mock_call = mock.Mock(aio.UnaryStreamCall, autospec=True) + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + + await wrapped_callable(1, 2, three="four") + multicallable.assert_called_once_with(1, 2, three="four") + assert mock_call.wait_for_connection.call_count == 1 + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_stream_unary(): + mock_call = mock.Mock(aio.StreamUnaryCall, autospec=True) + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + + await wrapped_callable(1, 2, three="four") + multicallable.assert_called_once_with(1, 2, three="four") + assert mock_call.wait_for_connection.call_count == 1 + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_stream_stream(): + mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + + await wrapped_callable(1, 2, three="four") + multicallable.assert_called_once_with(1, 2, three="four") + assert mock_call.wait_for_connection.call_count == 1 + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_type_error(): + mock_call = mock.Mock() + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + + with pytest.raises(TypeError): + await wrapped_callable() + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_raised(): + grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) + mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) + mock_call.wait_for_connection = mock.AsyncMock(side_effect=[grpc_error]) + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + + with pytest.raises(exceptions.InvalidArgument): + await wrapped_callable() + assert mock_call.wait_for_connection.call_count == 1 + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_read(): + grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) + + mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) + mock_call.read = mock.AsyncMock(side_effect=grpc_error) + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + + wrapped_call = await wrapped_callable(1, 2, three="four") + multicallable.assert_called_once_with(1, 2, three="four") + assert mock_call.wait_for_connection.call_count == 1 + + with pytest.raises(exceptions.InvalidArgument) as exc_info: + await wrapped_call.read() + assert exc_info.value.response == grpc_error + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_aiter(): + grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) + + mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) + mocked_aiter = mock.Mock(spec=['__anext__']) + mocked_aiter.__anext__ = mock.AsyncMock(side_effect=[mock.sentinel.response, grpc_error]) + mock_call.__aiter__ = mock.Mock(return_value=mocked_aiter) + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + wrapped_call = await wrapped_callable() + + with pytest.raises(exceptions.InvalidArgument) as exc_info: + async for response in wrapped_call: + assert response == mock.sentinel.response + assert exc_info.value.response == grpc_error + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_aiter_non_rpc_error(): + non_grpc_error = TypeError('Not a gRPC error') + + mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) + mocked_aiter = mock.Mock(spec=['__anext__']) + mocked_aiter.__anext__ = mock.AsyncMock(side_effect=[mock.sentinel.response, non_grpc_error]) + mock_call.__aiter__ = mock.Mock(return_value=mocked_aiter) + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + wrapped_call = await wrapped_callable() + + with pytest.raises(TypeError) as exc_info: + async for response in wrapped_call: + assert response == mock.sentinel.response + assert exc_info.value == non_grpc_error + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_aiter_called_multiple_times(): + mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + wrapped_call = await wrapped_callable() + + assert wrapped_call.__aiter__() == wrapped_call.__aiter__() + + +@pytest.mark.asyncio +async def test_wrap_stream_errors_write(): + grpc_error = RpcErrorImpl(grpc.StatusCode.INVALID_ARGUMENT) + + mock_call = mock.Mock(aio.StreamStreamCall, autospec=True) + mock_call.write = mock.AsyncMock(side_effect=[None, grpc_error]) + mock_call.done_writing = mock.AsyncMock(side_effect=[None, grpc_error]) + multicallable = mock.Mock(return_value=mock_call) + + wrapped_callable = grpc_helpers_async._wrap_stream_errors(multicallable) + + wrapped_call = await wrapped_callable() + + await wrapped_call.write(mock.sentinel.request) + with pytest.raises(exceptions.InvalidArgument) as exc_info: + await wrapped_call.write(mock.sentinel.request) + assert mock_call.write.call_count == 2 + assert exc_info.value.response == grpc_error + + await wrapped_call.done_writing() + with pytest.raises(exceptions.InvalidArgument) as exc_info: + await wrapped_call.done_writing() + assert mock_call.done_writing.call_count == 2 + assert exc_info.value.response == grpc_error + + +@mock.patch("google.api_core.grpc_helpers_async._wrap_unary_errors") +def test_wrap_errors_non_streaming(wrap_unary_errors): + callable_ = mock.create_autospec(aio.UnaryUnaryMultiCallable) + + result = grpc_helpers_async.wrap_errors(callable_) + + assert result == wrap_unary_errors.return_value + wrap_unary_errors.assert_called_once_with(callable_) + + +@mock.patch("google.api_core.grpc_helpers_async._wrap_stream_errors") +def test_wrap_errors_streaming(wrap_stream_errors): + callable_ = mock.create_autospec(aio.UnaryStreamMultiCallable) + + result = grpc_helpers_async.wrap_errors(callable_) + + assert result == wrap_stream_errors.return_value + wrap_stream_errors.assert_called_once_with(callable_) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch( + "google.auth.default", + return_value=(mock.sentinel.credentials, mock.sentinel.projet), +) +@mock.patch("grpc.experimental.aio.secure_channel") +def test_create_channel_implicit(grpc_secure_channel, default, composite_creds_call): + target = "example.com:443" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers_async.create_channel(target) + + assert channel is grpc_secure_channel.return_value + default.assert_called_once_with(scopes=None) + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch( + "google.auth.default", + return_value=(mock.sentinel.credentials, mock.sentinel.projet), +) +@mock.patch("grpc.experimental.aio.secure_channel") +def test_create_channel_implicit_with_ssl_creds( + grpc_secure_channel, default, composite_creds_call +): + target = "example.com:443" + + ssl_creds = grpc.ssl_channel_credentials() + + grpc_helpers_async.create_channel(target, ssl_credentials=ssl_creds) + + default.assert_called_once_with(scopes=None) + composite_creds_call.assert_called_once_with(ssl_creds, mock.ANY) + composite_creds = composite_creds_call.return_value + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch( + "google.auth.default", + return_value=(mock.sentinel.credentials, mock.sentinel.projet), +) +@mock.patch("grpc.experimental.aio.secure_channel") +def test_create_channel_implicit_with_scopes( + grpc_secure_channel, default, composite_creds_call +): + target = "example.com:443" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers_async.create_channel(target, scopes=["one", "two"]) + + assert channel is grpc_secure_channel.return_value + default.assert_called_once_with(scopes=["one", "two"]) + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch("google.auth.credentials.with_scopes_if_required") +@mock.patch("grpc.experimental.aio.secure_channel") +def test_create_channel_explicit(grpc_secure_channel, auth_creds, composite_creds_call): + target = "example.com:443" + composite_creds = composite_creds_call.return_value + + channel = grpc_helpers_async.create_channel(target, credentials=mock.sentinel.credentials) + + auth_creds.assert_called_once_with(mock.sentinel.credentials, None) + assert channel is grpc_secure_channel.return_value + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@mock.patch("grpc.composite_channel_credentials") +@mock.patch("grpc.experimental.aio.secure_channel") +def test_create_channel_explicit_scoped(grpc_secure_channel, composite_creds_call): + target = "example.com:443" + scopes = ["1", "2"] + composite_creds = composite_creds_call.return_value + + credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True) + credentials.requires_scopes = True + + channel = grpc_helpers_async.create_channel( + target, credentials=credentials, scopes=scopes + ) + + credentials.with_scopes.assert_called_once_with(scopes) + assert channel is grpc_secure_channel.return_value + grpc_secure_channel.assert_called_once_with(target, composite_creds) + + +@pytest.mark.skipif(grpc_helpers_async.HAS_GRPC_GCP, reason="grpc_gcp module not available") +@mock.patch("grpc.experimental.aio.secure_channel") +def test_create_channel_without_grpc_gcp(grpc_secure_channel): + target = "example.com:443" + scopes = ["test_scope"] + + credentials = mock.create_autospec(google.auth.credentials.Scoped, instance=True) + credentials.requires_scopes = True + + grpc_helpers_async.create_channel(target, credentials=credentials, scopes=scopes) + grpc_secure_channel.assert_called() + credentials.with_scopes.assert_called_once_with(scopes) + + +@pytest.mark.asyncio +async def test_fake_stream_unary_call(): + fake_call = grpc_helpers_async.FakeStreamUnaryCall() + await fake_call.wait_for_connection() + response = await fake_call + assert fake_call.response == response