From 7cf02aee1f36aa3657bab80bdd702dd4dc53a34e Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Thu, 5 Nov 2020 10:15:28 -0500 Subject: [PATCH] fix: Make public API more similar to generated clients (#56) * fix: Make public API more similar to generated clients * fix: Assorted fixes * fix: Export clients in google.cloud.pubsublite.cloudpubsub * fix: Add tests, break out repeated code, and restructure admin client * fix: Import admin types to __init__.py and add overrides on async clients to copy docs * fix: Lint issues * fix: Fix deadlock in streaming pull shutdown * fix: Ensure client awaitables passed to await_unless_failed are awaited --- .coveragerc | 4 + google/cloud/pubsublite/__init__.py | 6 + google/cloud/pubsublite/admin_client.py | 108 +++++++---- .../pubsublite/admin_client_interface.py | 71 ++++++++ .../cloud/pubsublite/cloudpubsub/__init__.py | 13 ++ .../internal/assigning_subscriber.py | 12 +- .../internal/async_publisher_impl.py | 6 +- .../internal/client_multiplexer.py | 112 ++++++++++++ .../{ => internal}/make_publisher.py | 45 +++-- .../{ => internal}/make_subscriber.py | 90 ++-------- .../multiplexed_async_publisher_client.py | 61 +++++++ .../multiplexed_async_subscriber_client.py | 92 ++++++++++ .../internal/multiplexed_publisher_client.py | 63 +++++++ .../internal/multiplexed_subscriber_client.py | 81 +++++++++ .../cloudpubsub/internal/publisher_impl.py | 11 +- .../internal/single_partition_subscriber.py | 6 +- .../single_publisher.py} | 4 +- .../single_subscriber.py} | 15 +- .../cloudpubsub/internal/subscriber_impl.py | 28 +-- .../cloudpubsub/publisher_client.py | 157 ++++++++++++++++ .../cloudpubsub/publisher_client_interface.py | 71 ++++++++ .../cloudpubsub/subscriber_client.py | 169 ++++++++++++++++++ .../subscriber_client_interface.py | 91 ++++++++++ .../constructable_from_service_account.py | 18 ++ .../internal/wire/admin_client_impl.py | 4 +- .../internal/wire/make_publisher.py | 8 +- .../internal/wire/permanent_failable.py | 14 +- .../internal/wire/routing_publisher.py | 7 +- google/cloud/pubsublite/make_admin_client.py | 23 --- setup.py | 1 + .../internal/assigning_subscriber_test.py | 24 +-- .../internal/async_client_multiplexer_test.py | 69 +++++++ .../internal/client_multiplexer_test.py | 67 +++++++ ...ultiplexed_async_subscriber_client_test.py | 76 ++++++++ .../internal/publisher_impl_test.py | 15 +- .../single_partition_subscriber_test.py | 24 ++- .../internal/subscriber_impl_test.py | 8 +- 37 files changed, 1463 insertions(+), 211 deletions(-) create mode 100644 google/cloud/pubsublite/admin_client_interface.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py rename google/cloud/pubsublite/cloudpubsub/{ => internal}/make_publisher.py (74%) rename google/cloud/pubsublite/cloudpubsub/{ => internal}/make_subscriber.py (68%) create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py rename google/cloud/pubsublite/cloudpubsub/{publisher.py => internal/single_publisher.py} (94%) rename google/cloud/pubsublite/cloudpubsub/{subscriber.py => internal/single_subscriber.py} (66%) create mode 100644 google/cloud/pubsublite/cloudpubsub/publisher_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py create mode 100644 google/cloud/pubsublite/cloudpubsub/subscriber_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py create mode 100644 google/cloud/pubsublite/internal/constructable_from_service_account.py delete mode 100644 google/cloud/pubsublite/make_admin_client.py create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/async_client_multiplexer_test.py create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/client_multiplexer_test.py create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py diff --git a/.coveragerc b/.coveragerc index 68a287da..08d01e36 100644 --- a/.coveragerc +++ b/.coveragerc @@ -29,6 +29,10 @@ exclude_lines = # Ignore abstract methods raise NotImplementedError @abstractmethod + # Ignore delegating methods + self._impl. + # Ignore __exit__ "return self" + return self omit = */pubsublite_v1/*.py diff --git a/google/cloud/pubsublite/__init__.py b/google/cloud/pubsublite/__init__.py index 80bc419e..0a2808c0 100644 --- a/google/cloud/pubsublite/__init__.py +++ b/google/cloud/pubsublite/__init__.py @@ -96,8 +96,14 @@ from google.cloud.pubsublite_v1.types.subscriber import SeekResponse from google.cloud.pubsublite_v1.types.subscriber import SubscribeRequest from google.cloud.pubsublite_v1.types.subscriber import SubscribeResponse +from google.cloud.pubsublite.admin_client_interface import AdminClientInterface +from google.cloud.pubsublite.admin_client import AdminClient __all__ = ( + # Manual files + "AdminClient", + "AdminClientInterface", + # Generated files "AdminServiceAsyncClient", "AdminServiceClient", "AttributeValues", diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index d1882ef8..eac81675 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -1,67 +1,109 @@ -from abc import ABC, abstractmethod -from typing import List +from typing import Optional, List +from overrides import overrides +from google.api_core.client_options import ClientOptions +from google.auth.credentials import Credentials +from google.protobuf.field_mask_pb2 import FieldMask + +from google.cloud.pubsublite.admin_client_interface import AdminClientInterface +from google.cloud.pubsublite.internal.constructable_from_service_account import ( + ConstructableFromServiceAccount, +) +from google.cloud.pubsublite.internal.endpoints import regional_endpoint +from google.cloud.pubsublite.internal.wire.admin_client_impl import AdminClientImpl from google.cloud.pubsublite.types import ( CloudRegion, - TopicPath, - LocationPath, SubscriptionPath, + LocationPath, + TopicPath, ) -from google.cloud.pubsublite_v1 import Topic, Subscription -from google.protobuf.field_mask_pb2 import FieldMask +from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic + + +class AdminClient(AdminClientInterface, ConstructableFromServiceAccount): + """ + An admin client for Pub/Sub Lite. Only operates on a single region. + """ + + _impl: AdminClientInterface + + def __init__( + self, + region: CloudRegion, + credentials: Optional[Credentials] = None, + transport: Optional[str] = None, + client_options: Optional[ClientOptions] = None, + ): + """ + Create a new AdminClient. + Args: + region: The cloud region to connect to. + credentials: The credentials to use when connecting. + transport: The transport to use. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ + if client_options is None: + client_options = ClientOptions(api_endpoint=regional_endpoint(region)) + self._impl = AdminClientImpl( + AdminServiceClient( + client_options=client_options, + transport=transport, + credentials=credentials, + ), + region, + ) -class AdminClient(ABC): - @abstractmethod + @overrides def region(self) -> CloudRegion: - """The region this client is for.""" + return self._impl.region() - @abstractmethod + @overrides def create_topic(self, topic: Topic) -> Topic: - """Create a topic, returns the created topic.""" + return self._impl.create_topic(topic) - @abstractmethod + @overrides def get_topic(self, topic_path: TopicPath) -> Topic: - """Get the topic object from the server.""" + return self._impl.get_topic(topic_path) - @abstractmethod + @overrides def get_topic_partition_count(self, topic_path: TopicPath) -> int: - """Get the number of partitions in the provided topic.""" + return self._impl.get_topic_partition_count(topic_path) - @abstractmethod + @overrides def list_topics(self, location_path: LocationPath) -> List[Topic]: - """List the Pub/Sub lite topics that exist for a project in a given location.""" + return self._impl.list_topics(location_path) - @abstractmethod + @overrides def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic: - """Update the masked fields of the provided topic.""" + return self._impl.update_topic(topic, update_mask) - @abstractmethod + @overrides def delete_topic(self, topic_path: TopicPath): - """Delete a topic and all associated messages.""" + return self._impl.delete_topic(topic_path) - @abstractmethod + @overrides def list_topic_subscriptions(self, topic_path: TopicPath): - """List the subscriptions that exist for a given topic.""" + return self._impl.list_topic_subscriptions(topic_path) - @abstractmethod + @overrides def create_subscription(self, subscription: Subscription) -> Subscription: - """Create a subscription, returns the created subscription.""" + return self._impl.create_subscription(subscription) - @abstractmethod + @overrides def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: - """Get the subscription object from the server.""" + return self._impl.get_subscription(subscription_path) - @abstractmethod + @overrides def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]: - """List the Pub/Sub lite subscriptions that exist for a project in a given location.""" + return self._impl.list_subscriptions(location_path) - @abstractmethod + @overrides def update_subscription( self, subscription: Subscription, update_mask: FieldMask ) -> Subscription: - """Update the masked fields of the provided subscription.""" + return self._impl.update_subscription(subscription, update_mask) - @abstractmethod + @overrides def delete_subscription(self, subscription_path: SubscriptionPath): - """Delete a subscription and all associated messages.""" + return self._impl.delete_subscription(subscription_path) diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py new file mode 100644 index 00000000..d8794768 --- /dev/null +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -0,0 +1,71 @@ +from abc import ABC, abstractmethod +from typing import List + +from google.cloud.pubsublite.types import ( + CloudRegion, + TopicPath, + LocationPath, + SubscriptionPath, +) +from google.cloud.pubsublite_v1 import Topic, Subscription +from google.protobuf.field_mask_pb2 import FieldMask + + +class AdminClientInterface(ABC): + """ + An admin client for Pub/Sub Lite. Only operates on a single region. + """ + + @abstractmethod + def region(self) -> CloudRegion: + """The region this client is for.""" + + @abstractmethod + def create_topic(self, topic: Topic) -> Topic: + """Create a topic, returns the created topic.""" + + @abstractmethod + def get_topic(self, topic_path: TopicPath) -> Topic: + """Get the topic object from the server.""" + + @abstractmethod + def get_topic_partition_count(self, topic_path: TopicPath) -> int: + """Get the number of partitions in the provided topic.""" + + @abstractmethod + def list_topics(self, location_path: LocationPath) -> List[Topic]: + """List the Pub/Sub lite topics that exist for a project in a given location.""" + + @abstractmethod + def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic: + """Update the masked fields of the provided topic.""" + + @abstractmethod + def delete_topic(self, topic_path: TopicPath): + """Delete a topic and all associated messages.""" + + @abstractmethod + def list_topic_subscriptions(self, topic_path: TopicPath): + """List the subscriptions that exist for a given topic.""" + + @abstractmethod + def create_subscription(self, subscription: Subscription) -> Subscription: + """Create a subscription, returns the created subscription.""" + + @abstractmethod + def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: + """Get the subscription object from the server.""" + + @abstractmethod + def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]: + """List the Pub/Sub lite subscriptions that exist for a project in a given location.""" + + @abstractmethod + def update_subscription( + self, subscription: Subscription, update_mask: FieldMask + ) -> Subscription: + """Update the masked fields of the provided subscription.""" + + @abstractmethod + def delete_subscription(self, subscription_path: SubscriptionPath): + """Delete a subscription and all associated messages.""" diff --git a/google/cloud/pubsublite/cloudpubsub/__init__.py b/google/cloud/pubsublite/cloudpubsub/__init__.py index e69de29b..c030009e 100644 --- a/google/cloud/pubsublite/cloudpubsub/__init__.py +++ b/google/cloud/pubsublite/cloudpubsub/__init__.py @@ -0,0 +1,13 @@ +# flake8: noqa +from .message_transformer import MessageTransformer +from .nack_handler import NackHandler +from .publisher_client_interface import ( + PublisherClientInterface, + AsyncPublisherClientInterface, +) +from .publisher_client import PublisherClient, AsyncPublisherClient +from .subscriber_client_interface import ( + SubscriberClientInterface, + AsyncSubscriberClientInterface, +) +from .subscriber_client import SubscriberClient, AsyncSubscriberClient diff --git a/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py index 21a0cacb..4a0c410d 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py @@ -3,21 +3,23 @@ from google.cloud.pubsub_v1.subscriber.message import Message -from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_cancelled from google.cloud.pubsublite.internal.wire.assigner import Assigner from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable from google.cloud.pubsublite.types import Partition -PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber] +PartitionSubscriberFactory = Callable[[Partition], AsyncSingleSubscriber] class _RunningSubscriber(NamedTuple): - subscriber: AsyncSubscriber + subscriber: AsyncSingleSubscriber poller: Future -class AssigningSubscriber(AsyncSubscriber, PermanentFailable): +class AssigningSingleSubscriber(AsyncSingleSubscriber, PermanentFailable): _assigner_factory: Callable[[], Assigner] _subscriber_factory: PartitionSubscriberFactory @@ -47,7 +49,7 @@ def __init__( async def read(self) -> Message: return await self.await_unless_failed(self._messages.get()) - async def _subscribe_action(self, subscriber: AsyncSubscriber): + async def _subscribe_action(self, subscriber: AsyncSingleSubscriber): message = await subscriber.read() await self._messages.put(message) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py index 832f244e..45e635dd 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py @@ -5,11 +5,13 @@ from google.cloud.pubsublite.cloudpubsub.message_transforms import ( from_cps_publish_message, ) -from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + AsyncSinglePublisher, +) from google.cloud.pubsublite.internal.wire.publisher import Publisher -class AsyncPublisherImpl(AsyncPublisher): +class AsyncSinglePublisherImpl(AsyncSinglePublisher): _publisher_factory: Callable[[], Publisher] _publisher: Optional[Publisher] diff --git a/google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py b/google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py new file mode 100644 index 00000000..7f097f4c --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py @@ -0,0 +1,112 @@ +import asyncio +import threading +from typing import Generic, TypeVar, Callable, Dict, Awaitable + +from google.api_core.exceptions import FailedPrecondition + +_Key = TypeVar("_Key") +_Client = TypeVar("_Client") + + +class ClientMultiplexer(Generic[_Key, _Client]): + _OpenedClientFactory = Callable[[], _Client] + _ClientCloser = Callable[[_Client], None] + + _closer: _ClientCloser + _lock: threading.Lock + _live_clients: Dict[_Key, _Client] + + def __init__( + self, closer: _ClientCloser = lambda client: client.__exit__(None, None, None) + ): + self._closer = closer + self._lock = threading.Lock() + self._live_clients = {} + + def get_or_create(self, key: _Key, factory: _OpenedClientFactory) -> _Client: + with self._lock: + if key not in self._live_clients: + self._live_clients[key] = factory() + return self._live_clients[key] + + def create_or_fail(self, key: _Key, factory: _OpenedClientFactory) -> _Client: + with self._lock: + if key in self._live_clients: + raise FailedPrecondition( + f"Cannot create two clients with the same key. {_Key}" + ) + self._live_clients[key] = factory() + return self._live_clients[key] + + def try_erase(self, key: _Key, client: _Client): + with self._lock: + if key not in self._live_clients: + return + current_client = self._live_clients[key] + if current_client is not client: + return + del self._live_clients[key] + self._closer(client) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + live_clients: Dict[_Key, _Client] + with self._lock: + live_clients = self._live_clients + self._live_clients = {} + for topic, client in live_clients.items(): + self._closer(client) + + +class AsyncClientMultiplexer(Generic[_Key, _Client]): + _OpenedClientFactory = Callable[[], Awaitable[_Client]] + _ClientCloser = Callable[[_Client], Awaitable[None]] + + _closer: _ClientCloser + _lock: asyncio.Lock + _live_clients: Dict[_Key, _Client] + + def __init__( + self, closer: _ClientCloser = lambda client: client.__aexit__(None, None, None) + ): + self._closer = closer + self._live_clients = {} + + async def get_or_create(self, key: _Key, factory: _OpenedClientFactory) -> _Client: + async with self._lock: + if key not in self._live_clients: + self._live_clients[key] = await factory() + return self._live_clients[key] + + async def create_or_fail(self, key: _Key, factory: _OpenedClientFactory) -> _Client: + async with self._lock: + if key in self._live_clients: + raise FailedPrecondition( + f"Cannot create two clients with the same key. {_Key}" + ) + self._live_clients[key] = await factory() + return self._live_clients[key] + + async def try_erase(self, key: _Key, client: _Client): + async with self._lock: + if key not in self._live_clients: + return + current_client = self._live_clients[key] + if current_client is not client: + return + del self._live_clients[key] + await self._closer(client) + + async def __aenter__(self): + self._lock = asyncio.Lock() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + live_clients: Dict[_Key, _Client] + async with self._lock: + live_clients = self._live_clients + self._live_clients = {} + for topic, client in live_clients.items(): + await self._closer(client) diff --git a/google/cloud/pubsublite/cloudpubsub/make_publisher.py b/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py similarity index 74% rename from google/cloud/pubsublite/cloudpubsub/make_publisher.py rename to google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py index 390afbdf..c3b1ac4d 100644 --- a/google/cloud/pubsublite/cloudpubsub/make_publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py @@ -5,10 +5,15 @@ from google.cloud.pubsub_v1.types import BatchSettings from google.cloud.pubsublite.cloudpubsub.internal.async_publisher_impl import ( - AsyncPublisherImpl, + AsyncSinglePublisherImpl, +) +from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import ( + SinglePublisherImpl, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + AsyncSinglePublisher, + SinglePublisher, ) -from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl -from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher from google.cloud.pubsublite.internal.wire.make_publisher import ( make_publisher as make_wire_publisher, DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING, @@ -23,16 +28,18 @@ def make_async_publisher( topic: TopicPath, + transport: str, per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, metadata: Optional[Mapping[str, str]] = None, -) -> AsyncPublisher: +) -> AsyncSinglePublisher: """ Make a new publisher for the given topic. Args: topic: The topic to publish to. + transport: The transport type to use. per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. @@ -48,28 +55,31 @@ def make_async_publisher( def underlying_factory(): return make_wire_publisher( - topic, - per_partition_batching_settings, - credentials, - client_options, - metadata, + topic=topic, + transport=transport, + per_partition_batching_settings=per_partition_batching_settings, + credentials=credentials, + client_options=client_options, + metadata=metadata, ) - return AsyncPublisherImpl(underlying_factory) + return AsyncSinglePublisherImpl(underlying_factory) def make_publisher( topic: TopicPath, + transport: str, per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, metadata: Optional[Mapping[str, str]] = None, -) -> Publisher: +) -> SinglePublisher: """ Make a new publisher for the given topic. Args: topic: The topic to publish to. + transport: The transport type to use. per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. @@ -81,12 +91,13 @@ def make_publisher( Throws: GoogleApiCallException on any error determining topic structure. """ - return PublisherImpl( + return SinglePublisherImpl( make_async_publisher( - topic, - per_partition_batching_settings, - credentials, - client_options, - metadata, + topic=topic, + transport=transport, + per_partition_batching_settings=per_partition_batching_settings, + credentials=credentials, + client_options=client_options, + metadata=metadata, ) ) diff --git a/google/cloud/pubsublite/cloudpubsub/make_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py similarity index 68% rename from google/cloud/pubsublite/cloudpubsub/make_subscriber.py rename to google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py index 8d729fea..4c520019 100644 --- a/google/cloud/pubsublite/cloudpubsub/make_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py @@ -1,22 +1,19 @@ -from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional, Mapping, Set, AsyncIterator, Callable from uuid import uuid4 from google.api_core.client_options import ClientOptions from google.auth.credentials import Credentials -from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture from google.cloud.pubsublite.types import FlowControlSettings from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import ( AckSetTrackerImpl, ) from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import ( PartitionSubscriberFactory, - AssigningSubscriber, + AssigningSingleSubscriber, ) from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import ( - SinglePartitionSubscriber, + SinglePartitionSingleSubscriber, ) -import google.cloud.pubsublite.cloudpubsub.internal.subscriber_impl as cps_subscriber from google.cloud.pubsublite.cloudpubsub.message_transformer import ( MessageTransformer, DefaultMessageTransformer, @@ -25,9 +22,8 @@ NackHandler, DefaultNackHandler, ) -from google.cloud.pubsublite.cloudpubsub.subscriber import ( - AsyncSubscriber, - MessageCallback, +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, ) from google.cloud.pubsublite.internal.endpoints import regional_endpoint from google.cloud.pubsublite.internal.wire.assigner import Assigner @@ -67,12 +63,15 @@ def _make_dynamic_assigner( subscription: SubscriptionPath, - assignment_client: PartitionAssignmentServiceAsyncClient, + transport: str, + client_options: ClientOptions, + credentials: Optional[Credentials], base_metadata: Optional[Mapping[str, str]], ) -> Assigner: def assignment_connection_factory( requests: AsyncIterator[PartitionAssignmentRequest], ): + assignment_client = PartitionAssignmentServiceAsyncClient(credentials=credentials, transport=transport, client_options=client_options) # type: ignore return assignment_client.assign_partitions( requests, metadata=list(base_metadata.items()) ) @@ -87,6 +86,7 @@ def assignment_connection_factory( def _make_partition_subscriber_factory( subscription: SubscriptionPath, + transport: str, client_options: ClientOptions, credentials: Optional[Credentials], base_metadata: Optional[Mapping[str, str]], @@ -94,11 +94,11 @@ def _make_partition_subscriber_factory( nack_handler: NackHandler, message_transformer: MessageTransformer, ) -> PartitionSubscriberFactory: - def factory(partition: Partition) -> AsyncSubscriber: + def factory(partition: Partition) -> AsyncSingleSubscriber: subscribe_client = SubscriberServiceAsyncClient( - credentials=credentials, client_options=client_options + credentials=credentials, client_options=client_options, transport=transport ) # type: ignore - cursor_client = CursorServiceAsyncClient(credentials=credentials, client_options=client_options) # type: ignore + cursor_client = CursorServiceAsyncClient(credentials=credentials, client_options=client_options, transport=transport) # type: ignore final_metadata = merge_metadata( base_metadata, subscription_routing_metadata(subscription, partition) ) @@ -130,7 +130,7 @@ def cursor_connection_factory( GapicConnectionFactory(cursor_connection_factory), ) ack_set_tracker = AckSetTrackerImpl(committer) - return SinglePartitionSubscriber( + return SinglePartitionSingleSubscriber( subscriber, flow_control_settings, ack_set_tracker, @@ -143,6 +143,7 @@ def cursor_connection_factory( def make_async_subscriber( subscription: SubscriptionPath, + transport: str, per_partition_flow_control_settings: FlowControlSettings, nack_handler: Optional[NackHandler] = None, message_transformer: Optional[MessageTransformer] = None, @@ -150,12 +151,13 @@ def make_async_subscriber( credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, metadata: Optional[Mapping[str, str]] = None, -) -> AsyncSubscriber: +) -> AsyncSingleSubscriber: """ Make a Pub/Sub Lite AsyncSubscriber. Args: subscription: The subscription to subscribe to. + transport: The transport type to use. per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these settings apply to each partition individually, not in aggregate. nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client. @@ -178,11 +180,7 @@ def make_async_subscriber( assigner_factory = lambda: FixedSetAssigner(fixed_partitions) # noqa: E731 else: assigner_factory = lambda: _make_dynamic_assigner( # noqa: E731 - subscription, - PartitionAssignmentServiceAsyncClient( - credentials=credentials, client_options=client_options - ), - metadata, + subscription, transport, client_options, credentials, metadata, ) if nack_handler is None: @@ -191,6 +189,7 @@ def make_async_subscriber( message_transformer = DefaultMessageTransformer() partition_subscriber_factory = _make_partition_subscriber_factory( subscription, + transport, client_options, credentials, metadata, @@ -198,55 +197,4 @@ def make_async_subscriber( nack_handler, message_transformer, ) - return AssigningSubscriber(assigner_factory, partition_subscriber_factory) - - -def make_subscriber( - subscription: SubscriptionPath, - per_partition_flow_control_settings: FlowControlSettings, - callback: MessageCallback, - nack_handler: Optional[NackHandler] = None, - message_transformer: Optional[MessageTransformer] = None, - fixed_partitions: Optional[Set[Partition]] = None, - executor: Optional[ThreadPoolExecutor] = None, - credentials: Optional[Credentials] = None, - client_options: Optional[ClientOptions] = None, - metadata: Optional[Mapping[str, str]] = None, -) -> StreamingPullFuture: - """ - Make a Pub/Sub Lite Subscriber. - - Args: - subscription: The subscription to subscribe to. - per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these - settings apply to each partition individually, not in aggregate. - callback: The callback to call with each message. - nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client. - message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages. - fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. - executor: The executor to use for user callbacks. If not provided, will use the default constructed - ThreadPoolExecutor. If provided a single threaded executor, messages will be ordered per-partition, but take care - that the callback does not block for too long as it will impede forward progress on all partitions. - credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. - client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. - metadata: Additional metadata to send with the RPC. - - Returns: - A StreamingPullFuture, managing the subscriber's lifetime. - """ - underlying = make_async_subscriber( - subscription, - per_partition_flow_control_settings, - nack_handler, - message_transformer, - fixed_partitions, - credentials, - client_options, - metadata, - ) - if executor is None: - executor = ThreadPoolExecutor() - subscriber = cps_subscriber.SubscriberImpl(underlying, callback, executor) - future = StreamingPullFuture(subscriber) - subscriber.__enter__() - return future + return AssigningSingleSubscriber(assigner_factory, partition_subscriber_factory) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py new file mode 100644 index 00000000..2755d6d9 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py @@ -0,0 +1,61 @@ +from typing import Callable, Union, Mapping + +from google.api_core.exceptions import GoogleAPICallError + +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + AsyncClientMultiplexer, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + AsyncSinglePublisher, +) +from google.cloud.pubsublite.cloudpubsub.publisher_client_interface import ( + AsyncPublisherClientInterface, +) +from google.cloud.pubsublite.types import TopicPath +from overrides import overrides + + +AsyncPublisherFactory = Callable[[TopicPath], AsyncSinglePublisher] + + +class MultiplexedAsyncPublisherClient(AsyncPublisherClientInterface): + _publisher_factory: AsyncPublisherFactory + _multiplexer: AsyncClientMultiplexer[TopicPath, AsyncSinglePublisher] + + def __init__(self, publisher_factory: AsyncPublisherFactory): + self._publisher_factory = publisher_factory + self._multiplexer = AsyncClientMultiplexer() + + @overrides + async def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> str: + if isinstance(topic, str): + topic = TopicPath.parse(topic) + + async def create_and_open(): + client = self._publisher_factory(topic) + await client.__aenter__() + return client + + publisher = await self._multiplexer.get_or_create(topic, create_and_open) + try: + return await publisher.publish( + data=data, ordering_key=ordering_key, **attrs + ) + except GoogleAPICallError as e: + await self._multiplexer.try_erase(topic, publisher) + raise e + + @overrides + async def __aenter__(self): + await self._multiplexer.__aenter__() + return self + + @overrides + async def __aexit__(self, exc_type, exc_value, traceback): + await self._multiplexer.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py new file mode 100644 index 00000000..ce00b8d0 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py @@ -0,0 +1,92 @@ +from typing import ( + Union, + AsyncIterator, + Awaitable, + Callable, + Optional, + Set, +) + +from google.cloud.pubsub_v1.subscriber.message import Message + +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + AsyncClientMultiplexer, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSubscriberFactory, + AsyncSingleSubscriber, +) +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( + AsyncSubscriberClientInterface, +) +from google.cloud.pubsublite.types import ( + SubscriptionPath, + FlowControlSettings, + Partition, +) +from overrides import overrides + + +class _SubscriberAsyncIterator(AsyncIterator): + _subscriber: AsyncSingleSubscriber + _on_failure: Callable[[], Awaitable[None]] + + def __init__( + self, + subscriber: AsyncSingleSubscriber, + on_failure: Callable[[], Awaitable[None]], + ): + self._subscriber = subscriber + self._on_failure = on_failure + + async def __anext__(self) -> Message: + try: + return await self._subscriber.read() + except: # noqa: E722 + await self._on_failure() + raise + + def __aiter__(self): + return self + + +class MultiplexedAsyncSubscriberClient(AsyncSubscriberClientInterface): + _underlying_factory: AsyncSubscriberFactory + _multiplexer: AsyncClientMultiplexer[SubscriptionPath, AsyncSingleSubscriber] + + def __init__(self, underlying_factory: AsyncSubscriberFactory): + self._underlying_factory = underlying_factory + self._multiplexer = AsyncClientMultiplexer() + + @overrides + async def subscribe( + self, + subscription: Union[SubscriptionPath, str], + per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, + ) -> AsyncIterator[Message]: + if isinstance(subscription, str): + subscription = SubscriptionPath.parse(subscription) + + async def create_and_open(): + client = self._underlying_factory( + subscription, fixed_partitions, per_partition_flow_control_settings + ) + await client.__aenter__() + return client + + subscriber = await self._multiplexer.get_or_create( + subscription, create_and_open + ) + return _SubscriberAsyncIterator( + subscriber, lambda: self._multiplexer.try_erase(subscription, subscriber) + ) + + @overrides + async def __aenter__(self): + await self._multiplexer.__aenter__() + return self + + @overrides + async def __aexit__(self, exc_type, exc_value, traceback): + await self._multiplexer.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py new file mode 100644 index 00000000..d94650f1 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -0,0 +1,63 @@ +from concurrent.futures import Future +from typing import Callable, Union, Mapping + +from google.api_core.exceptions import GoogleAPICallError + +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + ClientMultiplexer, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + SinglePublisher, +) +from google.cloud.pubsublite.cloudpubsub.publisher_client_interface import ( + PublisherClientInterface, +) +from google.cloud.pubsublite.types import TopicPath +from overrides import overrides + +PublisherFactory = Callable[[TopicPath], SinglePublisher] + + +class MultiplexedPublisherClient(PublisherClientInterface): + _publisher_factory: PublisherFactory + _multiplexer: ClientMultiplexer[TopicPath, SinglePublisher] + + def __init__(self, publisher_factory: PublisherFactory): + self._publisher_factory = publisher_factory + self._multiplexer = ClientMultiplexer() + + @overrides + def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> "Future[str]": + if isinstance(topic, str): + topic = TopicPath.parse(topic) + publisher = self._multiplexer.get_or_create( + topic, lambda: self._publisher_factory(topic).__enter__() + ) + future = publisher.publish(data=data, ordering_key=ordering_key, **attrs) + future.add_done_callback( + lambda fut: self._on_future_completion(topic, publisher, fut) + ) + return future + + def _on_future_completion( + self, topic: TopicPath, publisher: SinglePublisher, future: "Future[str]" + ): + try: + future.result() + except GoogleAPICallError: + self._multiplexer.try_erase(topic, publisher) + + @overrides + def __enter__(self): + self._multiplexer.__enter__() + return self + + @overrides + def __exit__(self, exc_type, exc_value, traceback): + self._multiplexer.__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py new file mode 100644 index 00000000..2965ecbd --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py @@ -0,0 +1,81 @@ +from concurrent.futures.thread import ThreadPoolExecutor +from typing import Union, Optional, Set + +from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture + +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + ClientMultiplexer, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSubscriberFactory, +) +from google.cloud.pubsublite.cloudpubsub.internal.subscriber_impl import SubscriberImpl +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( + SubscriberClientInterface, + MessageCallback, +) +from google.cloud.pubsublite.types import ( + SubscriptionPath, + FlowControlSettings, + Partition, +) +from overrides import overrides + + +class MultiplexedSubscriberClient(SubscriberClientInterface): + _executor: ThreadPoolExecutor + _underlying_factory: AsyncSubscriberFactory + + _multiplexer: ClientMultiplexer[SubscriptionPath, StreamingPullFuture] + + def __init__( + self, executor: ThreadPoolExecutor, underlying_factory: AsyncSubscriberFactory + ): + self._executor = executor + self._underlying_factory = underlying_factory + + def cancel_streaming_pull_future(fut: StreamingPullFuture): + try: + fut.cancel() + fut.result() + except: # noqa: E722 + pass + + self._multiplexer = ClientMultiplexer(cancel_streaming_pull_future) + + @overrides + def subscribe( + self, + subscription: Union[SubscriptionPath, str], + callback: MessageCallback, + per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, + ) -> StreamingPullFuture: + if isinstance(subscription, str): + subscription = SubscriptionPath.parse(subscription) + + def create_and_open(): + underlying = self._underlying_factory( + subscription, fixed_partitions, per_partition_flow_control_settings + ) + subscriber = SubscriberImpl(underlying, callback, self._executor) + future = StreamingPullFuture(subscriber) + subscriber.__enter__() + return future + + future = self._multiplexer.create_or_fail(subscription, create_and_open) + future.add_done_callback( + lambda fut: self._multiplexer.try_erase(subscription, future) + ) + return future + + @overrides + def __enter__(self): + self._executor.__enter__() + self._multiplexer.__enter__() + return self + + @overrides + def __exit__(self, exc_type, exc_value, traceback): + self._multiplexer.__exit__(exc_type, exc_value, traceback) + self._executor.__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py index 7fb3b68c..428141b9 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py @@ -4,14 +4,17 @@ from google.cloud.pubsublite.cloudpubsub.internal.managed_event_loop import ( ManagedEventLoop, ) -from google.cloud.pubsublite.cloudpubsub.publisher import Publisher, AsyncPublisher +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + SinglePublisher, + AsyncSinglePublisher, +) -class PublisherImpl(Publisher): +class SinglePublisherImpl(SinglePublisher): _managed_loop: ManagedEventLoop - _underlying: AsyncPublisher + _underlying: AsyncSinglePublisher - def __init__(self, underlying: AsyncPublisher): + def __init__(self, underlying: AsyncSinglePublisher): super().__init__() self._managed_loop = ManagedEventLoop() self._underlying = underlying diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py index 94561584..8b441507 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py @@ -10,7 +10,9 @@ from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler -from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable from google.cloud.pubsublite.internal.wire.subscriber import Subscriber from google.cloud.pubsublite_v1 import FlowControlRequest, SequencedMessage @@ -22,7 +24,7 @@ class _SizedMessage(NamedTuple): size_bytes: int -class SinglePartitionSubscriber(PermanentFailable, AsyncSubscriber): +class SinglePartitionSingleSubscriber(PermanentFailable, AsyncSingleSubscriber): _underlying: Subscriber _flow_control_settings: FlowControlSettings _ack_set_tracker: AckSetTracker diff --git a/google/cloud/pubsublite/cloudpubsub/publisher.py b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py similarity index 94% rename from google/cloud/pubsublite/cloudpubsub/publisher.py rename to google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py index 7824de51..d4568cf1 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py @@ -3,7 +3,7 @@ from concurrent import futures -class AsyncPublisher(AsyncContextManager): +class AsyncSinglePublisher(AsyncContextManager): """ An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an async context. Any publish failures are permanent. @@ -31,7 +31,7 @@ async def publish( """ -class Publisher(ContextManager): +class SinglePublisher(ContextManager): """ A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent. diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py similarity index 66% rename from google/cloud/pubsublite/cloudpubsub/subscriber.py rename to google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py index 1015f70b..4ec31b2a 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py @@ -1,10 +1,16 @@ from abc import abstractmethod -from typing import AsyncContextManager, Callable +from typing import AsyncContextManager, Callable, Set, Optional from google.cloud.pubsub_v1.subscriber.message import Message +from google.cloud.pubsublite.types import ( + SubscriptionPath, + FlowControlSettings, + Partition, +) -class AsyncSubscriber(AsyncContextManager): + +class AsyncSingleSubscriber(AsyncContextManager): """ A Cloud Pub/Sub asynchronous subscriber. @@ -28,4 +34,7 @@ async def read(self) -> Message: raise NotImplementedError() -MessageCallback = Callable[[Message], None] +AsyncSubscriberFactory = Callable[ + [SubscriptionPath, Optional[Set[Partition]], FlowControlSettings], + AsyncSingleSubscriber, +] diff --git a/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py index 70b658d1..891797df 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py @@ -10,16 +10,18 @@ StreamingPullManager, CloseCallback, ) -from google.cloud.pubsublite.cloudpubsub.subscriber import ( - AsyncSubscriber, +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( MessageCallback, ) class SubscriberImpl(ContextManager, StreamingPullManager): - _underlying: AsyncSubscriber + _underlying: AsyncSingleSubscriber _callback: MessageCallback - _executor: ThreadPoolExecutor + _unowned_executor: ThreadPoolExecutor _event_loop: ManagedEventLoop @@ -31,13 +33,13 @@ class SubscriberImpl(ContextManager, StreamingPullManager): def __init__( self, - underlying: AsyncSubscriber, + underlying: AsyncSingleSubscriber, callback: MessageCallback, - executor: ThreadPoolExecutor, + unowned_executor: ThreadPoolExecutor, ): self._underlying = underlying self._callback = callback - self._executor = executor + self._unowned_executor = unowned_executor self._event_loop = ManagedEventLoop() self._close_lock = threading.Lock() self._failure = None @@ -56,9 +58,10 @@ def add_close_callback(self, close_callback: CloseCallback): def close(self): with self._close_lock: - if not self._closed: - self._closed = True - self.__exit__(None, None, None) + if self._closed: + return + self._closed = True + self.__exit__(None, None, None) def _fail(self, error: GoogleAPICallError): self._failure = error @@ -68,9 +71,9 @@ async def _poller(self): try: while True: message = await self._underlying.read() - self._executor.submit(self._callback, message) + self._unowned_executor.submit(self._callback, message) except GoogleAPICallError as e: # noqa: F841 Flake8 thinks e is unused - self._executor.submit(lambda: self._fail(e)) # noqa: F821 + self._unowned_executor.submit(lambda: self._fail(e)) # noqa: F821 def __enter__(self): assert self._close_callback is not None @@ -90,5 +93,4 @@ def __exit__(self, exc_type, exc_value, traceback): ).result() self._event_loop.__exit__(exc_type, exc_value, traceback) assert self._close_callback is not None - self._executor.shutdown(wait=False) # __exit__ may be called from the executor. self._close_callback(self, self._failure) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client.py b/google/cloud/pubsublite/cloudpubsub/publisher_client.py new file mode 100644 index 00000000..23c72f32 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client.py @@ -0,0 +1,157 @@ +from concurrent.futures import Future +from typing import Optional, Mapping, Union + +from google.api_core.client_options import ClientOptions +from google.auth.credentials import Credentials +from google.cloud.pubsub_v1.types import BatchSettings + +from google.cloud.pubsublite.cloudpubsub.internal.make_publisher import ( + make_publisher, + make_async_publisher, +) +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_async_publisher_client import ( + MultiplexedAsyncPublisherClient, +) +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_publisher_client import ( + MultiplexedPublisherClient, +) +from google.cloud.pubsublite.cloudpubsub.publisher_client_interface import ( + PublisherClientInterface, + AsyncPublisherClientInterface, +) +from google.cloud.pubsublite.internal.constructable_from_service_account import ( + ConstructableFromServiceAccount, +) +from google.cloud.pubsublite.internal.wire.make_publisher import ( + DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING, +) +from google.cloud.pubsublite.types import TopicPath +from overrides import overrides + + +class PublisherClient(PublisherClientInterface, ConstructableFromServiceAccount): + """ + A PublisherClient publishes messages similar to Google Pub/Sub. + Any publish failures are unlikely to succeed if retried. + + Must be used in a `with` block or have __enter__() called before use. + """ + + _impl: PublisherClientInterface + + DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING + """ + The default batching settings for a publisher client. + """ + + def __init__( + self, + per_partition_batching_settings: Optional[BatchSettings] = None, + credentials: Optional[Credentials] = None, + transport: str = "grpc_asyncio", + client_options: Optional[ClientOptions] = None, + ): + """ + Create a new PublisherClient. + + Args: + per_partition_batching_settings: The settings for publish batching. Apply on a per-partition basis. + credentials: If provided, the credentials to use when connecting. + transport: The transport to use. Must correspond to an asyncio transport. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ + self._impl = MultiplexedPublisherClient( + lambda topic: make_publisher( + topic=topic, + per_partition_batching_settings=per_partition_batching_settings, + credentials=credentials, + client_options=client_options, + transport=transport, + ) + ) + + @overrides + def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> "Future[str]": + return self._impl.publish( + topic=topic, data=data, ordering_key=ordering_key, **attrs + ) + + @overrides + def __enter__(self): + self._impl.__enter__() + return self + + @overrides + def __exit__(self, exc_type, exc_value, traceback): + self._impl.__exit__(exc_type, exc_value, traceback) + + +class AsyncPublisherClient( + AsyncPublisherClientInterface, ConstructableFromServiceAccount +): + """ + An AsyncPublisherClient publishes messages similar to Google Pub/Sub, but must be used in an + async context. Any publish failures are unlikely to succeed if retried. + + Must be used in an `async with` block or have __aenter__() awaited before use. + """ + + _impl: AsyncPublisherClientInterface + + DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING + """ + The default batching settings for a publisher client. + """ + + def __init__( + self, + per_partition_batching_settings: Optional[BatchSettings] = None, + credentials: Optional[Credentials] = None, + transport: str = "grpc_asyncio", + client_options: Optional[ClientOptions] = None, + ): + """ + Create a new AsyncPublisherClient. + + Args: + per_partition_batching_settings: The settings for publish batching. Apply on a per-partition basis. + credentials: If provided, the credentials to use when connecting. + transport: The transport to use. Must correspond to an asyncio transport. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ + self._impl = MultiplexedAsyncPublisherClient( + lambda topic: make_async_publisher( + topic=topic, + per_partition_batching_settings=per_partition_batching_settings, + credentials=credentials, + client_options=client_options, + transport=transport, + ) + ) + + @overrides + async def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> str: + return await self._impl.publish( + topic=topic, data=data, ordering_key=ordering_key, **attrs + ) + + @overrides + async def __aenter__(self): + await self._impl.__aenter__() + return self + + @overrides + async def __aexit__(self, exc_type, exc_value, traceback): + await self._impl.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py new file mode 100644 index 00000000..c024585f --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py @@ -0,0 +1,71 @@ +from abc import abstractmethod +from concurrent.futures import Future +from typing import ContextManager, Mapping, Union, AsyncContextManager + +from google.cloud.pubsublite.types import TopicPath + + +class AsyncPublisherClientInterface(AsyncContextManager): + """ + An AsyncPublisherClientInterface publishes messages similar to Google Pub/Sub, but must be used in an + async context. Any publish failures are unlikely to succeed if retried. + + Must be used in an `async with` block or have __aenter__() awaited before use. + """ + + @abstractmethod + async def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str], + ) -> str: + """ + Publish a message. + + Args: + topic: The topic to publish to. Publishes to new topics may have nontrivial startup latency. + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. + + Returns: + An ack id, which can be decoded using PublishMetadata.decode. + + Raises: + GoogleApiCallError: On a permanent failure. + """ + + +class PublisherClientInterface(ContextManager): + """ + A PublisherClientInterface publishes messages similar to Google Pub/Sub. + Any publish failures are unlikely to succeed if retried. + + Must be used in a `with` block or have __enter__() called before use. + """ + + @abstractmethod + def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str], + ) -> "Future[str]": + """ + Publish a message. + + Args: + topic: The topic to publish to. Publishes to new topics may have nontrivial startup latency. + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. + + Returns: + A future completed with an ack id, which can be decoded using PublishMetadata.decode. + + Raises: + GoogleApiCallError: On a permanent failure. + """ diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py new file mode 100644 index 00000000..713ad7fb --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py @@ -0,0 +1,169 @@ +from concurrent.futures.thread import ThreadPoolExecutor +from typing import Optional, Union, Set, AsyncIterator + +from google.api_core.client_options import ClientOptions +from google.auth.credentials import Credentials +from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture +from google.cloud.pubsub_v1.subscriber.message import Message + +from google.cloud.pubsublite.cloudpubsub.internal.make_subscriber import ( + make_async_subscriber, +) +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_async_subscriber_client import ( + MultiplexedAsyncSubscriberClient, +) +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_subscriber_client import ( + MultiplexedSubscriberClient, +) +from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer +from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( + SubscriberClientInterface, + MessageCallback, + AsyncSubscriberClientInterface, +) +from google.cloud.pubsublite.internal.constructable_from_service_account import ( + ConstructableFromServiceAccount, +) +from google.cloud.pubsublite.types import ( + FlowControlSettings, + Partition, + SubscriptionPath, +) +from overrides import overrides + + +class SubscriberClient(SubscriberClientInterface, ConstructableFromServiceAccount): + """ + A SubscriberClient reads messages similar to Google Pub/Sub. + Any subscribe failures are unlikely to succeed if retried. + + Must be used in a `with` block or have __enter__() called before use. + """ + + _impl: SubscriberClientInterface + + def __init__( + self, + executor: Optional[ThreadPoolExecutor] = None, + nack_handler: Optional[NackHandler] = None, + message_transformer: Optional[MessageTransformer] = None, + credentials: Optional[Credentials] = None, + transport: str = "grpc_asyncio", + client_options: Optional[ClientOptions] = None, + ): + """ + Create a new SubscriberClient. + + Args: + executor: A ThreadPoolExecutor to use. The client will shut it down on __exit__. If provided a single threaded executor, messages will be ordered per-partition, but take care that the callback does not block for too long as it will impede forward progress on all subscriptions. + nack_handler: A handler for when `nack()` is called. The default NackHandler raises an exception and fails the subscribe stream. + message_transformer: A transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages. + credentials: If provided, the credentials to use when connecting. + transport: The transport to use. Must correspond to an asyncio transport. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ + if executor is None: + executor = ThreadPoolExecutor() + self._impl = MultiplexedSubscriberClient( + executor, + lambda subscription, partitions, settings: make_async_subscriber( + subscription=subscription, + transport=transport, + per_partition_flow_control_settings=settings, + nack_handler=nack_handler, + message_transformer=message_transformer, + fixed_partitions=partitions, + credentials=credentials, + client_options=client_options, + ), + ) + + @overrides + def subscribe( + self, + subscription: Union[SubscriptionPath, str], + callback: MessageCallback, + per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, + ) -> StreamingPullFuture: + return self._impl.subscribe( + subscription, + callback, + per_partition_flow_control_settings, + fixed_partitions, + ) + + @overrides + def __enter__(self): + self._impl.__enter__() + return self + + @overrides + def __exit__(self, exc_type, exc_value, traceback): + self._impl.__exit__(exc_type, exc_value, traceback) + + +class AsyncSubscriberClient( + AsyncSubscriberClientInterface, ConstructableFromServiceAccount +): + """ + An AsyncSubscriberClient reads messages similar to Google Pub/Sub, but must be used in an + async context. + Any subscribe failures are unlikely to succeed if retried. + + Must be used in an `async with` block or have __aenter__() awaited before use. + """ + + _impl: AsyncSubscriberClientInterface + + def __init__( + self, + nack_handler: Optional[NackHandler] = None, + message_transformer: Optional[MessageTransformer] = None, + credentials: Optional[Credentials] = None, + transport: str = "grpc_asyncio", + client_options: Optional[ClientOptions] = None, + ): + """ + Create a new AsyncSubscriberClient. + + Args: + nack_handler: A handler for when `nack()` is called. The default NackHandler raises an exception and fails the subscribe stream. + message_transformer: A transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages. + credentials: If provided, the credentials to use when connecting. + transport: The transport to use. Must correspond to an asyncio transport. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ + self._impl = MultiplexedAsyncSubscriberClient( + lambda subscription, partitions, settings: make_async_subscriber( + subscription=subscription, + transport=transport, + per_partition_flow_control_settings=settings, + nack_handler=nack_handler, + message_transformer=message_transformer, + fixed_partitions=partitions, + credentials=credentials, + client_options=client_options, + ) + ) + + @overrides + async def subscribe( + self, + subscription: Union[SubscriptionPath, str], + per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, + ) -> AsyncIterator[Message]: + return await self._impl.subscribe( + subscription, per_partition_flow_control_settings, fixed_partitions + ) + + @overrides + async def __aenter__(self): + await self._impl.__aenter__() + return self + + @overrides + async def __aexit__(self, exc_type, exc_value, traceback): + await self._impl.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py new file mode 100644 index 00000000..90160488 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py @@ -0,0 +1,91 @@ +from abc import abstractmethod +from typing import ( + ContextManager, + Union, + AsyncContextManager, + AsyncIterator, + Callable, + Optional, + Set, +) + +from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture +from google.cloud.pubsub_v1.subscriber.message import Message + +from google.cloud.pubsublite.types import ( + SubscriptionPath, + FlowControlSettings, + Partition, +) + + +class AsyncSubscriberClientInterface(AsyncContextManager): + """ + An AsyncSubscriberClientInterface reads messages similar to Google Pub/Sub, but must be used in an + async context. + Any subscribe failures are unlikely to succeed if retried. + + Must be used in an `async with` block or have __aenter__() awaited before use. + """ + + @abstractmethod + async def subscribe( + self, + subscription: Union[SubscriptionPath, str], + per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, + ) -> AsyncIterator[Message]: + """ + Read messages from a subscription. + + Args: + subscription: The subscription to subscribe to. + per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these + settings apply to each partition individually, not in aggregate. + fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. + + Returns: + An AsyncIterator with Messages that must have ack() called on each exactly once. + + Raises: + GoogleApiCallError: On a permanent failure. + """ + + +MessageCallback = Callable[[Message], None] + + +class SubscriberClientInterface(ContextManager): + """ + A SubscriberClientInterface reads messages similar to Google Pub/Sub. + Any subscribe failures are unlikely to succeed if retried. + + Must be used in a `with` block or have __enter__() called before use. + """ + + @abstractmethod + def subscribe( + self, + subscription: Union[SubscriptionPath, str], + callback: MessageCallback, + per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, + ) -> StreamingPullFuture: + """ + This method starts a background thread to begin pulling messages from + a Pub/Sub Lite subscription and scheduling them to be processed using the + provided ``callback``. + + Args: + subscription: The subscription to subscribe to. + callback: The callback function. This function receives the message as its only argument. + per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these + settings apply to each partition individually, not in aggregate. + fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. + + Returns: + A StreamingPullFuture instance that can be used to manage the background stream. + + Raises: + GoogleApiCallError: On a permanent failure. + """ diff --git a/google/cloud/pubsublite/internal/constructable_from_service_account.py b/google/cloud/pubsublite/internal/constructable_from_service_account.py new file mode 100644 index 00000000..1261c5fc --- /dev/null +++ b/google/cloud/pubsublite/internal/constructable_from_service_account.py @@ -0,0 +1,18 @@ +from google.oauth2 import service_account + + +class ConstructableFromServiceAccount: + @classmethod + def from_service_account_file(cls, filename, **kwargs): + f"""Creates an instance of this client using the provided credentials file. + Args: + filename (str): The path to the service account private key json + file. + kwargs: Additional arguments to pass to the constructor. + Returns: + A {cls.__name__}. + """ + credentials = service_account.Credentials.from_service_account_file(filename) + return cls(credentials=credentials, **kwargs) + + from_service_account_json = from_service_account_file diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index 5fd5b56f..030a76a0 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -2,7 +2,7 @@ from google.protobuf.field_mask_pb2 import FieldMask -from google.cloud.pubsublite.admin_client import AdminClient +from google.cloud.pubsublite.admin_client_interface import AdminClientInterface from google.cloud.pubsublite.types import ( CloudRegion, SubscriptionPath, @@ -17,7 +17,7 @@ ) -class AdminClientImpl(AdminClient): +class AdminClientImpl(AdminClientInterface): _underlying: AdminServiceClient _region: CloudRegion diff --git a/google/cloud/pubsublite/internal/wire/make_publisher.py b/google/cloud/pubsublite/internal/wire/make_publisher.py index 56e74162..93abaad3 100644 --- a/google/cloud/pubsublite/internal/wire/make_publisher.py +++ b/google/cloud/pubsublite/internal/wire/make_publisher.py @@ -2,7 +2,7 @@ from google.cloud.pubsub_v1.types import BatchSettings -from google.cloud.pubsublite.make_admin_client import make_admin_client +from google.cloud.pubsublite.admin_client import AdminClient from google.cloud.pubsublite.internal.endpoints import regional_endpoint from google.cloud.pubsublite.internal.wire.default_routing_policy import ( DefaultRoutingPolicy, @@ -35,6 +35,7 @@ def make_publisher( topic: TopicPath, + transport: str, per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, @@ -45,6 +46,7 @@ def make_publisher( Args: topic: The topic to publish to. + transport: The transport type to use. per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. @@ -58,7 +60,7 @@ def make_publisher( """ if per_partition_batching_settings is None: per_partition_batching_settings = DEFAULT_BATCHING_SETTINGS - admin_client = make_admin_client( + admin_client = AdminClient( region=topic.location.region, credentials=credentials, client_options=client_options, @@ -68,7 +70,7 @@ def make_publisher( api_endpoint=regional_endpoint(topic.location.region) ) client = async_client.PublisherServiceAsyncClient( - credentials=credentials, client_options=client_options + credentials=credentials, transport=transport, client_options=client_options ) # type: ignore clients: MutableMapping[Partition, Publisher] = {} diff --git a/google/cloud/pubsublite/internal/wire/permanent_failable.py b/google/cloud/pubsublite/internal/wire/permanent_failable.py index fa7fb6f2..dcaa9467 100644 --- a/google/cloud/pubsublite/internal/wire/permanent_failable.py +++ b/google/cloud/pubsublite/internal/wire/permanent_failable.py @@ -21,6 +21,14 @@ def _failure_task(self) -> asyncio.Future: self._maybe_failure_task = asyncio.Future() return self._maybe_failure_task + @staticmethod + async def _fail_client_task(task: asyncio.Future): + task.cancel() + try: + await task + except: # noqa: E722 intentionally broad except clause + pass + async def await_unless_failed(self, awaitable: Awaitable[T]) -> T: """ Await the awaitable, unless fail() is called first. @@ -30,15 +38,17 @@ async def await_unless_failed(self, awaitable: Awaitable[T]) -> T: Returns: The result of the awaitable Raises: The permanent error if fail() is called or the awaitable raises one. """ + + task = asyncio.ensure_future(awaitable) if self._failure_task.done(): + await self._fail_client_task(task) raise self._failure_task.exception() - task = asyncio.ensure_future(awaitable) done, _ = await asyncio.wait( [task, self._failure_task], return_when=asyncio.FIRST_COMPLETED ) if task in done: return await task - task.cancel() + await self._fail_client_task(task) raise self._failure_task.exception() async def run_poller(self, poll_action: Callable[[], Awaitable[None]]): diff --git a/google/cloud/pubsublite/internal/wire/routing_publisher.py b/google/cloud/pubsublite/internal/wire/routing_publisher.py index 7e60d86c..2558df9c 100644 --- a/google/cloud/pubsublite/internal/wire/routing_publisher.py +++ b/google/cloud/pubsublite/internal/wire/routing_publisher.py @@ -1,3 +1,4 @@ +import asyncio from typing import Mapping from google.cloud.pubsublite.internal.wire.publisher import Publisher @@ -17,8 +18,10 @@ def __init__( self._publishers = publishers async def __aenter__(self): - for publisher in self._publishers.values(): - await publisher.__aenter__() + enter_futures = [ + publisher.__aenter__() for publisher in self._publishers.values() + ] + await asyncio.gather(*enter_futures) return self async def __aexit__(self, exc_type, exc_val, exc_tb): diff --git a/google/cloud/pubsublite/make_admin_client.py b/google/cloud/pubsublite/make_admin_client.py deleted file mode 100644 index 5e4779e4..00000000 --- a/google/cloud/pubsublite/make_admin_client.py +++ /dev/null @@ -1,23 +0,0 @@ -from typing import Optional - -from google.api_core.client_options import ClientOptions - -from google.cloud.pubsublite.admin_client import AdminClient -from google.cloud.pubsublite.internal.endpoints import regional_endpoint -from google.cloud.pubsublite.internal.wire.admin_client_impl import AdminClientImpl -from google.cloud.pubsublite.types import CloudRegion -from google.cloud.pubsublite_v1 import AdminServiceClient -from google.auth.credentials import Credentials - - -def make_admin_client( - region: CloudRegion, - credentials: Optional[Credentials] = None, - client_options: Optional[ClientOptions] = None, -) -> AdminClient: - if client_options is None: - client_options = ClientOptions(api_endpoint=regional_endpoint(region)) - return AdminClientImpl( - AdminServiceClient(client_options=client_options, credentials=credentials), - region, - ) diff --git a/setup.py b/setup.py index 9261c547..27d2911b 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ "google-cloud-pubsub >= 2.1.0", "grpcio", "setuptools", + "overrides", ] setuptools.setup( diff --git a/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py b/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py index a0185fe2..85c7db14 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py @@ -8,10 +8,12 @@ from google.pubsub_v1 import PubsubMessage from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import ( - AssigningSubscriber, + AssigningSingleSubscriber, PartitionSubscriberFactory, ) -from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) from google.cloud.pubsublite.internal.wire.assigner import Assigner from google.cloud.pubsublite.types import Partition from google.cloud.pubsublite.testing.test_utils import wire_queues, Box @@ -40,7 +42,7 @@ def subscriber(assigner, subscriber_factory): box = Box() def set_box(): - box.val = AssigningSubscriber(lambda: assigner, subscriber_factory) + box.val = AssigningSingleSubscriber(lambda: assigner, subscriber_factory) # Initialize AssigningSubscriber on another thread with a different event loop. thread = threading.Thread(target=set_box) @@ -62,8 +64,8 @@ async def test_initial_assignment(subscriber, assigner, subscriber_factory): assign_queues = wire_queues(assigner.get_assignment) async with subscriber: await assign_queues.called.get() - sub1 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) - sub2 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) + sub1 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) + sub2 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) subscriber_factory.side_effect = ( lambda partition: sub1 if partition == Partition(1) else sub2 ) @@ -91,9 +93,9 @@ async def test_assignment_change(subscriber, assigner, subscriber_factory): assign_queues = wire_queues(assigner.get_assignment) async with subscriber: await assign_queues.called.get() - sub1 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) - sub2 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) - sub3 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) + sub1 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) + sub2 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) + sub3 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) subscriber_factory.side_effect = ( lambda partition: sub1 if partition == Partition(1) @@ -124,7 +126,7 @@ async def test_subscriber_failure(subscriber, assigner, subscriber_factory): assign_queues = wire_queues(assigner.get_assignment) async with subscriber: await assign_queues.called.get() - sub1 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) + sub1 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) sub1_queues = wire_queues(sub1.read) subscriber_factory.return_value = sub1 await assign_queues.results.put({Partition(1)}) @@ -138,8 +140,8 @@ async def test_delivery_from_multiple(subscriber, assigner, subscriber_factory): assign_queues = wire_queues(assigner.get_assignment) async with subscriber: await assign_queues.called.get() - sub1 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) - sub2 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) + sub1 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) + sub2 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) sub1_queues = wire_queues(sub1.read) sub2_queues = wire_queues(sub2.read) subscriber_factory.side_effect = ( diff --git a/tests/unit/pubsublite/cloudpubsub/internal/async_client_multiplexer_test.py b/tests/unit/pubsublite/cloudpubsub/internal/async_client_multiplexer_test.py new file mode 100644 index 00000000..feefea1e --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/async_client_multiplexer_test.py @@ -0,0 +1,69 @@ +import pytest + +from asynctest.mock import call, CoroutineMock + +from google.api_core.exceptions import FailedPrecondition + +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + AsyncClientMultiplexer, +) + +# All test coroutines will be treated as marked. +pytestmark = pytest.mark.asyncio + + +class Client: + pass + + +@pytest.fixture() +def client_factory(): + return CoroutineMock() + + +@pytest.fixture() +def client_closer(): + return CoroutineMock() + + +@pytest.fixture() +def multiplexer(client_closer): + return AsyncClientMultiplexer(client_closer) + + +async def test_create( + multiplexer: AsyncClientMultiplexer[int, Client], client_closer, client_factory +): + client1 = Client() + client2 = Client() + async with multiplexer: + client_factory.return_value = client1 + assert await multiplexer.create_or_fail(1, client_factory) is client1 + client_factory.assert_has_calls([call()]) + client_factory.return_value = client2 + assert await multiplexer.get_or_create(1, client_factory) is client1 + with pytest.raises(FailedPrecondition): + await multiplexer.create_or_fail(1, client_factory) + assert await multiplexer.get_or_create(2, client_factory) is client2 + client_factory.assert_has_calls([call(), call()]) + with pytest.raises(FailedPrecondition): + await multiplexer.create_or_fail(2, client_factory) + client_closer.assert_has_calls([call(client1), call(client2)], any_order=True) + + +async def test_recreate( + multiplexer: AsyncClientMultiplexer[int, Client], client_closer, client_factory +): + client1 = Client() + client2 = Client() + async with multiplexer: + client_factory.return_value = client1 + assert await multiplexer.create_or_fail(1, client_factory) is client1 + client_factory.assert_has_calls([call()]) + client_factory.return_value = client2 + await multiplexer.try_erase(1, client2) + client_closer.assert_has_calls([]) + await multiplexer.try_erase(1, client1) + client_closer.assert_has_calls([call(client1)]) + assert await multiplexer.create_or_fail(1, client_factory) is client2 + client_closer.assert_has_calls([call(client1), call(client2)]) diff --git a/tests/unit/pubsublite/cloudpubsub/internal/client_multiplexer_test.py b/tests/unit/pubsublite/cloudpubsub/internal/client_multiplexer_test.py new file mode 100644 index 00000000..21e26aa1 --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/client_multiplexer_test.py @@ -0,0 +1,67 @@ +import pytest + +from mock import MagicMock, call + +# All test coroutines will be treated as marked. +from google.api_core.exceptions import FailedPrecondition + +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + ClientMultiplexer, +) + + +class Client: + pass + + +@pytest.fixture() +def client_factory(): + return MagicMock() + + +@pytest.fixture() +def client_closer(): + return MagicMock() + + +@pytest.fixture() +def multiplexer(client_closer): + return ClientMultiplexer(client_closer) + + +def test_create( + multiplexer: ClientMultiplexer[int, Client], client_closer, client_factory +): + client1 = Client() + client2 = Client() + with multiplexer: + client_factory.return_value = client1 + assert multiplexer.create_or_fail(1, client_factory) is client1 + client_factory.assert_has_calls([call()]) + client_factory.return_value = client2 + assert multiplexer.get_or_create(1, client_factory) is client1 + with pytest.raises(FailedPrecondition): + multiplexer.create_or_fail(1, client_factory) + assert multiplexer.get_or_create(2, client_factory) is client2 + client_factory.assert_has_calls([call(), call()]) + with pytest.raises(FailedPrecondition): + multiplexer.create_or_fail(2, client_factory) + client_closer.assert_has_calls([call(client1), call(client2)], any_order=True) + + +def test_recreate( + multiplexer: ClientMultiplexer[int, Client], client_closer, client_factory +): + client1 = Client() + client2 = Client() + with multiplexer: + client_factory.return_value = client1 + assert multiplexer.create_or_fail(1, client_factory) is client1 + client_factory.assert_has_calls([call()]) + client_factory.return_value = client2 + multiplexer.try_erase(1, client2) + client_closer.assert_has_calls([]) + multiplexer.try_erase(1, client1) + client_closer.assert_has_calls([call(client1)]) + assert multiplexer.create_or_fail(1, client_factory) is client2 + client_closer.assert_has_calls([call(client1), call(client2)]) diff --git a/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py new file mode 100644 index 00000000..4a0ca557 --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py @@ -0,0 +1,76 @@ +import asyncio + +import pytest + +from asynctest.mock import call, MagicMock + +from google.api_core.exceptions import FailedPrecondition +from google.cloud.pubsub_v1.subscriber.message import Message +from google.pubsub_v1 import PubsubMessage + +from google.cloud.pubsublite.cloudpubsub import AsyncSubscriberClientInterface + +# All test coroutines will be treated as marked. +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_async_subscriber_client import ( + MultiplexedAsyncSubscriberClient, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSubscriberFactory, + AsyncSingleSubscriber, +) +from google.cloud.pubsublite.testing.test_utils import wire_queues +from google.cloud.pubsublite.types import ( + SubscriptionPath, + CloudZone, + DISABLED_FLOW_CONTROL, +) + +pytestmark = pytest.mark.asyncio + + +@pytest.fixture() +def default_subscriber(): + return MagicMock(spec=AsyncSingleSubscriber) + + +@pytest.fixture() +def subscriber_factory(default_subscriber): + factory = MagicMock(spec=AsyncSubscriberFactory) + factory.return_value = default_subscriber + return factory + + +@pytest.fixture() +def multiplexed_client(subscriber_factory: AsyncSubscriberFactory): + return MultiplexedAsyncSubscriberClient(subscriber_factory) + + +async def test_iterator( + default_subscriber, + subscriber_factory, + multiplexed_client: AsyncSubscriberClientInterface, +): + read_queues = wire_queues(default_subscriber.read) + subscription = SubscriptionPath(1, CloudZone.parse("us-central1-a"), "abc") + message = Message(PubsubMessage(message_id="1")._pb, "", 0, None) + async with multiplexed_client: + iterator = await multiplexed_client.subscribe( + subscription, DISABLED_FLOW_CONTROL + ) + subscriber_factory.assert_has_calls( + [call(subscription, None, DISABLED_FLOW_CONTROL)] + ) + read_fut_1 = asyncio.ensure_future(iterator.__anext__()) + assert not read_fut_1.done() + await read_queues.called.get() + default_subscriber.read.assert_has_calls([call()]) + await read_queues.results.put(message) + assert await read_fut_1 is message + read_fut_2 = asyncio.ensure_future(iterator.__anext__()) + assert not read_fut_2.done() + await read_queues.called.get() + default_subscriber.read.assert_has_calls([call(), call()]) + await read_queues.results.put(FailedPrecondition("")) + with pytest.raises(FailedPrecondition): + await read_fut_2 + default_subscriber.__aexit__.assert_called_once() diff --git a/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py b/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py index 530dd550..67b3e8e8 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py @@ -1,23 +1,28 @@ from asynctest.mock import MagicMock import pytest -from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl -from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher +from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import ( + SinglePublisherImpl, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + AsyncSinglePublisher, + SinglePublisher, +) @pytest.fixture() def async_publisher(): - publisher = MagicMock(spec=AsyncPublisher) + publisher = MagicMock(spec=AsyncSinglePublisher) publisher.__aenter__.return_value = publisher return publisher @pytest.fixture() def publisher(async_publisher): - return PublisherImpl(async_publisher) + return SinglePublisherImpl(async_publisher) -def test_proxies_to_async(async_publisher, publisher: Publisher): +def test_proxies_to_async(async_publisher, publisher: SinglePublisher): with publisher: async_publisher.__aenter__.assert_called_once() publisher.publish(data=b"abc", ordering_key="zyx", xyz="xyz").result() diff --git a/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py index b7a77e18..0a136e8d 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py @@ -10,11 +10,13 @@ from google.cloud.pubsublite.types import FlowControlSettings from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import ( - SinglePartitionSubscriber, + SinglePartitionSingleSubscriber, ) from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler -from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) from google.cloud.pubsublite.internal.wire.subscriber import Subscriber from google.cloud.pubsublite.testing.test_utils import make_queue_waiter from google.cloud.pubsublite_v1 import Cursor, FlowControlRequest, SequencedMessage @@ -69,7 +71,7 @@ def transformer(): def subscriber( underlying, flow_control_settings, ack_set_tracker, nack_handler, transformer ): - return SinglePartitionSubscriber( + return SinglePartitionSingleSubscriber( underlying, flow_control_settings, ack_set_tracker, nack_handler, transformer ) @@ -92,7 +94,7 @@ async def test_failed_transform(subscriber, underlying, transformer): async def test_ack( - subscriber: AsyncSubscriber, underlying, transformer, ack_set_tracker + subscriber: AsyncSingleSubscriber, underlying, transformer, ack_set_tracker ): ack_called_queue = asyncio.Queue() ack_result_queue = asyncio.Queue() @@ -121,7 +123,10 @@ async def test_ack( async def test_track_failure( - subscriber: SinglePartitionSubscriber, underlying, transformer, ack_set_tracker + subscriber: SinglePartitionSingleSubscriber, + underlying, + transformer, + ack_set_tracker, ): async with subscriber: ack_set_tracker.track.side_effect = FailedPrecondition("Bad track") @@ -133,7 +138,10 @@ async def test_track_failure( async def test_ack_failure( - subscriber: SinglePartitionSubscriber, underlying, transformer, ack_set_tracker + subscriber: SinglePartitionSingleSubscriber, + underlying, + transformer, + ack_set_tracker, ): ack_called_queue = asyncio.Queue() ack_result_queue = asyncio.Queue() @@ -159,7 +167,7 @@ async def sleep_forever(): async def test_nack_failure( - subscriber: SinglePartitionSubscriber, + subscriber: SinglePartitionSingleSubscriber, underlying, transformer, ack_set_tracker, @@ -182,7 +190,7 @@ async def sleep_forever(): async def test_nack_calls_ack( - subscriber: SinglePartitionSubscriber, + subscriber: SinglePartitionSingleSubscriber, underlying, transformer, ack_set_tracker, diff --git a/tests/unit/pubsublite/cloudpubsub/internal/subscriber_impl_test.py b/tests/unit/pubsublite/cloudpubsub/internal/subscriber_impl_test.py index dc887ece..aaf35664 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/subscriber_impl_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/subscriber_impl_test.py @@ -13,8 +13,10 @@ CloseCallback, ) from google.cloud.pubsublite.cloudpubsub.internal.subscriber_impl import SubscriberImpl -from google.cloud.pubsublite.cloudpubsub.subscriber import ( - AsyncSubscriber, +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( MessageCallback, ) from google.cloud.pubsublite.testing.test_utils import Box @@ -22,7 +24,7 @@ @pytest.fixture() def async_subscriber(): - subscriber = MagicMock(spec=AsyncSubscriber) + subscriber = MagicMock(spec=AsyncSingleSubscriber) subscriber.__aenter__.return_value = subscriber return subscriber