diff --git a/google/cloud/tpu_v1/services/tpu/async_client.py b/google/cloud/tpu_v1/services/tpu/async_client.py index cd928ad..3d2c92a 100644 --- a/google/cloud/tpu_v1/services/tpu/async_client.py +++ b/google/cloud/tpu_v1/services/tpu/async_client.py @@ -952,6 +952,12 @@ async def get_accelerator_type( # Done; return the response. return response + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc, tb): + await self.transport.close() + try: DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( diff --git a/google/cloud/tpu_v1/services/tpu/client.py b/google/cloud/tpu_v1/services/tpu/client.py index 38f4715..b1c3fec 100644 --- a/google/cloud/tpu_v1/services/tpu/client.py +++ b/google/cloud/tpu_v1/services/tpu/client.py @@ -383,10 +383,7 @@ def __init__( client_cert_source_for_mtls=client_cert_source_func, quota_project_id=client_options.quota_project_id, client_info=client_info, - always_use_jwt_access=( - Transport == type(self).get_transport_class("grpc") - or Transport == type(self).get_transport_class("grpc_asyncio") - ), + always_use_jwt_access=True, ) def list_nodes( @@ -1184,6 +1181,19 @@ def get_accelerator_type( # Done; return the response. return response + def __enter__(self): + return self + + def __exit__(self, type, value, traceback): + """Releases underlying transport's resources. + + .. warning:: + ONLY use as a context manager if the transport is NOT shared + with other clients! Exiting the with block will CLOSE the transport + and may cause errors in other clients! + """ + self.transport.close() + try: DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo( diff --git a/google/cloud/tpu_v1/services/tpu/transports/base.py b/google/cloud/tpu_v1/services/tpu/transports/base.py index 2514ebe..512db45 100644 --- a/google/cloud/tpu_v1/services/tpu/transports/base.py +++ b/google/cloud/tpu_v1/services/tpu/transports/base.py @@ -197,6 +197,15 @@ def _prep_wrapped_messages(self, client_info): ), } + def close(self): + """Closes resources associated with the transport. + + .. warning:: + Only call this method if the transport is NOT shared + with other clients - this may cause errors in other clients! + """ + raise NotImplementedError() + @property def operations_client(self) -> operations_v1.OperationsClient: """Return the client designed to process long-running operations.""" diff --git a/google/cloud/tpu_v1/services/tpu/transports/grpc.py b/google/cloud/tpu_v1/services/tpu/transports/grpc.py index 6566f06..7aecd4d 100644 --- a/google/cloud/tpu_v1/services/tpu/transports/grpc.py +++ b/google/cloud/tpu_v1/services/tpu/transports/grpc.py @@ -532,5 +532,8 @@ def get_accelerator_type( ) return self._stubs["get_accelerator_type"] + def close(self): + self.grpc_channel.close() + __all__ = ("TpuGrpcTransport",) diff --git a/google/cloud/tpu_v1/services/tpu/transports/grpc_asyncio.py b/google/cloud/tpu_v1/services/tpu/transports/grpc_asyncio.py index 31edb8d..383c8ff 100644 --- a/google/cloud/tpu_v1/services/tpu/transports/grpc_asyncio.py +++ b/google/cloud/tpu_v1/services/tpu/transports/grpc_asyncio.py @@ -544,5 +544,8 @@ def get_accelerator_type( ) return self._stubs["get_accelerator_type"] + def close(self): + return self.grpc_channel.close() + __all__ = ("TpuGrpcAsyncIOTransport",) diff --git a/google/cloud/tpu_v1/types/cloud_tpu.py b/google/cloud/tpu_v1/types/cloud_tpu.py index 376d7f4..3bc1502 100644 --- a/google/cloud/tpu_v1/types/cloud_tpu.py +++ b/google/cloud/tpu_v1/types/cloud_tpu.py @@ -48,6 +48,7 @@ class SchedulingConfig(proto.Message): r"""Sets the scheduling options for this node. + Attributes: preemptible (bool): Defines whether the node is preemptible. @@ -62,6 +63,7 @@ class SchedulingConfig(proto.Message): class NetworkEndpoint(proto.Message): r"""A network endpoint over which a TPU worker can be reached. + Attributes: ip_address (str): The IP address of this network endpoint. @@ -75,6 +77,7 @@ class NetworkEndpoint(proto.Message): class Node(proto.Message): r"""A TPU instance. + Attributes: name (str): Output only. Immutable. The name of the TPU @@ -225,6 +228,7 @@ class ApiVersion(proto.Enum): class ListNodesRequest(proto.Message): r"""Request for [ListNodes][google.cloud.tpu.v1.Tpu.ListNodes]. + Attributes: parent (str): Required. The parent resource name. @@ -242,6 +246,7 @@ class ListNodesRequest(proto.Message): class ListNodesResponse(proto.Message): r"""Response for [ListNodes][google.cloud.tpu.v1.Tpu.ListNodes]. + Attributes: nodes (Sequence[google.cloud.tpu_v1.types.Node]): The listed nodes. @@ -262,6 +267,7 @@ def raw_page(self): class GetNodeRequest(proto.Message): r"""Request for [GetNode][google.cloud.tpu.v1.Tpu.GetNode]. + Attributes: name (str): Required. The resource name. @@ -272,6 +278,7 @@ class GetNodeRequest(proto.Message): class CreateNodeRequest(proto.Message): r"""Request for [CreateNode][google.cloud.tpu.v1.Tpu.CreateNode]. + Attributes: parent (str): Required. The parent resource name. @@ -288,6 +295,7 @@ class CreateNodeRequest(proto.Message): class DeleteNodeRequest(proto.Message): r"""Request for [DeleteNode][google.cloud.tpu.v1.Tpu.DeleteNode]. + Attributes: name (str): Required. The resource name. @@ -298,6 +306,7 @@ class DeleteNodeRequest(proto.Message): class ReimageNodeRequest(proto.Message): r"""Request for [ReimageNode][google.cloud.tpu.v1.Tpu.ReimageNode]. + Attributes: name (str): The resource name. @@ -311,6 +320,7 @@ class ReimageNodeRequest(proto.Message): class StopNodeRequest(proto.Message): r"""Request for [StopNode][google.cloud.tpu.v1.Tpu.StopNode]. + Attributes: name (str): The resource name. @@ -321,6 +331,7 @@ class StopNodeRequest(proto.Message): class StartNodeRequest(proto.Message): r"""Request for [StartNode][google.cloud.tpu.v1.Tpu.StartNode]. + Attributes: name (str): The resource name. @@ -331,6 +342,7 @@ class StartNodeRequest(proto.Message): class TensorFlowVersion(proto.Message): r"""A tensorflow version that a Node can be configured with. + Attributes: name (str): The resource name. @@ -405,6 +417,7 @@ def raw_page(self): class AcceleratorType(proto.Message): r"""A accelerator type that a Node can be configured with. + Attributes: name (str): The resource name. @@ -479,6 +492,7 @@ def raw_page(self): class OperationMetadata(proto.Message): r"""Metadata describing an [Operation][google.longrunning.Operation] + Attributes: create_time (google.protobuf.timestamp_pb2.Timestamp): The time the operation was created. @@ -510,6 +524,7 @@ class OperationMetadata(proto.Message): class Symptom(proto.Message): r"""A Symptom instance. + Attributes: create_time (google.protobuf.timestamp_pb2.Timestamp): Timestamp when the Symptom is created. diff --git a/tests/unit/gapic/tpu_v1/test_tpu.py b/tests/unit/gapic/tpu_v1/test_tpu.py index fc88067..15e542a 100644 --- a/tests/unit/gapic/tpu_v1/test_tpu.py +++ b/tests/unit/gapic/tpu_v1/test_tpu.py @@ -32,6 +32,7 @@ from google.api_core import grpc_helpers_async from google.api_core import operation_async # type: ignore from google.api_core import operations_v1 +from google.api_core import path_template from google.auth import credentials as ga_credentials from google.auth.exceptions import MutualTLSChannelError from google.cloud.tpu_v1.services.tpu import TpuAsyncClient @@ -3100,6 +3101,9 @@ def test_tpu_base_transport(): with pytest.raises(NotImplementedError): getattr(transport, method)(request=object()) + with pytest.raises(NotImplementedError): + transport.close() + # Additionally, the LRO client (a property) should # also raise NotImplementedError with pytest.raises(NotImplementedError): @@ -3633,3 +3637,49 @@ def test_client_withDEFAULT_CLIENT_INFO(): credentials=ga_credentials.AnonymousCredentials(), client_info=client_info, ) prep.assert_called_once_with(client_info) + + +@pytest.mark.asyncio +async def test_transport_close_async(): + client = TpuAsyncClient( + credentials=ga_credentials.AnonymousCredentials(), transport="grpc_asyncio", + ) + with mock.patch.object( + type(getattr(client.transport, "grpc_channel")), "close" + ) as close: + async with client: + close.assert_not_called() + close.assert_called_once() + + +def test_transport_close(): + transports = { + "grpc": "_grpc_channel", + } + + for transport, close_name in transports.items(): + client = TpuClient( + credentials=ga_credentials.AnonymousCredentials(), transport=transport + ) + with mock.patch.object( + type(getattr(client.transport, close_name)), "close" + ) as close: + with client: + close.assert_not_called() + close.assert_called_once() + + +def test_client_ctx(): + transports = [ + "grpc", + ] + for transport in transports: + client = TpuClient( + credentials=ga_credentials.AnonymousCredentials(), transport=transport + ) + # Test client calls underlying transport. + with mock.patch.object(type(client.transport), "close") as close: + close.assert_not_called() + with client: + pass + close.assert_called()