From 73854e897b885e9be290f2676a8a1466b4f041e4 Mon Sep 17 00:00:00 2001 From: Bu Sun Kim <8822365+busunkim96@users.noreply.github.com> Date: Thu, 14 Jan 2021 14:22:58 -0700 Subject: [PATCH] feat: allow gRPC metadata to be passed to operations client (#127) --- google/api_core/operation.py | 16 +++++---- google/api_core/operation_async.py | 8 +++-- .../operations_v1/operations_async_client.py | 35 +++++++++++++++---- .../operations_v1/operations_client.py | 35 +++++++++++++++---- .../test_operations_async_client.py | 12 ++++--- tests/asyncio/test_operation_async.py | 3 ++ .../operations_v1/test_operations_client.py | 12 ++++--- tests/unit/test_operation.py | 6 ++++ 8 files changed, 96 insertions(+), 31 deletions(-) diff --git a/google/api_core/operation.py b/google/api_core/operation.py index 9af9c4e6..a806523d 100644 --- a/google/api_core/operation.py +++ b/google/api_core/operation.py @@ -287,7 +287,7 @@ def _cancel_grpc(operations_stub, operation_name): operations_stub.CancelOperation(request_pb) -def from_grpc(operation, operations_stub, result_type, **kwargs): +def from_grpc(operation, operations_stub, result_type, grpc_metadata=None, **kwargs): """Create an operation future using a gRPC client. This interacts with the long-running operations `service`_ (specific @@ -302,18 +302,20 @@ def from_grpc(operation, operations_stub, result_type, **kwargs): operations_stub (google.longrunning.operations_pb2.OperationsStub): The operations stub. result_type (:func:`type`): The protobuf result type. + grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass + to the rpc. kwargs: Keyword args passed into the :class:`Operation` constructor. Returns: ~.api_core.operation.Operation: The operation future to track the given operation. """ - refresh = functools.partial(_refresh_grpc, operations_stub, operation.name) - cancel = functools.partial(_cancel_grpc, operations_stub, operation.name) + refresh = functools.partial(_refresh_grpc, operations_stub, operation.name, metadata=grpc_metadata) + cancel = functools.partial(_cancel_grpc, operations_stub, operation.name, metadata=grpc_metadata) return Operation(operation, refresh, cancel, result_type, **kwargs) -def from_gapic(operation, operations_client, result_type, **kwargs): +def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **kwargs): """Create an operation future from a gapic client. This interacts with the long-running operations `service`_ (specific @@ -328,12 +330,14 @@ def from_gapic(operation, operations_client, result_type, **kwargs): operations_client (google.api_core.operations_v1.OperationsClient): The operations client. result_type (:func:`type`): The protobuf result type. + grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass + to the rpc. kwargs: Keyword args passed into the :class:`Operation` constructor. Returns: ~.api_core.operation.Operation: The operation future to track the given operation. """ - refresh = functools.partial(operations_client.get_operation, operation.name) - cancel = functools.partial(operations_client.cancel_operation, operation.name) + refresh = functools.partial(operations_client.get_operation, operation.name, metadata=grpc_metadata) + cancel = functools.partial(operations_client.cancel_operation, operation.name, metadata=grpc_metadata) return Operation(operation, refresh, cancel, result_type, **kwargs) diff --git a/google/api_core/operation_async.py b/google/api_core/operation_async.py index 89500af1..b137235b 100644 --- a/google/api_core/operation_async.py +++ b/google/api_core/operation_async.py @@ -189,7 +189,7 @@ async def cancelled(self): ) -def from_gapic(operation, operations_client, result_type, **kwargs): +def from_gapic(operation, operations_client, result_type, grpc_metadata=None, **kwargs): """Create an operation future from a gapic client. This interacts with the long-running operations `service`_ (specific @@ -204,12 +204,14 @@ def from_gapic(operation, operations_client, result_type, **kwargs): operations_client (google.api_core.operations_v1.OperationsClient): The operations client. result_type (:func:`type`): The protobuf result type. + grpc_metadata (Optional[List[Tuple[str, str]]]): Additional metadata to pass + to the rpc. kwargs: Keyword args passed into the :class:`Operation` constructor. Returns: ~.api_core.operation.Operation: The operation future to track the given operation. """ - refresh = functools.partial(operations_client.get_operation, operation.name) - cancel = functools.partial(operations_client.cancel_operation, operation.name) + refresh = functools.partial(operations_client.get_operation, operation.name, metadata=grpc_metadata) + cancel = functools.partial(operations_client.cancel_operation, operation.name, metadata=grpc_metadata) return AsyncOperation(operation, refresh, cancel, result_type, **kwargs) diff --git a/google/api_core/operations_v1/operations_async_client.py b/google/api_core/operations_v1/operations_async_client.py index 039bec1b..5d7b26cf 100644 --- a/google/api_core/operations_v1/operations_async_client.py +++ b/google/api_core/operations_v1/operations_async_client.py @@ -77,7 +77,11 @@ def __init__(self, channel, client_config=operations_client_config.config): ) async def get_operation( - self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT + self, + name, + retry=gapic_v1.method_async.DEFAULT, + timeout=gapic_v1.method_async.DEFAULT, + metadata=None, ): """Gets the latest state of a long-running operation. @@ -103,6 +107,8 @@ async def get_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + metadata (Optional[List[Tuple[str, str]]]): + Additional gRPC metadata. Returns: google.longrunning.operations_pb2.Operation: The state of the @@ -114,7 +120,7 @@ async def get_operation( subclass will be raised. """ request = operations_pb2.GetOperationRequest(name=name) - return await self._get_operation(request, retry=retry, timeout=timeout) + return await self._get_operation(request, retry=retry, timeout=timeout, metadata=metadata) async def list_operations( self, @@ -122,6 +128,7 @@ async def list_operations( filter_, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT, + metadata=None, ): """ Lists operations that match the specified filter in the request. @@ -157,6 +164,8 @@ async def list_operations( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + metadata (Optional[List[Tuple[str, str]]]): Additional gRPC + metadata. Returns: google.api_core.page_iterator.Iterator: An iterator that yields @@ -174,7 +183,7 @@ async def list_operations( request = operations_pb2.ListOperationsRequest(name=name, filter=filter_) # Create the method used to fetch pages - method = functools.partial(self._list_operations, retry=retry, timeout=timeout) + method = functools.partial(self._list_operations, retry=retry, timeout=timeout, metadata=metadata) iterator = page_iterator_async.AsyncGRPCIterator( client=None, @@ -188,7 +197,11 @@ async def list_operations( return iterator async def cancel_operation( - self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT + self, + name, + retry=gapic_v1.method_async.DEFAULT, + timeout=gapic_v1.method_async.DEFAULT, + metadata=None, ): """Starts asynchronous cancellation on a long-running operation. @@ -228,13 +241,19 @@ async def cancel_operation( google.api_core.exceptions.GoogleAPICallError: If an error occurred while invoking the RPC, the appropriate ``GoogleAPICallError`` subclass will be raised. + metadata (Optional[List[Tuple[str, str]]]): Additional gRPC + metadata. """ # Create the request object. request = operations_pb2.CancelOperationRequest(name=name) - await self._cancel_operation(request, retry=retry, timeout=timeout) + await self._cancel_operation(request, retry=retry, timeout=timeout, metadata=metadata) async def delete_operation( - self, name, retry=gapic_v1.method_async.DEFAULT, timeout=gapic_v1.method_async.DEFAULT + self, + name, + retry=gapic_v1.method_async.DEFAULT, + timeout=gapic_v1.method_async.DEFAULT, + metadata=None, ): """Deletes a long-running operation. @@ -260,6 +279,8 @@ async def delete_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + metadata (Optional[List[Tuple[str, str]]]): Additional gRPC + metadata. Raises: google.api_core.exceptions.MethodNotImplemented: If the server @@ -271,4 +292,4 @@ async def delete_operation( """ # Create the request object. request = operations_pb2.DeleteOperationRequest(name=name) - await self._delete_operation(request, retry=retry, timeout=timeout) + await self._delete_operation(request, retry=retry, timeout=timeout, metadata=metadata) diff --git a/google/api_core/operations_v1/operations_client.py b/google/api_core/operations_v1/operations_client.py index cd2923bb..b8507964 100644 --- a/google/api_core/operations_v1/operations_client.py +++ b/google/api_core/operations_v1/operations_client.py @@ -91,7 +91,11 @@ def __init__(self, channel, client_config=operations_client_config.config): # Service calls def get_operation( - self, name, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT + self, + name, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + metadata=None, ): """Gets the latest state of a long-running operation. @@ -117,6 +121,8 @@ def get_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + metadata (Optional[List[Tuple[str, str]]]): + Additional gRPC metadata. Returns: google.longrunning.operations_pb2.Operation: The state of the @@ -128,7 +134,7 @@ def get_operation( subclass will be raised. """ request = operations_pb2.GetOperationRequest(name=name) - return self._get_operation(request, retry=retry, timeout=timeout) + return self._get_operation(request, retry=retry, timeout=timeout, metadata=metadata) def list_operations( self, @@ -136,6 +142,7 @@ def list_operations( filter_, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT, + metadata=None, ): """ Lists operations that match the specified filter in the request. @@ -171,6 +178,8 @@ def list_operations( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + metadata (Optional[List[Tuple[str, str]]]): Additional gRPC + metadata. Returns: google.api_core.page_iterator.Iterator: An iterator that yields @@ -188,7 +197,7 @@ def list_operations( request = operations_pb2.ListOperationsRequest(name=name, filter=filter_) # Create the method used to fetch pages - method = functools.partial(self._list_operations, retry=retry, timeout=timeout) + method = functools.partial(self._list_operations, retry=retry, timeout=timeout, metadata=metadata) iterator = page_iterator.GRPCIterator( client=None, @@ -202,7 +211,11 @@ def list_operations( return iterator def cancel_operation( - self, name, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT + self, + name, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + metadata=None, ): """Starts asynchronous cancellation on a long-running operation. @@ -234,6 +247,8 @@ def cancel_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + metadata (Optional[List[Tuple[str, str]]]): Additional gRPC + metadata. Raises: google.api_core.exceptions.MethodNotImplemented: If the server @@ -245,10 +260,14 @@ def cancel_operation( """ # Create the request object. request = operations_pb2.CancelOperationRequest(name=name) - self._cancel_operation(request, retry=retry, timeout=timeout) + self._cancel_operation(request, retry=retry, timeout=timeout, metadata=metadata) def delete_operation( - self, name, retry=gapic_v1.method.DEFAULT, timeout=gapic_v1.method.DEFAULT + self, + name, + retry=gapic_v1.method.DEFAULT, + timeout=gapic_v1.method.DEFAULT, + metadata=None, ): """Deletes a long-running operation. @@ -274,6 +293,8 @@ def delete_operation( unspecified, the the default timeout in the client configuration is used. If ``None``, then the RPC method will not time out. + metadata (Optional[List[Tuple[str, str]]]): Additional gRPC + metadata. Raises: google.api_core.exceptions.MethodNotImplemented: If the server @@ -285,4 +306,4 @@ def delete_operation( """ # Create the request object. request = operations_pb2.DeleteOperationRequest(name=name) - self._delete_operation(request, retry=retry, timeout=timeout) + self._delete_operation(request, retry=retry, timeout=timeout, metadata=metadata) diff --git a/tests/asyncio/operations_v1/test_operations_async_client.py b/tests/asyncio/operations_v1/test_operations_async_client.py index 0f9363ff..830cd465 100644 --- a/tests/asyncio/operations_v1/test_operations_async_client.py +++ b/tests/asyncio/operations_v1/test_operations_async_client.py @@ -36,9 +36,10 @@ async def test_get_operation(): operations_pb2.Operation(name="meep")) client = operations_v1.OperationsAsyncClient(mocked_channel) - response = await client.get_operation("name") + response = await client.get_operation("name", metadata=[("x-goog-request-params", "foo")]) assert method.call_count == 1 assert tuple(method.call_args_list[0])[0][0].name == "name" + assert ("x-goog-request-params", "foo") in tuple(method.call_args_list[0])[1]["metadata"] assert response == fake_call.response @@ -53,7 +54,7 @@ async def test_list_operations(): mocked_channel, method, fake_call = _mock_grpc_objects(list_response) client = operations_v1.OperationsAsyncClient(mocked_channel) - pager = await client.list_operations("name", "filter") + pager = await client.list_operations("name", "filter", metadata=[("x-goog-request-params", "foo")]) assert isinstance(pager, page_iterator_async.AsyncIterator) responses = [] @@ -63,6 +64,7 @@ async def test_list_operations(): assert responses == operations assert method.call_count == 1 + assert ("x-goog-request-params", "foo") in tuple(method.call_args_list[0])[1]["metadata"] request = tuple(method.call_args_list[0])[0][0] assert isinstance(request, operations_pb2.ListOperationsRequest) assert request.name == "name" @@ -75,10 +77,11 @@ async def test_delete_operation(): empty_pb2.Empty()) client = operations_v1.OperationsAsyncClient(mocked_channel) - await client.delete_operation("name") + await client.delete_operation("name", metadata=[("x-goog-request-params", "foo")]) assert method.call_count == 1 assert tuple(method.call_args_list[0])[0][0].name == "name" + assert ("x-goog-request-params", "foo") in tuple(method.call_args_list[0])[1]["metadata"] @pytest.mark.asyncio @@ -87,7 +90,8 @@ async def test_cancel_operation(): empty_pb2.Empty()) client = operations_v1.OperationsAsyncClient(mocked_channel) - await client.cancel_operation("name") + await client.cancel_operation("name", metadata=[("x-goog-request-params", "foo")]) assert method.call_count == 1 assert tuple(method.call_args_list[0])[0][0].name == "name" + assert ("x-goog-request-params", "foo") in tuple(method.call_args_list[0])[1]["metadata"] diff --git a/tests/asyncio/test_operation_async.py b/tests/asyncio/test_operation_async.py index 419749f3..e35d1396 100644 --- a/tests/asyncio/test_operation_async.py +++ b/tests/asyncio/test_operation_async.py @@ -177,12 +177,15 @@ def test_from_gapic(): operations_client, struct_pb2.Struct, metadata_type=struct_pb2.Struct, + grpc_metadata=[('x-goog-request-params', 'foo')] ) assert future._result_type == struct_pb2.Struct assert future._metadata_type == struct_pb2.Struct assert future.operation.name == TEST_OPERATION_NAME assert future.done + assert future._refresh.keywords["metadata"] == [('x-goog-request-params', 'foo')] + assert future._cancel.keywords["metadata"] == [('x-goog-request-params', 'foo')] def test_deserialize(): diff --git a/tests/unit/operations_v1/test_operations_client.py b/tests/unit/operations_v1/test_operations_client.py index cc574612..bd7f3736 100644 --- a/tests/unit/operations_v1/test_operations_client.py +++ b/tests/unit/operations_v1/test_operations_client.py @@ -24,8 +24,9 @@ def test_get_operation(): client = operations_v1.OperationsClient(channel) channel.GetOperation.response = operations_pb2.Operation(name="meep") - response = client.get_operation("name") + response = client.get_operation("name", metadata=[("x-goog-request-params", "foo")]) + assert ("x-goog-request-params", "foo") in channel.GetOperation.calls[0].metadata assert len(channel.GetOperation.requests) == 1 assert channel.GetOperation.requests[0].name == "name" assert response == channel.GetOperation.response @@ -41,11 +42,12 @@ def test_list_operations(): list_response = operations_pb2.ListOperationsResponse(operations=operations) channel.ListOperations.response = list_response - response = client.list_operations("name", "filter") + response = client.list_operations("name", "filter", metadata=[("x-goog-request-params", "foo")]) assert isinstance(response, page_iterator.Iterator) assert list(response) == operations + assert ("x-goog-request-params", "foo") in channel.ListOperations.calls[0].metadata assert len(channel.ListOperations.requests) == 1 request = channel.ListOperations.requests[0] assert isinstance(request, operations_pb2.ListOperationsRequest) @@ -58,8 +60,9 @@ def test_delete_operation(): client = operations_v1.OperationsClient(channel) channel.DeleteOperation.response = empty_pb2.Empty() - client.delete_operation("name") + client.delete_operation("name", metadata=[("x-goog-request-params", "foo")]) + assert ("x-goog-request-params", "foo") in channel.DeleteOperation.calls[0].metadata assert len(channel.DeleteOperation.requests) == 1 assert channel.DeleteOperation.requests[0].name == "name" @@ -69,7 +72,8 @@ def test_cancel_operation(): client = operations_v1.OperationsClient(channel) channel.CancelOperation.response = empty_pb2.Empty() - client.cancel_operation("name") + client.cancel_operation("name", metadata=[("x-goog-request-params", "foo")]) + assert ("x-goog-request-params", "foo") in channel.CancelOperation.calls[0].metadata assert len(channel.CancelOperation.requests) == 1 assert channel.CancelOperation.requests[0].name == "name" diff --git a/tests/unit/test_operation.py b/tests/unit/test_operation.py index 2229c2d4..ae9bafea 100644 --- a/tests/unit/test_operation.py +++ b/tests/unit/test_operation.py @@ -279,12 +279,15 @@ def test_from_grpc(): operations_stub, struct_pb2.Struct, metadata_type=struct_pb2.Struct, + grpc_metadata=[('x-goog-request-params', 'foo')] ) assert future._result_type == struct_pb2.Struct assert future._metadata_type == struct_pb2.Struct assert future.operation.name == TEST_OPERATION_NAME assert future.done + assert future._refresh.keywords["metadata"] == [('x-goog-request-params', 'foo')] + assert future._cancel.keywords["metadata"] == [('x-goog-request-params', 'foo')] def test_from_gapic(): @@ -298,12 +301,15 @@ def test_from_gapic(): operations_client, struct_pb2.Struct, metadata_type=struct_pb2.Struct, + grpc_metadata=[('x-goog-request-params', 'foo')] ) assert future._result_type == struct_pb2.Struct assert future._metadata_type == struct_pb2.Struct assert future.operation.name == TEST_OPERATION_NAME assert future.done + assert future._refresh.keywords["metadata"] == [('x-goog-request-params', 'foo')] + assert future._cancel.keywords["metadata"] == [('x-goog-request-params', 'foo')] def test_deserialize():