From 74affdcbfe3b75bdcf6e1fd7e4d64dfb9694a7f0 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Tue, 27 Oct 2020 14:13:38 -0400 Subject: [PATCH 1/8] fix: Make public API more similar to generated clients --- .../internal/assigning_subscriber.py | 12 +- .../internal/async_publisher_impl.py | 6 +- .../{ => internal}/make_publisher.py | 23 ++- .../{ => internal}/make_subscriber.py | 43 +++-- .../multiplexed_async_publisher_client.py | 65 ++++++++ .../multiplexed_async_subscriber_client.py | 91 +++++++++++ .../internal/multiplexed_publisher_client.py | 74 +++++++++ .../internal/multiplexed_subscriber_client.py | 85 ++++++++++ .../cloudpubsub/internal/publisher_impl.py | 11 +- .../internal/single_partition_subscriber.py | 6 +- .../single_publisher.py} | 4 +- .../single_subscriber.py} | 8 +- .../cloudpubsub/internal/subscriber_impl.py | 21 +-- .../cloudpubsub/publisher_client.py | 140 ++++++++++++++++ .../cloudpubsub/publisher_client_interface.py | 70 ++++++++ .../cloudpubsub/subscriber_client.py | 151 ++++++++++++++++++ .../subscriber_client_interface.py | 73 +++++++++ .../internal/wire/make_publisher.py | 5 +- google/cloud/pubsublite/make_admin_client.py | 5 +- .../internal/assigning_subscriber_test.py | 24 +-- .../internal/publisher_impl_test.py | 15 +- .../single_partition_subscriber_test.py | 24 ++- .../internal/subscriber_impl_test.py | 8 +- 23 files changed, 884 insertions(+), 80 deletions(-) rename google/cloud/pubsublite/cloudpubsub/{ => internal}/make_publisher.py (86%) rename google/cloud/pubsublite/cloudpubsub/{ => internal}/make_subscriber.py (89%) create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py rename google/cloud/pubsublite/cloudpubsub/{publisher.py => internal/single_publisher.py} (94%) rename google/cloud/pubsublite/cloudpubsub/{subscriber.py => internal/single_subscriber.py} (76%) create mode 100644 google/cloud/pubsublite/cloudpubsub/publisher_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py create mode 100644 google/cloud/pubsublite/cloudpubsub/subscriber_client.py create mode 100644 google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py diff --git a/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py index 21a0cacb..4a0c410d 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/assigning_subscriber.py @@ -3,21 +3,23 @@ from google.cloud.pubsub_v1.subscriber.message import Message -from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_cancelled from google.cloud.pubsublite.internal.wire.assigner import Assigner from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable from google.cloud.pubsublite.types import Partition -PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber] +PartitionSubscriberFactory = Callable[[Partition], AsyncSingleSubscriber] class _RunningSubscriber(NamedTuple): - subscriber: AsyncSubscriber + subscriber: AsyncSingleSubscriber poller: Future -class AssigningSubscriber(AsyncSubscriber, PermanentFailable): +class AssigningSingleSubscriber(AsyncSingleSubscriber, PermanentFailable): _assigner_factory: Callable[[], Assigner] _subscriber_factory: PartitionSubscriberFactory @@ -47,7 +49,7 @@ def __init__( async def read(self) -> Message: return await self.await_unless_failed(self._messages.get()) - async def _subscribe_action(self, subscriber: AsyncSubscriber): + async def _subscribe_action(self, subscriber: AsyncSingleSubscriber): message = await subscriber.read() await self._messages.put(message) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py index 832f244e..45e635dd 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/async_publisher_impl.py @@ -5,11 +5,13 @@ from google.cloud.pubsublite.cloudpubsub.message_transforms import ( from_cps_publish_message, ) -from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + AsyncSinglePublisher, +) from google.cloud.pubsublite.internal.wire.publisher import Publisher -class AsyncPublisherImpl(AsyncPublisher): +class AsyncSinglePublisherImpl(AsyncSinglePublisher): _publisher_factory: Callable[[], Publisher] _publisher: Optional[Publisher] diff --git a/google/cloud/pubsublite/cloudpubsub/make_publisher.py b/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py similarity index 86% rename from google/cloud/pubsublite/cloudpubsub/make_publisher.py rename to google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py index 390afbdf..6d82684d 100644 --- a/google/cloud/pubsublite/cloudpubsub/make_publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py @@ -5,10 +5,15 @@ from google.cloud.pubsub_v1.types import BatchSettings from google.cloud.pubsublite.cloudpubsub.internal.async_publisher_impl import ( - AsyncPublisherImpl, + AsyncSinglePublisherImpl, +) +from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import ( + SinglePublisherImpl, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + AsyncSinglePublisher, + SinglePublisher, ) -from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl -from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher from google.cloud.pubsublite.internal.wire.make_publisher import ( make_publisher as make_wire_publisher, DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING, @@ -23,16 +28,18 @@ def make_async_publisher( topic: TopicPath, + transport: str, per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, metadata: Optional[Mapping[str, str]] = None, -) -> AsyncPublisher: +) -> AsyncSinglePublisher: """ Make a new publisher for the given topic. Args: topic: The topic to publish to. + transport: The transport type to use. per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. @@ -55,21 +62,23 @@ def underlying_factory(): metadata, ) - return AsyncPublisherImpl(underlying_factory) + return AsyncSinglePublisherImpl(underlying_factory) def make_publisher( topic: TopicPath, + transport: str, per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, metadata: Optional[Mapping[str, str]] = None, -) -> Publisher: +) -> SinglePublisher: """ Make a new publisher for the given topic. Args: topic: The topic to publish to. + transport: The transport type to use. per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. @@ -81,7 +90,7 @@ def make_publisher( Throws: GoogleApiCallException on any error determining topic structure. """ - return PublisherImpl( + return SinglePublisherImpl( make_async_publisher( topic, per_partition_batching_settings, diff --git a/google/cloud/pubsublite/cloudpubsub/make_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py similarity index 89% rename from google/cloud/pubsublite/cloudpubsub/make_subscriber.py rename to google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py index 8d729fea..3229f52f 100644 --- a/google/cloud/pubsublite/cloudpubsub/make_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py @@ -5,16 +5,20 @@ from google.api_core.client_options import ClientOptions from google.auth.credentials import Credentials from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture + +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( + MessageCallback, +) from google.cloud.pubsublite.types import FlowControlSettings from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import ( AckSetTrackerImpl, ) from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import ( PartitionSubscriberFactory, - AssigningSubscriber, + AssigningSingleSubscriber, ) from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import ( - SinglePartitionSubscriber, + SinglePartitionSingleSubscriber, ) import google.cloud.pubsublite.cloudpubsub.internal.subscriber_impl as cps_subscriber from google.cloud.pubsublite.cloudpubsub.message_transformer import ( @@ -25,9 +29,8 @@ NackHandler, DefaultNackHandler, ) -from google.cloud.pubsublite.cloudpubsub.subscriber import ( - AsyncSubscriber, - MessageCallback, +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, ) from google.cloud.pubsublite.internal.endpoints import regional_endpoint from google.cloud.pubsublite.internal.wire.assigner import Assigner @@ -67,12 +70,15 @@ def _make_dynamic_assigner( subscription: SubscriptionPath, - assignment_client: PartitionAssignmentServiceAsyncClient, + transport: str, + client_options: ClientOptions, + credentials: Optional[Credentials], base_metadata: Optional[Mapping[str, str]], ) -> Assigner: def assignment_connection_factory( requests: AsyncIterator[PartitionAssignmentRequest], ): + assignment_client = PartitionAssignmentServiceAsyncClient(credentials=credentials, transport=transport, client_options=client_options) # type: ignore return assignment_client.assign_partitions( requests, metadata=list(base_metadata.items()) ) @@ -87,6 +93,7 @@ def assignment_connection_factory( def _make_partition_subscriber_factory( subscription: SubscriptionPath, + transport: str, client_options: ClientOptions, credentials: Optional[Credentials], base_metadata: Optional[Mapping[str, str]], @@ -94,11 +101,11 @@ def _make_partition_subscriber_factory( nack_handler: NackHandler, message_transformer: MessageTransformer, ) -> PartitionSubscriberFactory: - def factory(partition: Partition) -> AsyncSubscriber: + def factory(partition: Partition) -> AsyncSingleSubscriber: subscribe_client = SubscriberServiceAsyncClient( - credentials=credentials, client_options=client_options + credentials=credentials, client_options=client_options, transport=transport ) # type: ignore - cursor_client = CursorServiceAsyncClient(credentials=credentials, client_options=client_options) # type: ignore + cursor_client = CursorServiceAsyncClient(credentials=credentials, client_options=client_options, transport=transport) # type: ignore final_metadata = merge_metadata( base_metadata, subscription_routing_metadata(subscription, partition) ) @@ -130,7 +137,7 @@ def cursor_connection_factory( GapicConnectionFactory(cursor_connection_factory), ) ack_set_tracker = AckSetTrackerImpl(committer) - return SinglePartitionSubscriber( + return SinglePartitionSingleSubscriber( subscriber, flow_control_settings, ack_set_tracker, @@ -143,6 +150,7 @@ def cursor_connection_factory( def make_async_subscriber( subscription: SubscriptionPath, + transport: str, per_partition_flow_control_settings: FlowControlSettings, nack_handler: Optional[NackHandler] = None, message_transformer: Optional[MessageTransformer] = None, @@ -150,12 +158,13 @@ def make_async_subscriber( credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, metadata: Optional[Mapping[str, str]] = None, -) -> AsyncSubscriber: +) -> AsyncSingleSubscriber: """ Make a Pub/Sub Lite AsyncSubscriber. Args: subscription: The subscription to subscribe to. + transport: The transport type to use. per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these settings apply to each partition individually, not in aggregate. nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client. @@ -178,11 +187,7 @@ def make_async_subscriber( assigner_factory = lambda: FixedSetAssigner(fixed_partitions) # noqa: E731 else: assigner_factory = lambda: _make_dynamic_assigner( # noqa: E731 - subscription, - PartitionAssignmentServiceAsyncClient( - credentials=credentials, client_options=client_options - ), - metadata, + subscription, transport, client_options, credentials, metadata, ) if nack_handler is None: @@ -191,6 +196,7 @@ def make_async_subscriber( message_transformer = DefaultMessageTransformer() partition_subscriber_factory = _make_partition_subscriber_factory( subscription, + transport, client_options, credentials, metadata, @@ -198,11 +204,12 @@ def make_async_subscriber( nack_handler, message_transformer, ) - return AssigningSubscriber(assigner_factory, partition_subscriber_factory) + return AssigningSingleSubscriber(assigner_factory, partition_subscriber_factory) def make_subscriber( subscription: SubscriptionPath, + transport: str, per_partition_flow_control_settings: FlowControlSettings, callback: MessageCallback, nack_handler: Optional[NackHandler] = None, @@ -218,6 +225,7 @@ def make_subscriber( Args: subscription: The subscription to subscribe to. + transport: The transport type to use. per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these settings apply to each partition individually, not in aggregate. callback: The callback to call with each message. @@ -236,6 +244,7 @@ def make_subscriber( """ underlying = make_async_subscriber( subscription, + transport, per_partition_flow_control_settings, nack_handler, message_transformer, diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py new file mode 100644 index 00000000..606726f3 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py @@ -0,0 +1,65 @@ +from typing import Callable, Dict, Union, Mapping + +from google.api_core.exceptions import GoogleAPICallError + +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + AsyncSinglePublisher, +) +from google.cloud.pubsublite.cloudpubsub.publisher_client_interface import ( + AsyncPublisherClientInterface, +) +from google.cloud.pubsublite.types import TopicPath + + +AsyncPublisherFactory = Callable[[TopicPath], AsyncSinglePublisher] + + +class MultiplexedAsyncPublisherClient(AsyncPublisherClientInterface): + _publisher_factory: AsyncPublisherFactory + _live_publishers: Dict[TopicPath, AsyncSinglePublisher] + + def __init__(self, publisher_factory: AsyncPublisherFactory): + self._publisher_factory = publisher_factory + self._live_publishers = {} + + async def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> str: + if isinstance(topic, str): + topic = TopicPath.parse(topic) + publisher: AsyncSinglePublisher + if topic not in self._live_publishers: + publisher = self._publisher_factory(topic) + self._live_publishers[topic] = publisher + await publisher.__aenter__() + publisher = self._live_publishers[topic] + try: + return await publisher.publish( + data=data, ordering_key=ordering_key, **attrs + ) + except GoogleAPICallError as e: + self._on_failure(topic, publisher) + raise e + + def _on_failure(self, topic: TopicPath, publisher: AsyncSinglePublisher): + if topic not in self._live_publishers: + return + current_publisher = self._live_publishers[topic] + if current_publisher is not publisher: + return + del self._live_publishers[topic] + + await publisher.__aexit__(None, None, None) + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + live_publishers = self._live_publishers + self._live_publishers = {} + for topic, pub in live_publishers.items(): + await pub.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py new file mode 100644 index 00000000..52a96f9b --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py @@ -0,0 +1,91 @@ +from typing import ( + Union, + AsyncIterator, + Awaitable, + Dict, + Callable, +) + +from google.api_core.exceptions import FailedPrecondition +from google.cloud.pubsub_v1.subscriber.message import Message + +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSubscriberFactory, + AsyncSingleSubscriber, +) +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( + AsyncSubscriberClientInterface, +) +from google.cloud.pubsublite.types import SubscriptionPath, FlowControlSettings + + +class _SubscriberAsyncIterator(AsyncIterator): + _subscriber: AsyncSingleSubscriber + _on_failure: Callable[[], Awaitable[None]] + + def __init__( + self, + subscriber: AsyncSingleSubscriber, + on_failure: Callable[[], Awaitable[None]], + ): + self._subscriber = subscriber + self._on_failure = on_failure + + async def __anext__(self) -> Message: + try: + return await self._subscriber.read() + except: # noqa: E722 + await self._on_failure() + raise + + def __aiter__(self): + return self + + +class MultiplexedAsyncSubscriberClient(AsyncSubscriberClientInterface): + _underlying_factory: AsyncSubscriberFactory + _live_subscribers: Dict[SubscriptionPath, AsyncSingleSubscriber] + + def __init__(self, underlying_factory: AsyncSubscriberFactory): + self._underlying_factory = underlying_factory + self._live_subscribers = {} + + async def subscribe( + self, + subscription: Union[SubscriptionPath, str], + per_partition_flow_control_settings: FlowControlSettings, + ) -> AsyncIterator[Message]: + if isinstance(subscription, str): + subscription = SubscriptionPath.parse(subscription) + if subscription in self._live_subscribers: + raise FailedPrecondition( + f"Cannot subscribe to the same subscription twice. {subscription}" + ) + subscriber = self._underlying_factory( + subscription, per_partition_flow_control_settings + ) + self._live_subscribers[subscription] = subscriber + await subscriber.__aenter__() + return _SubscriberAsyncIterator( + subscriber, lambda: self._on_subscriber_failure(subscription, subscriber) + ) + + async def _on_subscriber_failure( + self, subscription: SubscriptionPath, subscriber: AsyncSingleSubscriber + ): + if subscription not in self._live_subscribers: + return + current_subscriber = self._live_subscribers[subscription] + if current_subscriber is not subscriber: + return + del self._live_subscribers[subscription] + await subscriber.__aexit__(None, None, None) + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + live_subscribers = self._live_subscribers + self._live_subscribers = {} + for topic, sub in live_subscribers.items(): + await sub.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py new file mode 100644 index 00000000..fe571219 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -0,0 +1,74 @@ +from concurrent.futures import Future +from threading import Lock +from typing import Callable, Dict, Union, Mapping + +from google.api_core.exceptions import GoogleAPICallError + +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + SinglePublisher, +) +from google.cloud.pubsublite.cloudpubsub.publisher_client_interface import ( + PublisherClientInterface, +) +from google.cloud.pubsublite.types import TopicPath + +PublisherFactory = Callable[[TopicPath], SinglePublisher] + + +class MultiplexedPublisherClient(PublisherClientInterface): + _publisher_factory: PublisherFactory + + _lock: Lock + _live_publishers: Dict[TopicPath, SinglePublisher] + + def __init__(self, publisher_factory: PublisherFactory): + self._publisher_factory = publisher_factory + self._lock = Lock() + self._live_publishers = {} + + def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> "Future[str]": + if isinstance(topic, str): + topic = TopicPath.parse(topic) + publisher: SinglePublisher + with self._lock: + if topic not in self._live_publishers: + publisher = self._publisher_factory(topic) + publisher.__enter__() + self._live_publishers[topic] = publisher + publisher = self._live_publishers[topic] + future = publisher.publish(data=data, ordering_key=ordering_key, **attrs) + future.add_done_callback( + lambda fut: self._on_future_completion(topic, publisher, fut) + ) + + def _on_future_completion( + self, topic: TopicPath, publisher: SinglePublisher, future: "Future[str]" + ): + try: + future.result() + except GoogleAPICallError: + with self._lock: + if topic not in self._live_publishers: + return + current_publisher = self._live_publishers[topic] + if current_publisher is not publisher: + return + del self._live_publishers[topic] + publisher.__exit__(None, None, None) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_value, traceback): + live_publishers: Dict[TopicPath, SinglePublisher] + with self._lock: + live_publishers = self._live_publishers + self._live_publishers = {} + for topic, pub in live_publishers.items(): + pub.__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py new file mode 100644 index 00000000..a520b29e --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py @@ -0,0 +1,85 @@ +from concurrent.futures.thread import ThreadPoolExecutor +from threading import Lock +from typing import Union, Dict + +from google.api_core.exceptions import FailedPrecondition +from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture + +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSubscriberFactory, +) +from google.cloud.pubsublite.cloudpubsub.internal.subscriber_impl import SubscriberImpl +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( + SubscriberClientInterface, + MessageCallback, +) +from google.cloud.pubsublite.types import SubscriptionPath, FlowControlSettings + + +class MultiplexedSubscriberClient(SubscriberClientInterface): + _executor: ThreadPoolExecutor + _underlying_factory: AsyncSubscriberFactory + + _lock: Lock + _live_subscribers: Dict[SubscriptionPath, StreamingPullFuture] + + def __init__( + self, executor: ThreadPoolExecutor, underlying_factory: AsyncSubscriberFactory + ): + self._executor = executor + self._underlying_factory = underlying_factory + self._lock = Lock() + self._live_subscribers = {} + + def subscribe( + self, + subscription: Union[SubscriptionPath, str], + callback: MessageCallback, + per_partition_flow_control_settings: FlowControlSettings, + ) -> StreamingPullFuture: + if isinstance(subscription, str): + subscription = SubscriptionPath.parse(subscription) + future: StreamingPullFuture + with self._lock: + if subscription in self._live_subscribers: + raise FailedPrecondition( + f"Cannot subscribe to the same subscription twice. {subscription}" + ) + underlying = self._underlying_factory( + subscription, per_partition_flow_control_settings + ) + subscriber = SubscriberImpl(underlying, callback, self._executor) + subscriber.__enter__() + future = StreamingPullFuture(subscriber) + self._live_subscribers[subscription] = future + future.add_done_callback(lambda fut: self._on_future_failure(subscription, fut)) + return future + + def _on_future_failure( + self, subscription: SubscriptionPath, future: StreamingPullFuture + ): + with self._lock: + if subscription not in self._live_subscribers: + return + current_future = self._live_subscribers[subscription] + if current_future is not future: + return + del self._live_subscribers[subscription] + + def __enter__(self): + self._executor.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback): + futures: Dict[SubscriptionPath, StreamingPullFuture] + with self._lock: + futures = self._live_subscribers + self._live_subscribers = {} + for sub, future in futures.items(): + future.cancel() + try: + future.result() + except: # noqa: E722 + pass + self._executor.__exit__(exc_type, exc_value, traceback) + return super().__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py index 7fb3b68c..428141b9 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/publisher_impl.py @@ -4,14 +4,17 @@ from google.cloud.pubsublite.cloudpubsub.internal.managed_event_loop import ( ManagedEventLoop, ) -from google.cloud.pubsublite.cloudpubsub.publisher import Publisher, AsyncPublisher +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + SinglePublisher, + AsyncSinglePublisher, +) -class PublisherImpl(Publisher): +class SinglePublisherImpl(SinglePublisher): _managed_loop: ManagedEventLoop - _underlying: AsyncPublisher + _underlying: AsyncSinglePublisher - def __init__(self, underlying: AsyncPublisher): + def __init__(self, underlying: AsyncSinglePublisher): super().__init__() self._managed_loop = ManagedEventLoop() self._underlying = underlying diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py index 94561584..8b441507 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_partition_subscriber.py @@ -10,7 +10,9 @@ from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler -from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable from google.cloud.pubsublite.internal.wire.subscriber import Subscriber from google.cloud.pubsublite_v1 import FlowControlRequest, SequencedMessage @@ -22,7 +24,7 @@ class _SizedMessage(NamedTuple): size_bytes: int -class SinglePartitionSubscriber(PermanentFailable, AsyncSubscriber): +class SinglePartitionSingleSubscriber(PermanentFailable, AsyncSingleSubscriber): _underlying: Subscriber _flow_control_settings: FlowControlSettings _ack_set_tracker: AckSetTracker diff --git a/google/cloud/pubsublite/cloudpubsub/publisher.py b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py similarity index 94% rename from google/cloud/pubsublite/cloudpubsub/publisher.py rename to google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py index 7824de51..d4568cf1 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py @@ -3,7 +3,7 @@ from concurrent import futures -class AsyncPublisher(AsyncContextManager): +class AsyncSinglePublisher(AsyncContextManager): """ An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an async context. Any publish failures are permanent. @@ -31,7 +31,7 @@ async def publish( """ -class Publisher(ContextManager): +class SinglePublisher(ContextManager): """ A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent. diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py similarity index 76% rename from google/cloud/pubsublite/cloudpubsub/subscriber.py rename to google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py index 1015f70b..9436252d 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py @@ -3,8 +3,10 @@ from google.cloud.pubsub_v1.subscriber.message import Message +from google.cloud.pubsublite.types import SubscriptionPath, FlowControlSettings -class AsyncSubscriber(AsyncContextManager): + +class AsyncSingleSubscriber(AsyncContextManager): """ A Cloud Pub/Sub asynchronous subscriber. @@ -28,4 +30,6 @@ async def read(self) -> Message: raise NotImplementedError() -MessageCallback = Callable[[Message], None] +AsyncSubscriberFactory = Callable[ + [SubscriptionPath, FlowControlSettings], AsyncSingleSubscriber +] diff --git a/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py index 70b658d1..46fb14db 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py @@ -10,16 +10,18 @@ StreamingPullManager, CloseCallback, ) -from google.cloud.pubsublite.cloudpubsub.subscriber import ( - AsyncSubscriber, +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( MessageCallback, ) class SubscriberImpl(ContextManager, StreamingPullManager): - _underlying: AsyncSubscriber + _underlying: AsyncSingleSubscriber _callback: MessageCallback - _executor: ThreadPoolExecutor + _unowned_executor: ThreadPoolExecutor _event_loop: ManagedEventLoop @@ -31,13 +33,13 @@ class SubscriberImpl(ContextManager, StreamingPullManager): def __init__( self, - underlying: AsyncSubscriber, + underlying: AsyncSingleSubscriber, callback: MessageCallback, - executor: ThreadPoolExecutor, + unowned_executor: ThreadPoolExecutor, ): self._underlying = underlying self._callback = callback - self._executor = executor + self._unowned_executor = unowned_executor self._event_loop = ManagedEventLoop() self._close_lock = threading.Lock() self._failure = None @@ -68,9 +70,9 @@ async def _poller(self): try: while True: message = await self._underlying.read() - self._executor.submit(self._callback, message) + self._unowned_executor.submit(self._callback, message) except GoogleAPICallError as e: # noqa: F841 Flake8 thinks e is unused - self._executor.submit(lambda: self._fail(e)) # noqa: F821 + self._unowned_executor.submit(lambda: self._fail(e)) # noqa: F821 def __enter__(self): assert self._close_callback is not None @@ -90,5 +92,4 @@ def __exit__(self, exc_type, exc_value, traceback): ).result() self._event_loop.__exit__(exc_type, exc_value, traceback) assert self._close_callback is not None - self._executor.shutdown(wait=False) # __exit__ may be called from the executor. self._close_callback(self, self._failure) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client.py b/google/cloud/pubsublite/cloudpubsub/publisher_client.py new file mode 100644 index 00000000..b1a6d419 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client.py @@ -0,0 +1,140 @@ +from concurrent.futures import Future +from typing import Optional, Mapping, Union + +from google.api_core.client_options import ClientOptions +from google.auth.credentials import Credentials +from google.cloud.pubsub_v1.types import BatchSettings +from google.oauth2 import service_account + +from google.cloud.pubsublite.cloudpubsub.internal.make_publisher import ( + make_publisher, + make_async_publisher, +) +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_async_publisher_client import ( + MultiplexedAsyncPublisherClient, +) +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_publisher_client import ( + MultiplexedPublisherClient, +) +from google.cloud.pubsublite.cloudpubsub.publisher_client_interface import ( + PublisherClientInterface, + AsyncPublisherClientInterface, +) +from google.cloud.pubsublite.internal.wire.make_publisher import ( + DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING, +) +from google.cloud.pubsublite.types import TopicPath + + +class PublisherClient(PublisherClientInterface): + _impl: PublisherClientInterface + + DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING + + def __init__( + self, + per_partition_batching_setttings: Optional[BatchSettings] = None, + credentials: Optional[Credentials] = None, + transport: str = "grpc_asyncio", + client_options: Optional[ClientOptions] = None, + ): + self._impl = MultiplexedPublisherClient( + lambda topic: make_publisher( + topic=topic, + per_partition_batching_settings=per_partition_batching_setttings, + credentials=credentials, + client_options=client_options, + transport=transport, + ) + ) + + @classmethod + def from_service_account_file(cls, filename, **kwargs): + """Creates an instance of this client using the provided credentials + file. + Args: + filename (str): The path to the service account private key json + file. + kwargs: Additional arguments to pass to the constructor. + Returns: + A PublisherClient. + """ + credentials = service_account.Credentials.from_service_account_file(filename) + return cls(credentials=credentials, **kwargs) + + from_service_account_json = from_service_account_file + + def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> "Future[str]": + return self._impl.publish( + topic=topic, data=data, ordering_key=ordering_key, **attrs + ) + + def __enter__(self): + self._impl.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._impl.__exit__(exc_type, exc_value, traceback) + + +class AsyncPublisherClient(AsyncPublisherClientInterface): + _impl: AsyncPublisherClientInterface + + DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING + + def __init__( + self, + per_partition_batching_setttings: Optional[BatchSettings] = None, + credentials: Optional[Credentials] = None, + transport: str = "grpc_asyncio", + client_options: Optional[ClientOptions] = None, + ): + self._impl = MultiplexedAsyncPublisherClient( + lambda topic: make_async_publisher( + topic=topic, + per_partition_batching_settings=per_partition_batching_setttings, + credentials=credentials, + client_options=client_options, + transport=transport, + ) + ) + + @classmethod + def from_service_account_file(cls, filename, **kwargs): + """Creates an instance of this client using the provided credentials + file. + Args: + filename (str): The path to the service account private key json + file. + kwargs: Additional arguments to pass to the constructor. + Returns: + A PublisherClient. + """ + credentials = service_account.Credentials.from_service_account_file(filename) + return cls(credentials=credentials, **kwargs) + + from_service_account_json = from_service_account_file + + async def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> str: + return await self._impl.publish( + topic=topic, data=data, ordering_key=ordering_key, **attrs + ) + + async def __aenter__(self): + await self._impl.__aenter__() + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self._impl.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py new file mode 100644 index 00000000..c4395359 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py @@ -0,0 +1,70 @@ +from abc import abstractmethod +from concurrent.futures import Future +from typing import ContextManager, Mapping, Union, AsyncContextManager + +from google.cloud.pubsublite.types import TopicPath + + +class AsyncPublisherClientInterface(AsyncContextManager): + """ + An AsyncPublisherClientInterface publishes messages similar to Google Pub/Sub, but must be used in an + async context. Any publish failures are permanent. + + Must be used in an `async with` block or have __aenter__() awaited before use. + """ + + @abstractmethod + async def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> str: + """ + Publish a message. + + Args: + topic: The topic to publish to. Publishes to new topics may have nontrivial startup latency. + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. + + Returns: + An ack id, which can be decoded using PublishMetadata.decode. + + Raises: + GoogleApiCallError: On a permanent failure. + """ + + +class PublisherClientInterface(ContextManager): + """ + A PublisherClientInterface publishes messages similar to Google Pub/Sub. + + Must be used in a `with` block or have __enter__() called before use. + """ + + @abstractmethod + def publish( + self, + topic: Union[TopicPath, str], + data: bytes, + ordering_key: str = "", + **attrs: Mapping[str, str] + ) -> "Future[str]": + """ + Publish a message. + + Args: + topic: The topic to publish to. Publishes to new topics may have nontrivial startup latency. + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. + + Returns: + A future completed with an ack id, which can be decoded using PublishMetadata.decode. + + Raises: + GoogleApiCallError: On a permanent failure. + """ diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py new file mode 100644 index 00000000..0b2298e7 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py @@ -0,0 +1,151 @@ +from concurrent.futures.thread import ThreadPoolExecutor +from typing import Optional, Union, Set, AsyncIterator + +from google.api_core.client_options import ClientOptions +from google.auth.credentials import Credentials +from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture +from google.cloud.pubsub_v1.subscriber.message import Message +from google.oauth2 import service_account + +from google.cloud.pubsublite.cloudpubsub.internal.make_subscriber import ( + make_async_subscriber, +) +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_async_subscriber_client import ( + MultiplexedAsyncSubscriberClient, +) +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_subscriber_client import ( + MultiplexedSubscriberClient, +) +from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer +from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( + SubscriberClientInterface, + MessageCallback, + AsyncSubscriberClientInterface, +) +from google.cloud.pubsublite.types import ( + FlowControlSettings, + Partition, + SubscriptionPath, +) + + +class SubscriberClient(SubscriberClientInterface): + _impl: SubscriberClientInterface + + def __init__( + self, + executor: Optional[ThreadPoolExecutor] = None, + nack_handler: Optional[NackHandler] = None, + message_transformer: Optional[MessageTransformer] = None, + fixed_partitions: Optional[Set[Partition]] = None, + credentials: Optional[Credentials] = None, + transport: str = "grpc_asyncio", + client_options: Optional[ClientOptions] = None, + ): + if executor is None: + executor = ThreadPoolExecutor() + self._impl = MultiplexedSubscriberClient( + executor, + lambda subscription, settings: make_async_subscriber( + subscription=subscription, + transport=transport, + per_partition_flow_control_settings=settings, + nack_handler=nack_handler, + message_transformer=message_transformer, + fixed_partitions=fixed_partitions, + credentials=credentials, + client_options=client_options, + ), + ) + + @classmethod + def from_service_account_file(cls, filename, **kwargs): + """Creates an instance of this client using the provided credentials + file. + Args: + filename (str): The path to the service account private key json + file. + kwargs: Additional arguments to pass to the constructor. + Returns: + A PublisherClient. + """ + credentials = service_account.Credentials.from_service_account_file(filename) + return cls(credentials=credentials, **kwargs) + + from_service_account_json = from_service_account_file + + def subscribe( + self, + subscription: Union[SubscriptionPath, str], + callback: MessageCallback, + per_partition_flow_control_settings: FlowControlSettings, + ) -> StreamingPullFuture: + return self._impl.subscribe( + subscription, callback, per_partition_flow_control_settings + ) + + def __enter__(self): + self._impl.__enter__() + return self + + def __exit__(self, exc_type, exc_value, traceback): + self._impl.__exit__(exc_type, exc_value, traceback) + + +class AsyncSubscriberClient(AsyncSubscriberClientInterface): + _impl: AsyncSubscriberClientInterface + + def __init__( + self, + nack_handler: Optional[NackHandler] = None, + message_transformer: Optional[MessageTransformer] = None, + fixed_partitions: Optional[Set[Partition]] = None, + credentials: Optional[Credentials] = None, + transport: str = "grpc_asyncio", + client_options: Optional[ClientOptions] = None, + ): + self._impl = MultiplexedAsyncSubscriberClient( + lambda subscription, settings: make_async_subscriber( + subscription=subscription, + transport=transport, + per_partition_flow_control_settings=settings, + nack_handler=nack_handler, + message_transformer=message_transformer, + fixed_partitions=fixed_partitions, + credentials=credentials, + client_options=client_options, + ) + ) + + @classmethod + def from_service_account_file(cls, filename, **kwargs): + """Creates an instance of this client using the provided credentials + file. + Args: + filename (str): The path to the service account private key json + file. + kwargs: Additional arguments to pass to the constructor. + Returns: + A PublisherClient. + """ + credentials = service_account.Credentials.from_service_account_file(filename) + return cls(credentials=credentials, **kwargs) + + from_service_account_json = from_service_account_file + + async def subscribe( + self, + subscription: Union[SubscriptionPath, str], + per_partition_flow_control_settings: FlowControlSettings, + ) -> AsyncIterator[Message]: + return await self._impl.subscribe( + subscription, per_partition_flow_control_settings + ) + + async def __aenter__(self): + await self._impl.__aenter__() + return self + + async def __aexit__(self, exc_type, exc_value, traceback): + await self._impl.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py new file mode 100644 index 00000000..89a46e24 --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py @@ -0,0 +1,73 @@ +from abc import abstractmethod +from typing import ContextManager, Union, AsyncContextManager, AsyncIterator, Callable + +from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture +from google.cloud.pubsub_v1.subscriber.message import Message + +from google.cloud.pubsublite.types import SubscriptionPath, FlowControlSettings + + +class AsyncSubscriberClientInterface(AsyncContextManager): + """ + An AsyncPublisherClientInterface reads messages similar to Google Pub/Sub, but must be used in an + async context. + + Must be used in an `async with` block or have __aenter__() awaited before use. + """ + + @abstractmethod + async def subscribe( + self, + subscription: Union[SubscriptionPath, str], + per_partition_flow_control_settings: FlowControlSettings, + ) -> AsyncIterator[Message]: + """ + Read messages from a subscription. + + Args: + subscription: The subscription to subscribe to. + per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these + settings apply to each partition individually, not in aggregate. + + Returns: + An AsyncIterator with Messages that must have ack() called on each exactly once. + + Raises: + GoogleApiCallError: On a permanent failure. + """ + + +MessageCallback = Callable[[Message], None] + + +class SubscriberClientInterface(ContextManager): + """ + A SubscriberClientInterface reads messages similar to Google Pub/Sub. + + Must be used in a `with` block or have __enter__() called before use. + """ + + @abstractmethod + def subscribe( + self, + subscription: Union[SubscriptionPath, str], + callback: MessageCallback, + per_partition_flow_control_settings: FlowControlSettings, + ) -> StreamingPullFuture: + """ + This method starts a background thread to begin pulling messages from + a Pub/Sub Lite subscription and scheduling them to be processed using the + provided ``callback``. + + Args: + subscription: The subscription to subscribe to. + callback: The callback function. This function receives the message as its only argument. + per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these + settings apply to each partition individually, not in aggregate. + + Returns: + A StreamingPullFuture instance that can be used to manage the background stream. + + Raises: + GoogleApiCallError: On a permanent failure. + """ diff --git a/google/cloud/pubsublite/internal/wire/make_publisher.py b/google/cloud/pubsublite/internal/wire/make_publisher.py index 56e74162..4329030b 100644 --- a/google/cloud/pubsublite/internal/wire/make_publisher.py +++ b/google/cloud/pubsublite/internal/wire/make_publisher.py @@ -35,6 +35,7 @@ def make_publisher( topic: TopicPath, + transport: str, per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, @@ -45,6 +46,7 @@ def make_publisher( Args: topic: The topic to publish to. + transport: The transport type to use. per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. @@ -60,6 +62,7 @@ def make_publisher( per_partition_batching_settings = DEFAULT_BATCHING_SETTINGS admin_client = make_admin_client( region=topic.location.region, + transport=transport, credentials=credentials, client_options=client_options, ) @@ -68,7 +71,7 @@ def make_publisher( api_endpoint=regional_endpoint(topic.location.region) ) client = async_client.PublisherServiceAsyncClient( - credentials=credentials, client_options=client_options + credentials=credentials, transport=transport, client_options=client_options ) # type: ignore clients: MutableMapping[Partition, Publisher] = {} diff --git a/google/cloud/pubsublite/make_admin_client.py b/google/cloud/pubsublite/make_admin_client.py index 5e4779e4..63043e50 100644 --- a/google/cloud/pubsublite/make_admin_client.py +++ b/google/cloud/pubsublite/make_admin_client.py @@ -12,12 +12,15 @@ def make_admin_client( region: CloudRegion, + transport: str = "grpc_asyncio", credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, ) -> AdminClient: if client_options is None: client_options = ClientOptions(api_endpoint=regional_endpoint(region)) return AdminClientImpl( - AdminServiceClient(client_options=client_options, credentials=credentials), + AdminServiceClient( + client_options=client_options, transport=transport, credentials=credentials + ), region, ) diff --git a/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py b/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py index a0185fe2..85c7db14 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/assigning_subscriber_test.py @@ -8,10 +8,12 @@ from google.pubsub_v1 import PubsubMessage from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import ( - AssigningSubscriber, + AssigningSingleSubscriber, PartitionSubscriberFactory, ) -from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) from google.cloud.pubsublite.internal.wire.assigner import Assigner from google.cloud.pubsublite.types import Partition from google.cloud.pubsublite.testing.test_utils import wire_queues, Box @@ -40,7 +42,7 @@ def subscriber(assigner, subscriber_factory): box = Box() def set_box(): - box.val = AssigningSubscriber(lambda: assigner, subscriber_factory) + box.val = AssigningSingleSubscriber(lambda: assigner, subscriber_factory) # Initialize AssigningSubscriber on another thread with a different event loop. thread = threading.Thread(target=set_box) @@ -62,8 +64,8 @@ async def test_initial_assignment(subscriber, assigner, subscriber_factory): assign_queues = wire_queues(assigner.get_assignment) async with subscriber: await assign_queues.called.get() - sub1 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) - sub2 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) + sub1 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) + sub2 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) subscriber_factory.side_effect = ( lambda partition: sub1 if partition == Partition(1) else sub2 ) @@ -91,9 +93,9 @@ async def test_assignment_change(subscriber, assigner, subscriber_factory): assign_queues = wire_queues(assigner.get_assignment) async with subscriber: await assign_queues.called.get() - sub1 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) - sub2 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) - sub3 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) + sub1 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) + sub2 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) + sub3 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) subscriber_factory.side_effect = ( lambda partition: sub1 if partition == Partition(1) @@ -124,7 +126,7 @@ async def test_subscriber_failure(subscriber, assigner, subscriber_factory): assign_queues = wire_queues(assigner.get_assignment) async with subscriber: await assign_queues.called.get() - sub1 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) + sub1 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) sub1_queues = wire_queues(sub1.read) subscriber_factory.return_value = sub1 await assign_queues.results.put({Partition(1)}) @@ -138,8 +140,8 @@ async def test_delivery_from_multiple(subscriber, assigner, subscriber_factory): assign_queues = wire_queues(assigner.get_assignment) async with subscriber: await assign_queues.called.get() - sub1 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) - sub2 = mock_async_context_manager(MagicMock(spec=AsyncSubscriber)) + sub1 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) + sub2 = mock_async_context_manager(MagicMock(spec=AsyncSingleSubscriber)) sub1_queues = wire_queues(sub1.read) sub2_queues = wire_queues(sub2.read) subscriber_factory.side_effect = ( diff --git a/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py b/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py index 530dd550..67b3e8e8 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/publisher_impl_test.py @@ -1,23 +1,28 @@ from asynctest.mock import MagicMock import pytest -from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl -from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher +from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import ( + SinglePublisherImpl, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( + AsyncSinglePublisher, + SinglePublisher, +) @pytest.fixture() def async_publisher(): - publisher = MagicMock(spec=AsyncPublisher) + publisher = MagicMock(spec=AsyncSinglePublisher) publisher.__aenter__.return_value = publisher return publisher @pytest.fixture() def publisher(async_publisher): - return PublisherImpl(async_publisher) + return SinglePublisherImpl(async_publisher) -def test_proxies_to_async(async_publisher, publisher: Publisher): +def test_proxies_to_async(async_publisher, publisher: SinglePublisher): with publisher: async_publisher.__aenter__.assert_called_once() publisher.publish(data=b"abc", ordering_key="zyx", xyz="xyz").result() diff --git a/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py index b7a77e18..0a136e8d 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/single_partition_subscriber_test.py @@ -10,11 +10,13 @@ from google.cloud.pubsublite.types import FlowControlSettings from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import ( - SinglePartitionSubscriber, + SinglePartitionSingleSubscriber, ) from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler -from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) from google.cloud.pubsublite.internal.wire.subscriber import Subscriber from google.cloud.pubsublite.testing.test_utils import make_queue_waiter from google.cloud.pubsublite_v1 import Cursor, FlowControlRequest, SequencedMessage @@ -69,7 +71,7 @@ def transformer(): def subscriber( underlying, flow_control_settings, ack_set_tracker, nack_handler, transformer ): - return SinglePartitionSubscriber( + return SinglePartitionSingleSubscriber( underlying, flow_control_settings, ack_set_tracker, nack_handler, transformer ) @@ -92,7 +94,7 @@ async def test_failed_transform(subscriber, underlying, transformer): async def test_ack( - subscriber: AsyncSubscriber, underlying, transformer, ack_set_tracker + subscriber: AsyncSingleSubscriber, underlying, transformer, ack_set_tracker ): ack_called_queue = asyncio.Queue() ack_result_queue = asyncio.Queue() @@ -121,7 +123,10 @@ async def test_ack( async def test_track_failure( - subscriber: SinglePartitionSubscriber, underlying, transformer, ack_set_tracker + subscriber: SinglePartitionSingleSubscriber, + underlying, + transformer, + ack_set_tracker, ): async with subscriber: ack_set_tracker.track.side_effect = FailedPrecondition("Bad track") @@ -133,7 +138,10 @@ async def test_track_failure( async def test_ack_failure( - subscriber: SinglePartitionSubscriber, underlying, transformer, ack_set_tracker + subscriber: SinglePartitionSingleSubscriber, + underlying, + transformer, + ack_set_tracker, ): ack_called_queue = asyncio.Queue() ack_result_queue = asyncio.Queue() @@ -159,7 +167,7 @@ async def sleep_forever(): async def test_nack_failure( - subscriber: SinglePartitionSubscriber, + subscriber: SinglePartitionSingleSubscriber, underlying, transformer, ack_set_tracker, @@ -182,7 +190,7 @@ async def sleep_forever(): async def test_nack_calls_ack( - subscriber: SinglePartitionSubscriber, + subscriber: SinglePartitionSingleSubscriber, underlying, transformer, ack_set_tracker, diff --git a/tests/unit/pubsublite/cloudpubsub/internal/subscriber_impl_test.py b/tests/unit/pubsublite/cloudpubsub/internal/subscriber_impl_test.py index dc887ece..aaf35664 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/subscriber_impl_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/subscriber_impl_test.py @@ -13,8 +13,10 @@ CloseCallback, ) from google.cloud.pubsublite.cloudpubsub.internal.subscriber_impl import SubscriberImpl -from google.cloud.pubsublite.cloudpubsub.subscriber import ( - AsyncSubscriber, +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSingleSubscriber, +) +from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( MessageCallback, ) from google.cloud.pubsublite.testing.test_utils import Box @@ -22,7 +24,7 @@ @pytest.fixture() def async_subscriber(): - subscriber = MagicMock(spec=AsyncSubscriber) + subscriber = MagicMock(spec=AsyncSingleSubscriber) subscriber.__aenter__.return_value = subscriber return subscriber From 8f485afbc08624cf43fffdcf49c98477a8c8b342 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 28 Oct 2020 11:29:20 -0400 Subject: [PATCH 2/8] fix: Assorted fixes --- .../cloudpubsub/internal/make_publisher.py | 22 ++++++++++--------- .../multiplexed_async_publisher_client.py | 4 ++-- .../internal/multiplexed_publisher_client.py | 1 + .../internal/multiplexed_subscriber_client.py | 2 +- .../internal/wire/make_publisher.py | 1 - google/cloud/pubsublite/make_admin_client.py | 5 +---- 6 files changed, 17 insertions(+), 18 deletions(-) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py b/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py index 6d82684d..c3b1ac4d 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py @@ -55,11 +55,12 @@ def make_async_publisher( def underlying_factory(): return make_wire_publisher( - topic, - per_partition_batching_settings, - credentials, - client_options, - metadata, + topic=topic, + transport=transport, + per_partition_batching_settings=per_partition_batching_settings, + credentials=credentials, + client_options=client_options, + metadata=metadata, ) return AsyncSinglePublisherImpl(underlying_factory) @@ -92,10 +93,11 @@ def make_publisher( """ return SinglePublisherImpl( make_async_publisher( - topic, - per_partition_batching_settings, - credentials, - client_options, - metadata, + topic=topic, + transport=transport, + per_partition_batching_settings=per_partition_batching_settings, + credentials=credentials, + client_options=client_options, + metadata=metadata, ) ) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py index 606726f3..44e65662 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py @@ -42,10 +42,10 @@ async def publish( data=data, ordering_key=ordering_key, **attrs ) except GoogleAPICallError as e: - self._on_failure(topic, publisher) + await self._on_failure(topic, publisher) raise e - def _on_failure(self, topic: TopicPath, publisher: AsyncSinglePublisher): + async def _on_failure(self, topic: TopicPath, publisher: AsyncSinglePublisher): if topic not in self._live_publishers: return current_publisher = self._live_publishers[topic] diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py index fe571219..ceb7045c 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -46,6 +46,7 @@ def publish( future.add_done_callback( lambda fut: self._on_future_completion(topic, publisher, fut) ) + return future def _on_future_completion( self, topic: TopicPath, publisher: SinglePublisher, future: "Future[str]" diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py index a520b29e..081fb683 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py @@ -49,8 +49,8 @@ def subscribe( subscription, per_partition_flow_control_settings ) subscriber = SubscriberImpl(underlying, callback, self._executor) - subscriber.__enter__() future = StreamingPullFuture(subscriber) + subscriber.__enter__() self._live_subscribers[subscription] = future future.add_done_callback(lambda fut: self._on_future_failure(subscription, fut)) return future diff --git a/google/cloud/pubsublite/internal/wire/make_publisher.py b/google/cloud/pubsublite/internal/wire/make_publisher.py index 4329030b..9705533c 100644 --- a/google/cloud/pubsublite/internal/wire/make_publisher.py +++ b/google/cloud/pubsublite/internal/wire/make_publisher.py @@ -62,7 +62,6 @@ def make_publisher( per_partition_batching_settings = DEFAULT_BATCHING_SETTINGS admin_client = make_admin_client( region=topic.location.region, - transport=transport, credentials=credentials, client_options=client_options, ) diff --git a/google/cloud/pubsublite/make_admin_client.py b/google/cloud/pubsublite/make_admin_client.py index 63043e50..5e4779e4 100644 --- a/google/cloud/pubsublite/make_admin_client.py +++ b/google/cloud/pubsublite/make_admin_client.py @@ -12,15 +12,12 @@ def make_admin_client( region: CloudRegion, - transport: str = "grpc_asyncio", credentials: Optional[Credentials] = None, client_options: Optional[ClientOptions] = None, ) -> AdminClient: if client_options is None: client_options = ClientOptions(api_endpoint=regional_endpoint(region)) return AdminClientImpl( - AdminServiceClient( - client_options=client_options, transport=transport, credentials=credentials - ), + AdminServiceClient(client_options=client_options, credentials=credentials), region, ) From f8d4926d2569016507a72bdf39e320a2870d4f3a Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 28 Oct 2020 11:34:41 -0400 Subject: [PATCH 3/8] fix: Export clients in google.cloud.pubsublite.cloudpubsub --- google/cloud/pubsublite/cloudpubsub/__init__.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/google/cloud/pubsublite/cloudpubsub/__init__.py b/google/cloud/pubsublite/cloudpubsub/__init__.py index e69de29b..9ec26360 100644 --- a/google/cloud/pubsublite/cloudpubsub/__init__.py +++ b/google/cloud/pubsublite/cloudpubsub/__init__.py @@ -0,0 +1,6 @@ +from .message_transformer import MessageTransformer +from .nack_handler import NackHandler +from .publisher_client_interface import PublisherClientInterface, AsyncPublisherClientInterface +from .publisher_client import PublisherClient, AsyncPublisherClient +from .subscriber_client_interface import SubscriberClientInterface, AsyncSubscriberClientInterface +from .subscriber_client import SubscriberClient, AsyncSubscriberClient From 5a46b9791054fbd4ade57de6cbc0a8fa6424c444 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 29 Oct 2020 14:01:48 -0400 Subject: [PATCH 4/8] fix: Add tests, break out repeated code, and restructure admin client --- .coveragerc | 2 + google/cloud/pubsublite/admin_client.py | 108 +++++++++++------ .../pubsublite/admin_client_interface.py | 71 +++++++++++ .../cloud/pubsublite/cloudpubsub/__init__.py | 10 +- .../internal/client_multiplexer.py | 112 ++++++++++++++++++ .../cloudpubsub/internal/make_subscriber.py | 61 ---------- .../multiplexed_async_publisher_client.py | 44 ++++--- .../multiplexed_async_subscriber_client.py | 57 ++++----- .../internal/multiplexed_publisher_client.py | 44 +++---- .../internal/multiplexed_subscriber_client.py | 74 ++++++------ .../cloudpubsub/internal/single_subscriber.py | 11 +- .../cloudpubsub/publisher_client.py | 92 ++++++++------ .../cloudpubsub/publisher_client_interface.py | 19 +-- .../cloudpubsub/subscriber_client.py | 101 +++++++++------- .../subscriber_client_interface.py | 24 +++- .../constructable_from_service_account.py | 18 +++ .../internal/wire/admin_client_impl.py | 4 +- .../internal/wire/make_publisher.py | 4 +- .../internal/wire/routing_publisher.py | 7 +- google/cloud/pubsublite/make_admin_client.py | 23 ---- setup.py | 1 + .../internal/async_client_multiplexer_test.py | 69 +++++++++++ .../internal/client_multiplexer_test.py | 67 +++++++++++ ...ultiplexed_async_subscriber_client_test.py | 76 ++++++++++++ 24 files changed, 758 insertions(+), 341 deletions(-) create mode 100644 google/cloud/pubsublite/admin_client_interface.py create mode 100644 google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py create mode 100644 google/cloud/pubsublite/internal/constructable_from_service_account.py delete mode 100644 google/cloud/pubsublite/make_admin_client.py create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/async_client_multiplexer_test.py create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/client_multiplexer_test.py create mode 100644 tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py diff --git a/.coveragerc b/.coveragerc index 68a287da..57cd1a52 100644 --- a/.coveragerc +++ b/.coveragerc @@ -29,6 +29,8 @@ exclude_lines = # Ignore abstract methods raise NotImplementedError @abstractmethod + # Ignore delegating methods + impl_. omit = */pubsublite_v1/*.py diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index d1882ef8..eac81675 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -1,67 +1,109 @@ -from abc import ABC, abstractmethod -from typing import List +from typing import Optional, List +from overrides import overrides +from google.api_core.client_options import ClientOptions +from google.auth.credentials import Credentials +from google.protobuf.field_mask_pb2 import FieldMask + +from google.cloud.pubsublite.admin_client_interface import AdminClientInterface +from google.cloud.pubsublite.internal.constructable_from_service_account import ( + ConstructableFromServiceAccount, +) +from google.cloud.pubsublite.internal.endpoints import regional_endpoint +from google.cloud.pubsublite.internal.wire.admin_client_impl import AdminClientImpl from google.cloud.pubsublite.types import ( CloudRegion, - TopicPath, - LocationPath, SubscriptionPath, + LocationPath, + TopicPath, ) -from google.cloud.pubsublite_v1 import Topic, Subscription -from google.protobuf.field_mask_pb2 import FieldMask +from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic + + +class AdminClient(AdminClientInterface, ConstructableFromServiceAccount): + """ + An admin client for Pub/Sub Lite. Only operates on a single region. + """ + + _impl: AdminClientInterface + + def __init__( + self, + region: CloudRegion, + credentials: Optional[Credentials] = None, + transport: Optional[str] = None, + client_options: Optional[ClientOptions] = None, + ): + """ + Create a new AdminClient. + Args: + region: The cloud region to connect to. + credentials: The credentials to use when connecting. + transport: The transport to use. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ + if client_options is None: + client_options = ClientOptions(api_endpoint=regional_endpoint(region)) + self._impl = AdminClientImpl( + AdminServiceClient( + client_options=client_options, + transport=transport, + credentials=credentials, + ), + region, + ) -class AdminClient(ABC): - @abstractmethod + @overrides def region(self) -> CloudRegion: - """The region this client is for.""" + return self._impl.region() - @abstractmethod + @overrides def create_topic(self, topic: Topic) -> Topic: - """Create a topic, returns the created topic.""" + return self._impl.create_topic(topic) - @abstractmethod + @overrides def get_topic(self, topic_path: TopicPath) -> Topic: - """Get the topic object from the server.""" + return self._impl.get_topic(topic_path) - @abstractmethod + @overrides def get_topic_partition_count(self, topic_path: TopicPath) -> int: - """Get the number of partitions in the provided topic.""" + return self._impl.get_topic_partition_count(topic_path) - @abstractmethod + @overrides def list_topics(self, location_path: LocationPath) -> List[Topic]: - """List the Pub/Sub lite topics that exist for a project in a given location.""" + return self._impl.list_topics(location_path) - @abstractmethod + @overrides def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic: - """Update the masked fields of the provided topic.""" + return self._impl.update_topic(topic, update_mask) - @abstractmethod + @overrides def delete_topic(self, topic_path: TopicPath): - """Delete a topic and all associated messages.""" + return self._impl.delete_topic(topic_path) - @abstractmethod + @overrides def list_topic_subscriptions(self, topic_path: TopicPath): - """List the subscriptions that exist for a given topic.""" + return self._impl.list_topic_subscriptions(topic_path) - @abstractmethod + @overrides def create_subscription(self, subscription: Subscription) -> Subscription: - """Create a subscription, returns the created subscription.""" + return self._impl.create_subscription(subscription) - @abstractmethod + @overrides def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: - """Get the subscription object from the server.""" + return self._impl.get_subscription(subscription_path) - @abstractmethod + @overrides def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]: - """List the Pub/Sub lite subscriptions that exist for a project in a given location.""" + return self._impl.list_subscriptions(location_path) - @abstractmethod + @overrides def update_subscription( self, subscription: Subscription, update_mask: FieldMask ) -> Subscription: - """Update the masked fields of the provided subscription.""" + return self._impl.update_subscription(subscription, update_mask) - @abstractmethod + @overrides def delete_subscription(self, subscription_path: SubscriptionPath): - """Delete a subscription and all associated messages.""" + return self._impl.delete_subscription(subscription_path) diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py new file mode 100644 index 00000000..d8794768 --- /dev/null +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -0,0 +1,71 @@ +from abc import ABC, abstractmethod +from typing import List + +from google.cloud.pubsublite.types import ( + CloudRegion, + TopicPath, + LocationPath, + SubscriptionPath, +) +from google.cloud.pubsublite_v1 import Topic, Subscription +from google.protobuf.field_mask_pb2 import FieldMask + + +class AdminClientInterface(ABC): + """ + An admin client for Pub/Sub Lite. Only operates on a single region. + """ + + @abstractmethod + def region(self) -> CloudRegion: + """The region this client is for.""" + + @abstractmethod + def create_topic(self, topic: Topic) -> Topic: + """Create a topic, returns the created topic.""" + + @abstractmethod + def get_topic(self, topic_path: TopicPath) -> Topic: + """Get the topic object from the server.""" + + @abstractmethod + def get_topic_partition_count(self, topic_path: TopicPath) -> int: + """Get the number of partitions in the provided topic.""" + + @abstractmethod + def list_topics(self, location_path: LocationPath) -> List[Topic]: + """List the Pub/Sub lite topics that exist for a project in a given location.""" + + @abstractmethod + def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic: + """Update the masked fields of the provided topic.""" + + @abstractmethod + def delete_topic(self, topic_path: TopicPath): + """Delete a topic and all associated messages.""" + + @abstractmethod + def list_topic_subscriptions(self, topic_path: TopicPath): + """List the subscriptions that exist for a given topic.""" + + @abstractmethod + def create_subscription(self, subscription: Subscription) -> Subscription: + """Create a subscription, returns the created subscription.""" + + @abstractmethod + def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription: + """Get the subscription object from the server.""" + + @abstractmethod + def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]: + """List the Pub/Sub lite subscriptions that exist for a project in a given location.""" + + @abstractmethod + def update_subscription( + self, subscription: Subscription, update_mask: FieldMask + ) -> Subscription: + """Update the masked fields of the provided subscription.""" + + @abstractmethod + def delete_subscription(self, subscription_path: SubscriptionPath): + """Delete a subscription and all associated messages.""" diff --git a/google/cloud/pubsublite/cloudpubsub/__init__.py b/google/cloud/pubsublite/cloudpubsub/__init__.py index 9ec26360..d9180447 100644 --- a/google/cloud/pubsublite/cloudpubsub/__init__.py +++ b/google/cloud/pubsublite/cloudpubsub/__init__.py @@ -1,6 +1,12 @@ from .message_transformer import MessageTransformer from .nack_handler import NackHandler -from .publisher_client_interface import PublisherClientInterface, AsyncPublisherClientInterface +from .publisher_client_interface import ( + PublisherClientInterface, + AsyncPublisherClientInterface, +) from .publisher_client import PublisherClient, AsyncPublisherClient -from .subscriber_client_interface import SubscriberClientInterface, AsyncSubscriberClientInterface +from .subscriber_client_interface import ( + SubscriberClientInterface, + AsyncSubscriberClientInterface, +) from .subscriber_client import SubscriberClient, AsyncSubscriberClient diff --git a/google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py b/google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py new file mode 100644 index 00000000..7f097f4c --- /dev/null +++ b/google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py @@ -0,0 +1,112 @@ +import asyncio +import threading +from typing import Generic, TypeVar, Callable, Dict, Awaitable + +from google.api_core.exceptions import FailedPrecondition + +_Key = TypeVar("_Key") +_Client = TypeVar("_Client") + + +class ClientMultiplexer(Generic[_Key, _Client]): + _OpenedClientFactory = Callable[[], _Client] + _ClientCloser = Callable[[_Client], None] + + _closer: _ClientCloser + _lock: threading.Lock + _live_clients: Dict[_Key, _Client] + + def __init__( + self, closer: _ClientCloser = lambda client: client.__exit__(None, None, None) + ): + self._closer = closer + self._lock = threading.Lock() + self._live_clients = {} + + def get_or_create(self, key: _Key, factory: _OpenedClientFactory) -> _Client: + with self._lock: + if key not in self._live_clients: + self._live_clients[key] = factory() + return self._live_clients[key] + + def create_or_fail(self, key: _Key, factory: _OpenedClientFactory) -> _Client: + with self._lock: + if key in self._live_clients: + raise FailedPrecondition( + f"Cannot create two clients with the same key. {_Key}" + ) + self._live_clients[key] = factory() + return self._live_clients[key] + + def try_erase(self, key: _Key, client: _Client): + with self._lock: + if key not in self._live_clients: + return + current_client = self._live_clients[key] + if current_client is not client: + return + del self._live_clients[key] + self._closer(client) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + live_clients: Dict[_Key, _Client] + with self._lock: + live_clients = self._live_clients + self._live_clients = {} + for topic, client in live_clients.items(): + self._closer(client) + + +class AsyncClientMultiplexer(Generic[_Key, _Client]): + _OpenedClientFactory = Callable[[], Awaitable[_Client]] + _ClientCloser = Callable[[_Client], Awaitable[None]] + + _closer: _ClientCloser + _lock: asyncio.Lock + _live_clients: Dict[_Key, _Client] + + def __init__( + self, closer: _ClientCloser = lambda client: client.__aexit__(None, None, None) + ): + self._closer = closer + self._live_clients = {} + + async def get_or_create(self, key: _Key, factory: _OpenedClientFactory) -> _Client: + async with self._lock: + if key not in self._live_clients: + self._live_clients[key] = await factory() + return self._live_clients[key] + + async def create_or_fail(self, key: _Key, factory: _OpenedClientFactory) -> _Client: + async with self._lock: + if key in self._live_clients: + raise FailedPrecondition( + f"Cannot create two clients with the same key. {_Key}" + ) + self._live_clients[key] = await factory() + return self._live_clients[key] + + async def try_erase(self, key: _Key, client: _Client): + async with self._lock: + if key not in self._live_clients: + return + current_client = self._live_clients[key] + if current_client is not client: + return + del self._live_clients[key] + await self._closer(client) + + async def __aenter__(self): + self._lock = asyncio.Lock() + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + live_clients: Dict[_Key, _Client] + async with self._lock: + live_clients = self._live_clients + self._live_clients = {} + for topic, client in live_clients.items(): + await self._closer(client) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py index 3229f52f..4c520019 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py @@ -1,14 +1,8 @@ -from concurrent.futures.thread import ThreadPoolExecutor from typing import Optional, Mapping, Set, AsyncIterator, Callable from uuid import uuid4 from google.api_core.client_options import ClientOptions from google.auth.credentials import Credentials -from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture - -from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( - MessageCallback, -) from google.cloud.pubsublite.types import FlowControlSettings from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import ( AckSetTrackerImpl, @@ -20,7 +14,6 @@ from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import ( SinglePartitionSingleSubscriber, ) -import google.cloud.pubsublite.cloudpubsub.internal.subscriber_impl as cps_subscriber from google.cloud.pubsublite.cloudpubsub.message_transformer import ( MessageTransformer, DefaultMessageTransformer, @@ -205,57 +198,3 @@ def make_async_subscriber( message_transformer, ) return AssigningSingleSubscriber(assigner_factory, partition_subscriber_factory) - - -def make_subscriber( - subscription: SubscriptionPath, - transport: str, - per_partition_flow_control_settings: FlowControlSettings, - callback: MessageCallback, - nack_handler: Optional[NackHandler] = None, - message_transformer: Optional[MessageTransformer] = None, - fixed_partitions: Optional[Set[Partition]] = None, - executor: Optional[ThreadPoolExecutor] = None, - credentials: Optional[Credentials] = None, - client_options: Optional[ClientOptions] = None, - metadata: Optional[Mapping[str, str]] = None, -) -> StreamingPullFuture: - """ - Make a Pub/Sub Lite Subscriber. - - Args: - subscription: The subscription to subscribe to. - transport: The transport type to use. - per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these - settings apply to each partition individually, not in aggregate. - callback: The callback to call with each message. - nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client. - message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages. - fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. - executor: The executor to use for user callbacks. If not provided, will use the default constructed - ThreadPoolExecutor. If provided a single threaded executor, messages will be ordered per-partition, but take care - that the callback does not block for too long as it will impede forward progress on all partitions. - credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. - client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. - metadata: Additional metadata to send with the RPC. - - Returns: - A StreamingPullFuture, managing the subscriber's lifetime. - """ - underlying = make_async_subscriber( - subscription, - transport, - per_partition_flow_control_settings, - nack_handler, - message_transformer, - fixed_partitions, - credentials, - client_options, - metadata, - ) - if executor is None: - executor = ThreadPoolExecutor() - subscriber = cps_subscriber.SubscriberImpl(underlying, callback, executor) - future = StreamingPullFuture(subscriber) - subscriber.__enter__() - return future diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py index 44e65662..2755d6d9 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_publisher_client.py @@ -1,7 +1,10 @@ -from typing import Callable, Dict, Union, Mapping +from typing import Callable, Union, Mapping from google.api_core.exceptions import GoogleAPICallError +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + AsyncClientMultiplexer, +) from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( AsyncSinglePublisher, ) @@ -9,6 +12,7 @@ AsyncPublisherClientInterface, ) from google.cloud.pubsublite.types import TopicPath +from overrides import overrides AsyncPublisherFactory = Callable[[TopicPath], AsyncSinglePublisher] @@ -16,12 +20,13 @@ class MultiplexedAsyncPublisherClient(AsyncPublisherClientInterface): _publisher_factory: AsyncPublisherFactory - _live_publishers: Dict[TopicPath, AsyncSinglePublisher] + _multiplexer: AsyncClientMultiplexer[TopicPath, AsyncSinglePublisher] def __init__(self, publisher_factory: AsyncPublisherFactory): self._publisher_factory = publisher_factory - self._live_publishers = {} + self._multiplexer = AsyncClientMultiplexer() + @overrides async def publish( self, topic: Union[TopicPath, str], @@ -31,35 +36,26 @@ async def publish( ) -> str: if isinstance(topic, str): topic = TopicPath.parse(topic) - publisher: AsyncSinglePublisher - if topic not in self._live_publishers: - publisher = self._publisher_factory(topic) - self._live_publishers[topic] = publisher - await publisher.__aenter__() - publisher = self._live_publishers[topic] + + async def create_and_open(): + client = self._publisher_factory(topic) + await client.__aenter__() + return client + + publisher = await self._multiplexer.get_or_create(topic, create_and_open) try: return await publisher.publish( data=data, ordering_key=ordering_key, **attrs ) except GoogleAPICallError as e: - await self._on_failure(topic, publisher) + await self._multiplexer.try_erase(topic, publisher) raise e - async def _on_failure(self, topic: TopicPath, publisher: AsyncSinglePublisher): - if topic not in self._live_publishers: - return - current_publisher = self._live_publishers[topic] - if current_publisher is not publisher: - return - del self._live_publishers[topic] - - await publisher.__aexit__(None, None, None) - + @overrides async def __aenter__(self): + await self._multiplexer.__aenter__() return self + @overrides async def __aexit__(self, exc_type, exc_value, traceback): - live_publishers = self._live_publishers - self._live_publishers = {} - for topic, pub in live_publishers.items(): - await pub.__aexit__(exc_type, exc_value, traceback) + await self._multiplexer.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py index 52a96f9b..ce00b8d0 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client.py @@ -2,13 +2,16 @@ Union, AsyncIterator, Awaitable, - Dict, Callable, + Optional, + Set, ) -from google.api_core.exceptions import FailedPrecondition from google.cloud.pubsub_v1.subscriber.message import Message +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + AsyncClientMultiplexer, +) from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( AsyncSubscriberFactory, AsyncSingleSubscriber, @@ -16,7 +19,12 @@ from google.cloud.pubsublite.cloudpubsub.subscriber_client_interface import ( AsyncSubscriberClientInterface, ) -from google.cloud.pubsublite.types import SubscriptionPath, FlowControlSettings +from google.cloud.pubsublite.types import ( + SubscriptionPath, + FlowControlSettings, + Partition, +) +from overrides import overrides class _SubscriberAsyncIterator(AsyncIterator): @@ -44,48 +52,41 @@ def __aiter__(self): class MultiplexedAsyncSubscriberClient(AsyncSubscriberClientInterface): _underlying_factory: AsyncSubscriberFactory - _live_subscribers: Dict[SubscriptionPath, AsyncSingleSubscriber] + _multiplexer: AsyncClientMultiplexer[SubscriptionPath, AsyncSingleSubscriber] def __init__(self, underlying_factory: AsyncSubscriberFactory): self._underlying_factory = underlying_factory - self._live_subscribers = {} + self._multiplexer = AsyncClientMultiplexer() + @overrides async def subscribe( self, subscription: Union[SubscriptionPath, str], per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, ) -> AsyncIterator[Message]: if isinstance(subscription, str): subscription = SubscriptionPath.parse(subscription) - if subscription in self._live_subscribers: - raise FailedPrecondition( - f"Cannot subscribe to the same subscription twice. {subscription}" + + async def create_and_open(): + client = self._underlying_factory( + subscription, fixed_partitions, per_partition_flow_control_settings ) - subscriber = self._underlying_factory( - subscription, per_partition_flow_control_settings + await client.__aenter__() + return client + + subscriber = await self._multiplexer.get_or_create( + subscription, create_and_open ) - self._live_subscribers[subscription] = subscriber - await subscriber.__aenter__() return _SubscriberAsyncIterator( - subscriber, lambda: self._on_subscriber_failure(subscription, subscriber) + subscriber, lambda: self._multiplexer.try_erase(subscription, subscriber) ) - async def _on_subscriber_failure( - self, subscription: SubscriptionPath, subscriber: AsyncSingleSubscriber - ): - if subscription not in self._live_subscribers: - return - current_subscriber = self._live_subscribers[subscription] - if current_subscriber is not subscriber: - return - del self._live_subscribers[subscription] - await subscriber.__aexit__(None, None, None) - + @overrides async def __aenter__(self): + await self._multiplexer.__aenter__() return self + @overrides async def __aexit__(self, exc_type, exc_value, traceback): - live_subscribers = self._live_subscribers - self._live_subscribers = {} - for topic, sub in live_subscribers.items(): - await sub.__aexit__(exc_type, exc_value, traceback) + await self._multiplexer.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py index ceb7045c..d94650f1 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_publisher_client.py @@ -1,9 +1,11 @@ from concurrent.futures import Future -from threading import Lock -from typing import Callable, Dict, Union, Mapping +from typing import Callable, Union, Mapping from google.api_core.exceptions import GoogleAPICallError +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + ClientMultiplexer, +) from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import ( SinglePublisher, ) @@ -11,21 +13,20 @@ PublisherClientInterface, ) from google.cloud.pubsublite.types import TopicPath +from overrides import overrides PublisherFactory = Callable[[TopicPath], SinglePublisher] class MultiplexedPublisherClient(PublisherClientInterface): _publisher_factory: PublisherFactory - - _lock: Lock - _live_publishers: Dict[TopicPath, SinglePublisher] + _multiplexer: ClientMultiplexer[TopicPath, SinglePublisher] def __init__(self, publisher_factory: PublisherFactory): self._publisher_factory = publisher_factory - self._lock = Lock() - self._live_publishers = {} + self._multiplexer = ClientMultiplexer() + @overrides def publish( self, topic: Union[TopicPath, str], @@ -35,13 +36,9 @@ def publish( ) -> "Future[str]": if isinstance(topic, str): topic = TopicPath.parse(topic) - publisher: SinglePublisher - with self._lock: - if topic not in self._live_publishers: - publisher = self._publisher_factory(topic) - publisher.__enter__() - self._live_publishers[topic] = publisher - publisher = self._live_publishers[topic] + publisher = self._multiplexer.get_or_create( + topic, lambda: self._publisher_factory(topic).__enter__() + ) future = publisher.publish(data=data, ordering_key=ordering_key, **attrs) future.add_done_callback( lambda fut: self._on_future_completion(topic, publisher, fut) @@ -54,22 +51,13 @@ def _on_future_completion( try: future.result() except GoogleAPICallError: - with self._lock: - if topic not in self._live_publishers: - return - current_publisher = self._live_publishers[topic] - if current_publisher is not publisher: - return - del self._live_publishers[topic] - publisher.__exit__(None, None, None) + self._multiplexer.try_erase(topic, publisher) + @overrides def __enter__(self): + self._multiplexer.__enter__() return self + @overrides def __exit__(self, exc_type, exc_value, traceback): - live_publishers: Dict[TopicPath, SinglePublisher] - with self._lock: - live_publishers = self._live_publishers - self._live_publishers = {} - for topic, pub in live_publishers.items(): - pub.__exit__(exc_type, exc_value, traceback) + self._multiplexer.__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py index 081fb683..2965ecbd 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/multiplexed_subscriber_client.py @@ -1,10 +1,11 @@ from concurrent.futures.thread import ThreadPoolExecutor -from threading import Lock -from typing import Union, Dict +from typing import Union, Optional, Set -from google.api_core.exceptions import FailedPrecondition from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + ClientMultiplexer, +) from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( AsyncSubscriberFactory, ) @@ -13,73 +14,68 @@ SubscriberClientInterface, MessageCallback, ) -from google.cloud.pubsublite.types import SubscriptionPath, FlowControlSettings +from google.cloud.pubsublite.types import ( + SubscriptionPath, + FlowControlSettings, + Partition, +) +from overrides import overrides class MultiplexedSubscriberClient(SubscriberClientInterface): _executor: ThreadPoolExecutor _underlying_factory: AsyncSubscriberFactory - _lock: Lock - _live_subscribers: Dict[SubscriptionPath, StreamingPullFuture] + _multiplexer: ClientMultiplexer[SubscriptionPath, StreamingPullFuture] def __init__( self, executor: ThreadPoolExecutor, underlying_factory: AsyncSubscriberFactory ): self._executor = executor self._underlying_factory = underlying_factory - self._lock = Lock() - self._live_subscribers = {} + def cancel_streaming_pull_future(fut: StreamingPullFuture): + try: + fut.cancel() + fut.result() + except: # noqa: E722 + pass + + self._multiplexer = ClientMultiplexer(cancel_streaming_pull_future) + + @overrides def subscribe( self, subscription: Union[SubscriptionPath, str], callback: MessageCallback, per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: if isinstance(subscription, str): subscription = SubscriptionPath.parse(subscription) - future: StreamingPullFuture - with self._lock: - if subscription in self._live_subscribers: - raise FailedPrecondition( - f"Cannot subscribe to the same subscription twice. {subscription}" - ) + + def create_and_open(): underlying = self._underlying_factory( - subscription, per_partition_flow_control_settings + subscription, fixed_partitions, per_partition_flow_control_settings ) subscriber = SubscriberImpl(underlying, callback, self._executor) future = StreamingPullFuture(subscriber) subscriber.__enter__() - self._live_subscribers[subscription] = future - future.add_done_callback(lambda fut: self._on_future_failure(subscription, fut)) - return future + return future - def _on_future_failure( - self, subscription: SubscriptionPath, future: StreamingPullFuture - ): - with self._lock: - if subscription not in self._live_subscribers: - return - current_future = self._live_subscribers[subscription] - if current_future is not future: - return - del self._live_subscribers[subscription] + future = self._multiplexer.create_or_fail(subscription, create_and_open) + future.add_done_callback( + lambda fut: self._multiplexer.try_erase(subscription, future) + ) + return future + @overrides def __enter__(self): self._executor.__enter__() + self._multiplexer.__enter__() return self + @overrides def __exit__(self, exc_type, exc_value, traceback): - futures: Dict[SubscriptionPath, StreamingPullFuture] - with self._lock: - futures = self._live_subscribers - self._live_subscribers = {} - for sub, future in futures.items(): - future.cancel() - try: - future.result() - except: # noqa: E722 - pass + self._multiplexer.__exit__(exc_type, exc_value, traceback) self._executor.__exit__(exc_type, exc_value, traceback) - return super().__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py index 9436252d..4ec31b2a 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py @@ -1,9 +1,13 @@ from abc import abstractmethod -from typing import AsyncContextManager, Callable +from typing import AsyncContextManager, Callable, Set, Optional from google.cloud.pubsub_v1.subscriber.message import Message -from google.cloud.pubsublite.types import SubscriptionPath, FlowControlSettings +from google.cloud.pubsublite.types import ( + SubscriptionPath, + FlowControlSettings, + Partition, +) class AsyncSingleSubscriber(AsyncContextManager): @@ -31,5 +35,6 @@ async def read(self) -> Message: AsyncSubscriberFactory = Callable[ - [SubscriptionPath, FlowControlSettings], AsyncSingleSubscriber + [SubscriptionPath, Optional[Set[Partition]], FlowControlSettings], + AsyncSingleSubscriber, ] diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client.py b/google/cloud/pubsublite/cloudpubsub/publisher_client.py index b1a6d419..4d81443e 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client.py @@ -4,7 +4,6 @@ from google.api_core.client_options import ClientOptions from google.auth.credentials import Credentials from google.cloud.pubsub_v1.types import BatchSettings -from google.oauth2 import service_account from google.cloud.pubsublite.cloudpubsub.internal.make_publisher import ( make_publisher, @@ -20,50 +19,57 @@ PublisherClientInterface, AsyncPublisherClientInterface, ) +from google.cloud.pubsublite.internal.constructable_from_service_account import ( + ConstructableFromServiceAccount, +) from google.cloud.pubsublite.internal.wire.make_publisher import ( DEFAULT_BATCHING_SETTINGS as WIRE_DEFAULT_BATCHING, ) from google.cloud.pubsublite.types import TopicPath +from overrides import overrides + + +class PublisherClient(PublisherClientInterface, ConstructableFromServiceAccount): + """ + A PublisherClient publishes messages similar to Google Pub/Sub. + Any publish failures are unlikely to succeed if retried. + Must be used in a `with` block or have __enter__() called before use. + """ -class PublisherClient(PublisherClientInterface): _impl: PublisherClientInterface DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING + """ + The default batching settings for a publisher client. + """ def __init__( self, - per_partition_batching_setttings: Optional[BatchSettings] = None, + per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, transport: str = "grpc_asyncio", client_options: Optional[ClientOptions] = None, ): + """ + Create a new PublisherClient. + + Args: + per_partition_batching_settings: The settings for publish batching. Apply on a per-partition basis. + credentials: If provided, the credentials to use when connecting. + transport: The transport to use. Must correspond to an asyncio transport. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ self._impl = MultiplexedPublisherClient( lambda topic: make_publisher( topic=topic, - per_partition_batching_settings=per_partition_batching_setttings, + per_partition_batching_settings=per_partition_batching_settings, credentials=credentials, client_options=client_options, transport=transport, ) ) - @classmethod - def from_service_account_file(cls, filename, **kwargs): - """Creates an instance of this client using the provided credentials - file. - Args: - filename (str): The path to the service account private key json - file. - kwargs: Additional arguments to pass to the constructor. - Returns: - A PublisherClient. - """ - credentials = service_account.Credentials.from_service_account_file(filename) - return cls(credentials=credentials, **kwargs) - - from_service_account_json = from_service_account_file - def publish( self, topic: Union[TopicPath, str], @@ -83,44 +89,50 @@ def __exit__(self, exc_type, exc_value, traceback): self._impl.__exit__(exc_type, exc_value, traceback) -class AsyncPublisherClient(AsyncPublisherClientInterface): +class AsyncPublisherClient( + AsyncPublisherClientInterface, ConstructableFromServiceAccount +): + """ + An AsyncPublisherClient publishes messages similar to Google Pub/Sub, but must be used in an + async context. Any publish failures are unlikely to succeed if retried. + + Must be used in an `async with` block or have __aenter__() awaited before use. + """ + _impl: AsyncPublisherClientInterface DEFAULT_BATCHING_SETTINGS = WIRE_DEFAULT_BATCHING + """ + The default batching settings for a publisher client. + """ def __init__( self, - per_partition_batching_setttings: Optional[BatchSettings] = None, + per_partition_batching_settings: Optional[BatchSettings] = None, credentials: Optional[Credentials] = None, transport: str = "grpc_asyncio", client_options: Optional[ClientOptions] = None, ): + """ + Create a new AsyncPublisherClient. + + Args: + per_partition_batching_settings: The settings for publish batching. Apply on a per-partition basis. + credentials: If provided, the credentials to use when connecting. + transport: The transport to use. Must correspond to an asyncio transport. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ self._impl = MultiplexedAsyncPublisherClient( lambda topic: make_async_publisher( topic=topic, - per_partition_batching_settings=per_partition_batching_setttings, + per_partition_batching_settings=per_partition_batching_settings, credentials=credentials, client_options=client_options, transport=transport, ) ) - @classmethod - def from_service_account_file(cls, filename, **kwargs): - """Creates an instance of this client using the provided credentials - file. - Args: - filename (str): The path to the service account private key json - file. - kwargs: Additional arguments to pass to the constructor. - Returns: - A PublisherClient. - """ - credentials = service_account.Credentials.from_service_account_file(filename) - return cls(credentials=credentials, **kwargs) - - from_service_account_json = from_service_account_file - + @overrides async def publish( self, topic: Union[TopicPath, str], @@ -132,9 +144,11 @@ async def publish( topic=topic, data=data, ordering_key=ordering_key, **attrs ) + @overrides async def __aenter__(self): await self._impl.__aenter__() return self + @overrides async def __aexit__(self, exc_type, exc_value, traceback): await self._impl.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py index c4395359..c024585f 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py @@ -7,11 +7,11 @@ class AsyncPublisherClientInterface(AsyncContextManager): """ - An AsyncPublisherClientInterface publishes messages similar to Google Pub/Sub, but must be used in an - async context. Any publish failures are permanent. + An AsyncPublisherClientInterface publishes messages similar to Google Pub/Sub, but must be used in an + async context. Any publish failures are unlikely to succeed if retried. - Must be used in an `async with` block or have __aenter__() awaited before use. - """ + Must be used in an `async with` block or have __aenter__() awaited before use. + """ @abstractmethod async def publish( @@ -19,7 +19,7 @@ async def publish( topic: Union[TopicPath, str], data: bytes, ordering_key: str = "", - **attrs: Mapping[str, str] + **attrs: Mapping[str, str], ) -> str: """ Publish a message. @@ -40,10 +40,11 @@ async def publish( class PublisherClientInterface(ContextManager): """ - A PublisherClientInterface publishes messages similar to Google Pub/Sub. + A PublisherClientInterface publishes messages similar to Google Pub/Sub. + Any publish failures are unlikely to succeed if retried. - Must be used in a `with` block or have __enter__() called before use. - """ + Must be used in a `with` block or have __enter__() called before use. + """ @abstractmethod def publish( @@ -51,7 +52,7 @@ def publish( topic: Union[TopicPath, str], data: bytes, ordering_key: str = "", - **attrs: Mapping[str, str] + **attrs: Mapping[str, str], ) -> "Future[str]": """ Publish a message. diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py index 0b2298e7..9fa5f1a1 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py @@ -5,7 +5,6 @@ from google.auth.credentials import Credentials from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture from google.cloud.pubsub_v1.subscriber.message import Message -from google.oauth2 import service_account from google.cloud.pubsublite.cloudpubsub.internal.make_subscriber import ( make_async_subscriber, @@ -23,14 +22,25 @@ MessageCallback, AsyncSubscriberClientInterface, ) +from google.cloud.pubsublite.internal.constructable_from_service_account import ( + ConstructableFromServiceAccount, +) from google.cloud.pubsublite.types import ( FlowControlSettings, Partition, SubscriptionPath, ) +from overrides import overrides + +class SubscriberClient(SubscriberClientInterface, ConstructableFromServiceAccount): + """ + A SubscriberClient reads messages similar to Google Pub/Sub. + Any subscribe failures are unlikely to succeed if retried. + + Must be used in a `with` block or have __enter__() called before use. + """ -class SubscriberClient(SubscriberClientInterface): _impl: SubscriberClientInterface def __init__( @@ -38,51 +48,49 @@ def __init__( executor: Optional[ThreadPoolExecutor] = None, nack_handler: Optional[NackHandler] = None, message_transformer: Optional[MessageTransformer] = None, - fixed_partitions: Optional[Set[Partition]] = None, credentials: Optional[Credentials] = None, transport: str = "grpc_asyncio", client_options: Optional[ClientOptions] = None, ): + """ + Create a new SubscriberClient. + + Args: + executor: A ThreadPoolExecutor to use. The client will shut it down on __exit__. If provided a single threaded executor, messages will be ordered per-partition, but take care that the callback does not block for too long as it will impede forward progress on all subscriptions. + nack_handler: A handler for when `nack()` is called. The default NackHandler raises an exception and fails the subscribe stream. + message_transformer: A transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages. + credentials: If provided, the credentials to use when connecting. + transport: The transport to use. Must correspond to an asyncio transport. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ if executor is None: executor = ThreadPoolExecutor() self._impl = MultiplexedSubscriberClient( executor, - lambda subscription, settings: make_async_subscriber( + lambda subscription, partitions, settings: make_async_subscriber( subscription=subscription, transport=transport, per_partition_flow_control_settings=settings, nack_handler=nack_handler, message_transformer=message_transformer, - fixed_partitions=fixed_partitions, + fixed_partitions=partitions, credentials=credentials, client_options=client_options, ), ) - @classmethod - def from_service_account_file(cls, filename, **kwargs): - """Creates an instance of this client using the provided credentials - file. - Args: - filename (str): The path to the service account private key json - file. - kwargs: Additional arguments to pass to the constructor. - Returns: - A PublisherClient. - """ - credentials = service_account.Credentials.from_service_account_file(filename) - return cls(credentials=credentials, **kwargs) - - from_service_account_json = from_service_account_file - def subscribe( self, subscription: Union[SubscriptionPath, str], callback: MessageCallback, per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: return self._impl.subscribe( - subscription, callback, per_partition_flow_control_settings + subscription, + callback, + per_partition_flow_control_settings, + fixed_partitions, ) def __enter__(self): @@ -93,59 +101,66 @@ def __exit__(self, exc_type, exc_value, traceback): self._impl.__exit__(exc_type, exc_value, traceback) -class AsyncSubscriberClient(AsyncSubscriberClientInterface): +class AsyncSubscriberClient( + AsyncSubscriberClientInterface, ConstructableFromServiceAccount +): + """ + An AsyncSubscriberClient reads messages similar to Google Pub/Sub, but must be used in an + async context. + Any subscribe failures are unlikely to succeed if retried. + + Must be used in an `async with` block or have __aenter__() awaited before use. + """ + _impl: AsyncSubscriberClientInterface def __init__( self, nack_handler: Optional[NackHandler] = None, message_transformer: Optional[MessageTransformer] = None, - fixed_partitions: Optional[Set[Partition]] = None, credentials: Optional[Credentials] = None, transport: str = "grpc_asyncio", client_options: Optional[ClientOptions] = None, ): + """ + Create a new AsyncSubscriberClient. + + Args: + nack_handler: A handler for when `nack()` is called. The default NackHandler raises an exception and fails the subscribe stream. + message_transformer: A transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages. + credentials: If provided, the credentials to use when connecting. + transport: The transport to use. Must correspond to an asyncio transport. + client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`. + """ self._impl = MultiplexedAsyncSubscriberClient( - lambda subscription, settings: make_async_subscriber( + lambda subscription, partitions, settings: make_async_subscriber( subscription=subscription, transport=transport, per_partition_flow_control_settings=settings, nack_handler=nack_handler, message_transformer=message_transformer, - fixed_partitions=fixed_partitions, + fixed_partitions=partitions, credentials=credentials, client_options=client_options, ) ) - @classmethod - def from_service_account_file(cls, filename, **kwargs): - """Creates an instance of this client using the provided credentials - file. - Args: - filename (str): The path to the service account private key json - file. - kwargs: Additional arguments to pass to the constructor. - Returns: - A PublisherClient. - """ - credentials = service_account.Credentials.from_service_account_file(filename) - return cls(credentials=credentials, **kwargs) - - from_service_account_json = from_service_account_file - + @overrides async def subscribe( self, subscription: Union[SubscriptionPath, str], per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, ) -> AsyncIterator[Message]: return await self._impl.subscribe( - subscription, per_partition_flow_control_settings + subscription, per_partition_flow_control_settings, fixed_partitions ) + @overrides async def __aenter__(self): await self._impl.__aenter__() return self + @overrides async def __aexit__(self, exc_type, exc_value, traceback): await self._impl.__aexit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py index 89a46e24..90160488 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py @@ -1,16 +1,29 @@ from abc import abstractmethod -from typing import ContextManager, Union, AsyncContextManager, AsyncIterator, Callable +from typing import ( + ContextManager, + Union, + AsyncContextManager, + AsyncIterator, + Callable, + Optional, + Set, +) from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture from google.cloud.pubsub_v1.subscriber.message import Message -from google.cloud.pubsublite.types import SubscriptionPath, FlowControlSettings +from google.cloud.pubsublite.types import ( + SubscriptionPath, + FlowControlSettings, + Partition, +) class AsyncSubscriberClientInterface(AsyncContextManager): """ - An AsyncPublisherClientInterface reads messages similar to Google Pub/Sub, but must be used in an + An AsyncSubscriberClientInterface reads messages similar to Google Pub/Sub, but must be used in an async context. + Any subscribe failures are unlikely to succeed if retried. Must be used in an `async with` block or have __aenter__() awaited before use. """ @@ -20,6 +33,7 @@ async def subscribe( self, subscription: Union[SubscriptionPath, str], per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, ) -> AsyncIterator[Message]: """ Read messages from a subscription. @@ -28,6 +42,7 @@ async def subscribe( subscription: The subscription to subscribe to. per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these settings apply to each partition individually, not in aggregate. + fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. Returns: An AsyncIterator with Messages that must have ack() called on each exactly once. @@ -43,6 +58,7 @@ async def subscribe( class SubscriberClientInterface(ContextManager): """ A SubscriberClientInterface reads messages similar to Google Pub/Sub. + Any subscribe failures are unlikely to succeed if retried. Must be used in a `with` block or have __enter__() called before use. """ @@ -53,6 +69,7 @@ def subscribe( subscription: Union[SubscriptionPath, str], callback: MessageCallback, per_partition_flow_control_settings: FlowControlSettings, + fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: """ This method starts a background thread to begin pulling messages from @@ -64,6 +81,7 @@ def subscribe( callback: The callback function. This function receives the message as its only argument. per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these settings apply to each partition individually, not in aggregate. + fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. Returns: A StreamingPullFuture instance that can be used to manage the background stream. diff --git a/google/cloud/pubsublite/internal/constructable_from_service_account.py b/google/cloud/pubsublite/internal/constructable_from_service_account.py new file mode 100644 index 00000000..1261c5fc --- /dev/null +++ b/google/cloud/pubsublite/internal/constructable_from_service_account.py @@ -0,0 +1,18 @@ +from google.oauth2 import service_account + + +class ConstructableFromServiceAccount: + @classmethod + def from_service_account_file(cls, filename, **kwargs): + f"""Creates an instance of this client using the provided credentials file. + Args: + filename (str): The path to the service account private key json + file. + kwargs: Additional arguments to pass to the constructor. + Returns: + A {cls.__name__}. + """ + credentials = service_account.Credentials.from_service_account_file(filename) + return cls(credentials=credentials, **kwargs) + + from_service_account_json = from_service_account_file diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index 5fd5b56f..030a76a0 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -2,7 +2,7 @@ from google.protobuf.field_mask_pb2 import FieldMask -from google.cloud.pubsublite.admin_client import AdminClient +from google.cloud.pubsublite.admin_client_interface import AdminClientInterface from google.cloud.pubsublite.types import ( CloudRegion, SubscriptionPath, @@ -17,7 +17,7 @@ ) -class AdminClientImpl(AdminClient): +class AdminClientImpl(AdminClientInterface): _underlying: AdminServiceClient _region: CloudRegion diff --git a/google/cloud/pubsublite/internal/wire/make_publisher.py b/google/cloud/pubsublite/internal/wire/make_publisher.py index 9705533c..93abaad3 100644 --- a/google/cloud/pubsublite/internal/wire/make_publisher.py +++ b/google/cloud/pubsublite/internal/wire/make_publisher.py @@ -2,7 +2,7 @@ from google.cloud.pubsub_v1.types import BatchSettings -from google.cloud.pubsublite.make_admin_client import make_admin_client +from google.cloud.pubsublite.admin_client import AdminClient from google.cloud.pubsublite.internal.endpoints import regional_endpoint from google.cloud.pubsublite.internal.wire.default_routing_policy import ( DefaultRoutingPolicy, @@ -60,7 +60,7 @@ def make_publisher( """ if per_partition_batching_settings is None: per_partition_batching_settings = DEFAULT_BATCHING_SETTINGS - admin_client = make_admin_client( + admin_client = AdminClient( region=topic.location.region, credentials=credentials, client_options=client_options, diff --git a/google/cloud/pubsublite/internal/wire/routing_publisher.py b/google/cloud/pubsublite/internal/wire/routing_publisher.py index 7e60d86c..2558df9c 100644 --- a/google/cloud/pubsublite/internal/wire/routing_publisher.py +++ b/google/cloud/pubsublite/internal/wire/routing_publisher.py @@ -1,3 +1,4 @@ +import asyncio from typing import Mapping from google.cloud.pubsublite.internal.wire.publisher import Publisher @@ -17,8 +18,10 @@ def __init__( self._publishers = publishers async def __aenter__(self): - for publisher in self._publishers.values(): - await publisher.__aenter__() + enter_futures = [ + publisher.__aenter__() for publisher in self._publishers.values() + ] + await asyncio.gather(*enter_futures) return self async def __aexit__(self, exc_type, exc_val, exc_tb): diff --git a/google/cloud/pubsublite/make_admin_client.py b/google/cloud/pubsublite/make_admin_client.py deleted file mode 100644 index 5e4779e4..00000000 --- a/google/cloud/pubsublite/make_admin_client.py +++ /dev/null @@ -1,23 +0,0 @@ -from typing import Optional - -from google.api_core.client_options import ClientOptions - -from google.cloud.pubsublite.admin_client import AdminClient -from google.cloud.pubsublite.internal.endpoints import regional_endpoint -from google.cloud.pubsublite.internal.wire.admin_client_impl import AdminClientImpl -from google.cloud.pubsublite.types import CloudRegion -from google.cloud.pubsublite_v1 import AdminServiceClient -from google.auth.credentials import Credentials - - -def make_admin_client( - region: CloudRegion, - credentials: Optional[Credentials] = None, - client_options: Optional[ClientOptions] = None, -) -> AdminClient: - if client_options is None: - client_options = ClientOptions(api_endpoint=regional_endpoint(region)) - return AdminClientImpl( - AdminServiceClient(client_options=client_options, credentials=credentials), - region, - ) diff --git a/setup.py b/setup.py index 9261c547..27d2911b 100644 --- a/setup.py +++ b/setup.py @@ -34,6 +34,7 @@ "google-cloud-pubsub >= 2.1.0", "grpcio", "setuptools", + "overrides", ] setuptools.setup( diff --git a/tests/unit/pubsublite/cloudpubsub/internal/async_client_multiplexer_test.py b/tests/unit/pubsublite/cloudpubsub/internal/async_client_multiplexer_test.py new file mode 100644 index 00000000..feefea1e --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/async_client_multiplexer_test.py @@ -0,0 +1,69 @@ +import pytest + +from asynctest.mock import call, CoroutineMock + +from google.api_core.exceptions import FailedPrecondition + +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + AsyncClientMultiplexer, +) + +# All test coroutines will be treated as marked. +pytestmark = pytest.mark.asyncio + + +class Client: + pass + + +@pytest.fixture() +def client_factory(): + return CoroutineMock() + + +@pytest.fixture() +def client_closer(): + return CoroutineMock() + + +@pytest.fixture() +def multiplexer(client_closer): + return AsyncClientMultiplexer(client_closer) + + +async def test_create( + multiplexer: AsyncClientMultiplexer[int, Client], client_closer, client_factory +): + client1 = Client() + client2 = Client() + async with multiplexer: + client_factory.return_value = client1 + assert await multiplexer.create_or_fail(1, client_factory) is client1 + client_factory.assert_has_calls([call()]) + client_factory.return_value = client2 + assert await multiplexer.get_or_create(1, client_factory) is client1 + with pytest.raises(FailedPrecondition): + await multiplexer.create_or_fail(1, client_factory) + assert await multiplexer.get_or_create(2, client_factory) is client2 + client_factory.assert_has_calls([call(), call()]) + with pytest.raises(FailedPrecondition): + await multiplexer.create_or_fail(2, client_factory) + client_closer.assert_has_calls([call(client1), call(client2)], any_order=True) + + +async def test_recreate( + multiplexer: AsyncClientMultiplexer[int, Client], client_closer, client_factory +): + client1 = Client() + client2 = Client() + async with multiplexer: + client_factory.return_value = client1 + assert await multiplexer.create_or_fail(1, client_factory) is client1 + client_factory.assert_has_calls([call()]) + client_factory.return_value = client2 + await multiplexer.try_erase(1, client2) + client_closer.assert_has_calls([]) + await multiplexer.try_erase(1, client1) + client_closer.assert_has_calls([call(client1)]) + assert await multiplexer.create_or_fail(1, client_factory) is client2 + client_closer.assert_has_calls([call(client1), call(client2)]) diff --git a/tests/unit/pubsublite/cloudpubsub/internal/client_multiplexer_test.py b/tests/unit/pubsublite/cloudpubsub/internal/client_multiplexer_test.py new file mode 100644 index 00000000..21e26aa1 --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/client_multiplexer_test.py @@ -0,0 +1,67 @@ +import pytest + +from mock import MagicMock, call + +# All test coroutines will be treated as marked. +from google.api_core.exceptions import FailedPrecondition + +from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import ( + ClientMultiplexer, +) + + +class Client: + pass + + +@pytest.fixture() +def client_factory(): + return MagicMock() + + +@pytest.fixture() +def client_closer(): + return MagicMock() + + +@pytest.fixture() +def multiplexer(client_closer): + return ClientMultiplexer(client_closer) + + +def test_create( + multiplexer: ClientMultiplexer[int, Client], client_closer, client_factory +): + client1 = Client() + client2 = Client() + with multiplexer: + client_factory.return_value = client1 + assert multiplexer.create_or_fail(1, client_factory) is client1 + client_factory.assert_has_calls([call()]) + client_factory.return_value = client2 + assert multiplexer.get_or_create(1, client_factory) is client1 + with pytest.raises(FailedPrecondition): + multiplexer.create_or_fail(1, client_factory) + assert multiplexer.get_or_create(2, client_factory) is client2 + client_factory.assert_has_calls([call(), call()]) + with pytest.raises(FailedPrecondition): + multiplexer.create_or_fail(2, client_factory) + client_closer.assert_has_calls([call(client1), call(client2)], any_order=True) + + +def test_recreate( + multiplexer: ClientMultiplexer[int, Client], client_closer, client_factory +): + client1 = Client() + client2 = Client() + with multiplexer: + client_factory.return_value = client1 + assert multiplexer.create_or_fail(1, client_factory) is client1 + client_factory.assert_has_calls([call()]) + client_factory.return_value = client2 + multiplexer.try_erase(1, client2) + client_closer.assert_has_calls([]) + multiplexer.try_erase(1, client1) + client_closer.assert_has_calls([call(client1)]) + assert multiplexer.create_or_fail(1, client_factory) is client2 + client_closer.assert_has_calls([call(client1), call(client2)]) diff --git a/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py new file mode 100644 index 00000000..13d16649 --- /dev/null +++ b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py @@ -0,0 +1,76 @@ +import asyncio + +import pytest + +from asynctest.mock import call, CoroutineMock, MagicMock + +from google.api_core.exceptions import FailedPrecondition +from google.cloud.pubsub_v1.subscriber.message import Message +from google.pubsub_v1 import PubsubMessage + +from google.cloud.pubsublite.cloudpubsub import AsyncSubscriberClientInterface + +# All test coroutines will be treated as marked. +from google.cloud.pubsublite.cloudpubsub.internal.multiplexed_async_subscriber_client import ( + MultiplexedAsyncSubscriberClient, +) +from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import ( + AsyncSubscriberFactory, + AsyncSingleSubscriber, +) +from google.cloud.pubsublite.testing.test_utils import wire_queues +from google.cloud.pubsublite.types import ( + SubscriptionPath, + CloudZone, + DISABLED_FLOW_CONTROL, +) + +pytestmark = pytest.mark.asyncio + + +@pytest.fixture() +def default_subscriber(): + return MagicMock(spec=AsyncSingleSubscriber) + + +@pytest.fixture() +def subscriber_factory(default_subscriber): + factory = MagicMock(spec=AsyncSubscriberFactory) + factory.return_value = default_subscriber + return factory + + +@pytest.fixture() +def multiplexed_client(subscriber_factory: AsyncSubscriberFactory): + return MultiplexedAsyncSubscriberClient(subscriber_factory) + + +async def test_iterator( + default_subscriber, + subscriber_factory, + multiplexed_client: AsyncSubscriberClientInterface, +): + read_queues = wire_queues(default_subscriber.read) + subscription = SubscriptionPath(1, CloudZone.parse("us-central1-a"), "abc") + message = Message(PubsubMessage(message_id="1")._pb, "", 0, None) + async with multiplexed_client: + iterator = await multiplexed_client.subscribe( + subscription, DISABLED_FLOW_CONTROL + ) + subscriber_factory.assert_has_calls( + [call(subscription, None, DISABLED_FLOW_CONTROL)] + ) + read_fut_1 = asyncio.ensure_future(iterator.__anext__()) + assert not read_fut_1.done() + await read_queues.called.get() + default_subscriber.read.assert_has_calls([call()]) + await read_queues.results.put(message) + assert await read_fut_1 is message + read_fut_2 = asyncio.ensure_future(iterator.__anext__()) + assert not read_fut_2.done() + await read_queues.called.get() + default_subscriber.read.assert_has_calls([call(), call()]) + await read_queues.results.put(FailedPrecondition("")) + with pytest.raises(FailedPrecondition): + await read_fut_2 + default_subscriber.__aexit__.assert_called_once() From 35f19def6e6b049484f0b7783519583969984f6b Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 29 Oct 2020 14:06:36 -0400 Subject: [PATCH 5/8] fix: Import admin types to __init__.py and add overrides on async clients to copy docs --- google/cloud/pubsublite/__init__.py | 6 ++++++ google/cloud/pubsublite/cloudpubsub/publisher_client.py | 3 +++ google/cloud/pubsublite/cloudpubsub/subscriber_client.py | 3 +++ 3 files changed, 12 insertions(+) diff --git a/google/cloud/pubsublite/__init__.py b/google/cloud/pubsublite/__init__.py index 80bc419e..0a2808c0 100644 --- a/google/cloud/pubsublite/__init__.py +++ b/google/cloud/pubsublite/__init__.py @@ -96,8 +96,14 @@ from google.cloud.pubsublite_v1.types.subscriber import SeekResponse from google.cloud.pubsublite_v1.types.subscriber import SubscribeRequest from google.cloud.pubsublite_v1.types.subscriber import SubscribeResponse +from google.cloud.pubsublite.admin_client_interface import AdminClientInterface +from google.cloud.pubsublite.admin_client import AdminClient __all__ = ( + # Manual files + "AdminClient", + "AdminClientInterface", + # Generated files "AdminServiceAsyncClient", "AdminServiceClient", "AttributeValues", diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client.py b/google/cloud/pubsublite/cloudpubsub/publisher_client.py index 4d81443e..23c72f32 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client.py @@ -70,6 +70,7 @@ def __init__( ) ) + @overrides def publish( self, topic: Union[TopicPath, str], @@ -81,10 +82,12 @@ def publish( topic=topic, data=data, ordering_key=ordering_key, **attrs ) + @overrides def __enter__(self): self._impl.__enter__() return self + @overrides def __exit__(self, exc_type, exc_value, traceback): self._impl.__exit__(exc_type, exc_value, traceback) diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py index 9fa5f1a1..713ad7fb 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client.py @@ -79,6 +79,7 @@ def __init__( ), ) + @overrides def subscribe( self, subscription: Union[SubscriptionPath, str], @@ -93,10 +94,12 @@ def subscribe( fixed_partitions, ) + @overrides def __enter__(self): self._impl.__enter__() return self + @overrides def __exit__(self, exc_type, exc_value, traceback): self._impl.__exit__(exc_type, exc_value, traceback) From 18375f0f8eaa19170845d80583332e11968877cf Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Thu, 29 Oct 2020 14:12:17 -0400 Subject: [PATCH 6/8] fix: Lint issues --- .coveragerc | 4 +++- google/cloud/pubsublite/cloudpubsub/__init__.py | 1 + .../internal/multiplexed_async_subscriber_client_test.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/.coveragerc b/.coveragerc index 57cd1a52..08d01e36 100644 --- a/.coveragerc +++ b/.coveragerc @@ -30,7 +30,9 @@ exclude_lines = raise NotImplementedError @abstractmethod # Ignore delegating methods - impl_. + self._impl. + # Ignore __exit__ "return self" + return self omit = */pubsublite_v1/*.py diff --git a/google/cloud/pubsublite/cloudpubsub/__init__.py b/google/cloud/pubsublite/cloudpubsub/__init__.py index d9180447..c030009e 100644 --- a/google/cloud/pubsublite/cloudpubsub/__init__.py +++ b/google/cloud/pubsublite/cloudpubsub/__init__.py @@ -1,3 +1,4 @@ +# flake8: noqa from .message_transformer import MessageTransformer from .nack_handler import NackHandler from .publisher_client_interface import ( diff --git a/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py index 13d16649..4a0ca557 100644 --- a/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py +++ b/tests/unit/pubsublite/cloudpubsub/internal/multiplexed_async_subscriber_client_test.py @@ -2,7 +2,7 @@ import pytest -from asynctest.mock import call, CoroutineMock, MagicMock +from asynctest.mock import call, MagicMock from google.api_core.exceptions import FailedPrecondition from google.cloud.pubsub_v1.subscriber.message import Message From 85113352567cb4066aec5094fd739da72c11562e Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 4 Nov 2020 13:22:36 -0500 Subject: [PATCH 7/8] fix: Fix deadlock in streaming pull shutdown --- .../pubsublite/cloudpubsub/internal/subscriber_impl.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py b/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py index 46fb14db..891797df 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/subscriber_impl.py @@ -58,9 +58,10 @@ def add_close_callback(self, close_callback: CloseCallback): def close(self): with self._close_lock: - if not self._closed: - self._closed = True - self.__exit__(None, None, None) + if self._closed: + return + self._closed = True + self.__exit__(None, None, None) def _fail(self, error: GoogleAPICallError): self._failure = error From 832cf99f0c72f510c66bab78319b1c82a3ac5c57 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 4 Nov 2020 14:14:34 -0500 Subject: [PATCH 8/8] fix: Ensure client awaitables passed to await_unless_failed are awaited --- .../pubsublite/internal/wire/permanent_failable.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/google/cloud/pubsublite/internal/wire/permanent_failable.py b/google/cloud/pubsublite/internal/wire/permanent_failable.py index fa7fb6f2..dcaa9467 100644 --- a/google/cloud/pubsublite/internal/wire/permanent_failable.py +++ b/google/cloud/pubsublite/internal/wire/permanent_failable.py @@ -21,6 +21,14 @@ def _failure_task(self) -> asyncio.Future: self._maybe_failure_task = asyncio.Future() return self._maybe_failure_task + @staticmethod + async def _fail_client_task(task: asyncio.Future): + task.cancel() + try: + await task + except: # noqa: E722 intentionally broad except clause + pass + async def await_unless_failed(self, awaitable: Awaitable[T]) -> T: """ Await the awaitable, unless fail() is called first. @@ -30,15 +38,17 @@ async def await_unless_failed(self, awaitable: Awaitable[T]) -> T: Returns: The result of the awaitable Raises: The permanent error if fail() is called or the awaitable raises one. """ + + task = asyncio.ensure_future(awaitable) if self._failure_task.done(): + await self._fail_client_task(task) raise self._failure_task.exception() - task = asyncio.ensure_future(awaitable) done, _ = await asyncio.wait( [task, self._failure_task], return_when=asyncio.FIRST_COMPLETED ) if task in done: return await task - task.cancel() + await self._fail_client_task(task) raise self._failure_task.exception() async def run_poller(self, poll_action: Callable[[], Awaitable[None]]):