From ad0f3d298ec9979ff558c2bb7fc73b53638db2ac Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Fri, 11 Jun 2021 15:37:48 -0400 Subject: [PATCH] fix: Add admin interfaces for reservations (#159) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * fix: Add admin interfaces for reservations Also run blacken (reformatter) Also remove default python interpreter for any sessions where we don't care what interpreter is used, allowing `nox -s blacken` and others to work if you're running under 3.7, 3.9, etc. * 🦉 Updates from OwlBot * fix: add warning Co-authored-by: Owl Bot --- google/cloud/pubsublite/admin_client.py | 38 +++++++++- .../pubsublite/admin_client_interface.py | 51 ++++++++++++- .../cloudpubsub/internal/ack_set_tracker.py | 38 +++++----- .../cloudpubsub/internal/make_publisher.py | 64 ++++++++-------- .../cloudpubsub/internal/make_subscriber.py | 34 ++++----- .../cloudpubsub/internal/single_publisher.py | 54 +++++++------- .../cloudpubsub/internal/single_subscriber.py | 22 +++--- .../cloudpubsub/message_transformer.py | 14 ++-- .../pubsublite/cloudpubsub/nack_handler.py | 16 ++-- .../cloudpubsub/publisher_client_interface.py | 46 ++++++------ .../subscriber_client_interface.py | 74 +++++++++---------- .../internal/wire/admin_client_impl.py | 41 +++++++++- .../pubsublite/internal/wire/assigner.py | 6 +- .../pubsublite/internal/wire/committer.py | 4 +- .../internal/wire/default_routing_policy.py | 6 +- .../internal/wire/make_publisher.py | 26 +++---- .../internal/wire/merge_metadata.py | 4 +- .../internal/wire/permanent_failable.py | 22 +++--- .../pubsublite/internal/wire/publisher.py | 20 ++--- .../internal/wire/retrying_connection.py | 4 +- .../internal/wire/routing_policy.py | 10 +-- .../internal/wire/serial_batcher.py | 20 ++--- .../pubsublite/internal/wire/subscriber.py | 20 ++--- google/cloud/pubsublite/testing/test_utils.py | 6 +- google/cloud/pubsublite/types/location.py | 7 ++ google/cloud/pubsublite/types/paths.py | 31 +++++++- 26 files changed, 416 insertions(+), 262 deletions(-) diff --git a/google/cloud/pubsublite/admin_client.py b/google/cloud/pubsublite/admin_client.py index 7d7a8dd5..bc7622c8 100644 --- a/google/cloud/pubsublite/admin_client.py +++ b/google/cloud/pubsublite/admin_client.py @@ -32,7 +32,13 @@ TopicPath, BacklogLocation, ) -from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic +from google.cloud.pubsublite.types.paths import ReservationPath +from google.cloud.pubsublite_v1 import ( + AdminServiceClient, + Subscription, + Topic, + Reservation, +) class AdminClient(AdminClientInterface, ConstructableFromServiceAccount): @@ -98,7 +104,7 @@ def delete_topic(self, topic_path: TopicPath): return self._impl.delete_topic(topic_path) @overrides - def list_topic_subscriptions(self, topic_path: TopicPath): + def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPath]: return self._impl.list_topic_subscriptions(topic_path) @overrides @@ -126,3 +132,31 @@ def update_subscription( @overrides def delete_subscription(self, subscription_path: SubscriptionPath): return self._impl.delete_subscription(subscription_path) + + @overrides + def create_reservation(self, reservation: Reservation) -> Reservation: + return self._impl.create_reservation(reservation) + + @overrides + def get_reservation(self, reservation_path: ReservationPath) -> Reservation: + return self._impl.get_reservation(reservation_path) + + @overrides + def list_reservations(self, location_path: LocationPath) -> List[Reservation]: + return self._impl.list_reservations(location_path) + + @overrides + def update_reservation( + self, reservation: Reservation, update_mask: FieldMask + ) -> Reservation: + return self._impl.update_reservation(reservation, update_mask) + + @overrides + def delete_reservation(self, reservation_path: ReservationPath): + return self._impl.delete_reservation(reservation_path) + + @overrides + def list_reservation_topics( + self, reservation_path: ReservationPath + ) -> List[TopicPath]: + return self._impl.list_reservation_topics(reservation_path) diff --git a/google/cloud/pubsublite/admin_client_interface.py b/google/cloud/pubsublite/admin_client_interface.py index 38b90ba3..c300351d 100644 --- a/google/cloud/pubsublite/admin_client_interface.py +++ b/google/cloud/pubsublite/admin_client_interface.py @@ -22,7 +22,8 @@ SubscriptionPath, BacklogLocation, ) -from google.cloud.pubsublite_v1 import Topic, Subscription +from google.cloud.pubsublite.types.paths import ReservationPath +from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation from google.protobuf.field_mask_pb2 import FieldMask @@ -60,7 +61,7 @@ def delete_topic(self, topic_path: TopicPath): """Delete a topic and all associated messages.""" @abstractmethod - def list_topic_subscriptions(self, topic_path: TopicPath): + def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPath]: """List the subscriptions that exist for a given topic.""" @abstractmethod @@ -90,3 +91,49 @@ def update_subscription( @abstractmethod def delete_subscription(self, subscription_path: SubscriptionPath): """Delete a subscription and all associated messages.""" + + @abstractmethod + def create_reservation(self, reservation: Reservation) -> Reservation: + """Create a reservation, returns the created reservation. + + warning:: This may not be implemented in the backend, it is a pre-release feature. + """ + + @abstractmethod + def get_reservation(self, reservation_path: ReservationPath) -> Reservation: + """Get the reservation object from the server. + + warning:: This may not be implemented in the backend, it is a pre-release feature. + """ + + @abstractmethod + def list_reservations(self, location_path: LocationPath) -> List[Reservation]: + """List the Pub/Sub lite reservations that exist for a project in a given location. + + warning:: This may not be implemented in the backend, it is a pre-release feature. + """ + + @abstractmethod + def update_reservation( + self, reservation: Reservation, update_mask: FieldMask + ) -> Reservation: + """Update the masked fields of the provided reservation. + + warning:: This may not be implemented in the backend, it is a pre-release feature. + """ + + @abstractmethod + def delete_reservation(self, reservation_path: ReservationPath): + """Delete a reservation and all associated messages. + + warning:: This may not be implemented in the backend, it is a pre-release feature. + """ + + @abstractmethod + def list_reservation_topics( + self, reservation_path: ReservationPath + ) -> List[TopicPath]: + """List the subscriptions that exist for a given reservation. + + warning:: This may not be implemented in the backend, it is a pre-release feature. + """ diff --git a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py index c1f73b7c..67268deb 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py @@ -18,39 +18,39 @@ class AckSetTracker(AsyncContextManager): """ - An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets - is aggregated. - """ + An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets + is aggregated. + """ @abstractmethod def track(self, offset: int): """ - Track the provided offset. + Track the provided offset. - Args: - offset: the offset to track. + Args: + offset: the offset to track. - Raises: - GoogleAPICallError: On an invalid offset to track. - """ + Raises: + GoogleAPICallError: On an invalid offset to track. + """ @abstractmethod async def ack(self, offset: int): """ - Acknowledge the message with the provided offset. The offset must have previously been tracked. + Acknowledge the message with the provided offset. The offset must have previously been tracked. - Args: - offset: the offset to acknowledge. + Args: + offset: the offset to acknowledge. - Returns: - GoogleAPICallError: On a commit failure. - """ + Returns: + GoogleAPICallError: On a commit failure. + """ @abstractmethod async def clear_and_commit(self): """ - Discard all outstanding acks and wait for the commit offset to be acknowledged by the server. + Discard all outstanding acks and wait for the commit offset to be acknowledged by the server. - Raises: - GoogleAPICallError: If the committer has shut down due to a permanent error. - """ + Raises: + GoogleAPICallError: If the committer has shut down due to a permanent error. + """ diff --git a/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py b/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py index c54d086e..6dbc920c 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py @@ -49,22 +49,22 @@ def make_async_publisher( metadata: Optional[Mapping[str, str]] = None, ) -> AsyncSinglePublisher: """ - Make a new publisher for the given topic. - - Args: - topic: The topic to publish to. - transport: The transport type to use. - per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. - credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. - client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. - metadata: Additional metadata to send with the RPC. - - Returns: - A new AsyncPublisher. - - Throws: - GoogleApiCallException on any error determining topic structure. - """ + Make a new publisher for the given topic. + + Args: + topic: The topic to publish to. + transport: The transport type to use. + per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. + credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. + client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. + metadata: Additional metadata to send with the RPC. + + Returns: + A new AsyncPublisher. + + Throws: + GoogleApiCallException on any error determining topic structure. + """ metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata) def underlying_factory(): @@ -89,22 +89,22 @@ def make_publisher( metadata: Optional[Mapping[str, str]] = None, ) -> SinglePublisher: """ - Make a new publisher for the given topic. - - Args: - topic: The topic to publish to. - transport: The transport type to use. - per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. - credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. - client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. - metadata: Additional metadata to send with the RPC. - - Returns: - A new Publisher. - - Throws: - GoogleApiCallException on any error determining topic structure. - """ + Make a new publisher for the given topic. + + Args: + topic: The topic to publish to. + transport: The transport type to use. + per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. + credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. + client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. + metadata: Additional metadata to send with the RPC. + + Returns: + A new Publisher. + + Throws: + GoogleApiCallException on any error determining topic structure. + """ return SinglePublisherImpl( make_async_publisher( topic=topic, diff --git a/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py index e51a5e64..c7926e13 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py @@ -175,23 +175,23 @@ def make_async_subscriber( metadata: Optional[Mapping[str, str]] = None, ) -> AsyncSingleSubscriber: """ - Make a Pub/Sub Lite AsyncSubscriber. - - Args: - subscription: The subscription to subscribe to. - transport: The transport type to use. - per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these - settings apply to each partition individually, not in aggregate. - nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client. - message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages. - fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. - credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. - client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. - metadata: Additional metadata to send with the RPC. - - Returns: - A new AsyncSubscriber. - """ + Make a Pub/Sub Lite AsyncSubscriber. + + Args: + subscription: The subscription to subscribe to. + transport: The transport type to use. + per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these + settings apply to each partition individually, not in aggregate. + nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client. + message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages. + fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. + credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. + client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. + metadata: Additional metadata to send with the RPC. + + Returns: + A new AsyncSubscriber. + """ metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata) if client_options is None: client_options = ClientOptions( diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py index 04f6e0f6..25bd15b5 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py @@ -19,54 +19,54 @@ class AsyncSinglePublisher(AsyncContextManager): """ - An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an - async context. Any publish failures are permanent. + An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an + async context. Any publish failures are permanent. - Must be used in an `async with` block or have __aenter__() awaited before use. - """ + Must be used in an `async with` block or have __aenter__() awaited before use. + """ @abstractmethod async def publish( self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str] ) -> str: """ - Publish a message. + Publish a message. - Args: - data: The bytestring payload of the message - ordering_key: The key to enforce ordering on, or "" for no ordering. - **attrs: Additional attributes to send. + Args: + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. - Returns: - An ack id, which can be decoded using MessageMetadata.decode. + Returns: + An ack id, which can be decoded using MessageMetadata.decode. - Raises: - GoogleApiCallError: On a permanent failure. - """ + Raises: + GoogleApiCallError: On a permanent failure. + """ class SinglePublisher(ContextManager): """ - A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent. + A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent. - Must be used in a `with` block or have __enter__() called before use. - """ + Must be used in a `with` block or have __enter__() called before use. + """ @abstractmethod def publish( self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str] ) -> "futures.Future[str]": """ - Publish a message. + Publish a message. - Args: - data: The bytestring payload of the message - ordering_key: The key to enforce ordering on, or "" for no ordering. - **attrs: Additional attributes to send. + Args: + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. - Returns: - A future completed with an ack id, which can be decoded using MessageMetadata.decode. + Returns: + A future completed with an ack id, which can be decoded using MessageMetadata.decode. - Raises: - GoogleApiCallError: On a permanent failure. - """ + Raises: + GoogleApiCallError: On a permanent failure. + """ diff --git a/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py index 13f0cda2..a2eddbc6 100644 --- a/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py +++ b/google/cloud/pubsublite/cloudpubsub/internal/single_subscriber.py @@ -26,25 +26,25 @@ class AsyncSingleSubscriber(AsyncContextManager): """ - A Cloud Pub/Sub asynchronous subscriber. + A Cloud Pub/Sub asynchronous subscriber. - Must be used in an `async with` block or have __aenter__() awaited before use. - """ + Must be used in an `async with` block or have __aenter__() awaited before use. + """ @abstractmethod async def read(self) -> Message: """ - Read the next message off of the stream. + Read the next message off of the stream. - Returns: - The next message. ack() or nack() must eventually be called exactly once. + Returns: + The next message. ack() or nack() must eventually be called exactly once. - Pub/Sub Lite does not support nack() by default- if you do call nack(), it will immediately fail the client - unless you have a NackHandler installed. + Pub/Sub Lite does not support nack() by default- if you do call nack(), it will immediately fail the client + unless you have a NackHandler installed. - Raises: - GoogleAPICallError: On a permanent error. - """ + Raises: + GoogleAPICallError: On a permanent error. + """ raise NotImplementedError() diff --git a/google/cloud/pubsublite/cloudpubsub/message_transformer.py b/google/cloud/pubsublite/cloudpubsub/message_transformer.py index d0b75bd5..50a44b1b 100644 --- a/google/cloud/pubsublite/cloudpubsub/message_transformer.py +++ b/google/cloud/pubsublite/cloudpubsub/message_transformer.py @@ -23,19 +23,19 @@ class MessageTransformer(ABC): """ - A MessageTransformer turns Pub/Sub Lite message protos into Pub/Sub message protos. - """ + A MessageTransformer turns Pub/Sub Lite message protos into Pub/Sub message protos. + """ @abstractmethod def transform(self, source: SequencedMessage) -> PubsubMessage: """Transform a SequencedMessage to a PubsubMessage. - Args: - source: The message to transform. + Args: + source: The message to transform. - Raises: - GoogleAPICallError: To fail the client if raised inline. - """ + Raises: + GoogleAPICallError: To fail the client if raised inline. + """ pass @staticmethod diff --git a/google/cloud/pubsublite/cloudpubsub/nack_handler.py b/google/cloud/pubsublite/cloudpubsub/nack_handler.py index 978d6092..a79dca10 100644 --- a/google/cloud/pubsublite/cloudpubsub/nack_handler.py +++ b/google/cloud/pubsublite/cloudpubsub/nack_handler.py @@ -21,20 +21,20 @@ class NackHandler(ABC): """ - A NackHandler handles calls to the nack() method which is not expressible in Pub/Sub Lite. - """ + A NackHandler handles calls to the nack() method which is not expressible in Pub/Sub Lite. + """ @abstractmethod def on_nack(self, message: PubsubMessage, ack: Callable[[], None]): """Handle a negative acknowledgement. ack must eventually be called. - Args: - message: The nacked message. - ack: A callable to acknowledge the underlying message. This must eventually be called. + Args: + message: The nacked message. + ack: A callable to acknowledge the underlying message. This must eventually be called. - Raises: - GoogleAPICallError: To fail the client if raised inline. - """ + Raises: + GoogleAPICallError: To fail the client if raised inline. + """ pass diff --git a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py index 883a9518..47d6783a 100644 --- a/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/publisher_client_interface.py @@ -36,20 +36,20 @@ async def publish( **attrs: Mapping[str, str], ) -> str: """ - Publish a message. + Publish a message. - Args: - topic: The topic to publish to. Publishes to new topics may have nontrivial startup latency. - data: The bytestring payload of the message - ordering_key: The key to enforce ordering on, or "" for no ordering. - **attrs: Additional attributes to send. + Args: + topic: The topic to publish to. Publishes to new topics may have nontrivial startup latency. + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. - Returns: - An ack id, which can be decoded using MessageMetadata.decode. + Returns: + An ack id, which can be decoded using MessageMetadata.decode. - Raises: - GoogleApiCallError: On a permanent failure. - """ + Raises: + GoogleApiCallError: On a permanent failure. + """ class PublisherClientInterface(ContextManager): @@ -69,18 +69,18 @@ def publish( **attrs: Mapping[str, str], ) -> "Future[str]": """ - Publish a message. + Publish a message. - Args: - topic: The topic to publish to. Publishes to new topics may have nontrivial startup latency. - data: The bytestring payload of the message - ordering_key: The key to enforce ordering on, or "" for no ordering. - **attrs: Additional attributes to send. + Args: + topic: The topic to publish to. Publishes to new topics may have nontrivial startup latency. + data: The bytestring payload of the message + ordering_key: The key to enforce ordering on, or "" for no ordering. + **attrs: Additional attributes to send. - Returns: - A future completed with an ack id of type str, which can be decoded using - MessageMetadata.decode. + Returns: + A future completed with an ack id of type str, which can be decoded using + MessageMetadata.decode. - Raises: - GoogleApiCallError: On a permanent failure. - """ + Raises: + GoogleApiCallError: On a permanent failure. + """ diff --git a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py index 8dd23506..d1586661 100644 --- a/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py +++ b/google/cloud/pubsublite/cloudpubsub/subscriber_client_interface.py @@ -35,12 +35,12 @@ class AsyncSubscriberClientInterface(AsyncContextManager): """ - An AsyncSubscriberClientInterface reads messages similar to Google Pub/Sub, but must be used in an - async context. - Any subscribe failures are unlikely to succeed if retried. + An AsyncSubscriberClientInterface reads messages similar to Google Pub/Sub, but must be used in an + async context. + Any subscribe failures are unlikely to succeed if retried. - Must be used in an `async with` block or have __aenter__() awaited before use. - """ + Must be used in an `async with` block or have __aenter__() awaited before use. + """ @abstractmethod async def subscribe( @@ -50,20 +50,20 @@ async def subscribe( fixed_partitions: Optional[Set[Partition]] = None, ) -> AsyncIterator[Message]: """ - Read messages from a subscription. + Read messages from a subscription. - Args: - subscription: The subscription to subscribe to. - per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these - settings apply to each partition individually, not in aggregate. - fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. + Args: + subscription: The subscription to subscribe to. + per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these + settings apply to each partition individually, not in aggregate. + fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. - Returns: - An AsyncIterator with Messages that must have ack() called on each exactly once. + Returns: + An AsyncIterator with Messages that must have ack() called on each exactly once. - Raises: - GoogleApiCallError: On a permanent failure. - """ + Raises: + GoogleApiCallError: On a permanent failure. + """ MessageCallback = Callable[[Message], None] @@ -71,11 +71,11 @@ async def subscribe( class SubscriberClientInterface(ContextManager): """ - A SubscriberClientInterface reads messages similar to Google Pub/Sub. - Any subscribe failures are unlikely to succeed if retried. + A SubscriberClientInterface reads messages similar to Google Pub/Sub. + Any subscribe failures are unlikely to succeed if retried. - Must be used in a `with` block or have __enter__() called before use. - """ + Must be used in a `with` block or have __enter__() called before use. + """ @abstractmethod def subscribe( @@ -86,20 +86,20 @@ def subscribe( fixed_partitions: Optional[Set[Partition]] = None, ) -> StreamingPullFuture: """ - This method starts a background thread to begin pulling messages from - a Pub/Sub Lite subscription and scheduling them to be processed using the - provided ``callback``. - - Args: - subscription: The subscription to subscribe to. - callback: The callback function. This function receives the message as its only argument. - per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these - settings apply to each partition individually, not in aggregate. - fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. - - Returns: - A StreamingPullFuture instance that can be used to manage the background stream. - - Raises: - GoogleApiCallError: On a permanent failure. - """ + This method starts a background thread to begin pulling messages from + a Pub/Sub Lite subscription and scheduling them to be processed using the + provided ``callback``. + + Args: + subscription: The subscription to subscribe to. + callback: The callback function. This function receives the message as its only argument. + per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these + settings apply to each partition individually, not in aggregate. + fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment. + + Returns: + A StreamingPullFuture instance that can be used to manage the background stream. + + Raises: + GoogleApiCallError: On a permanent failure. + """ diff --git a/google/cloud/pubsublite/internal/wire/admin_client_impl.py b/google/cloud/pubsublite/internal/wire/admin_client_impl.py index ab5acba6..3dc2cdcb 100644 --- a/google/cloud/pubsublite/internal/wire/admin_client_impl.py +++ b/google/cloud/pubsublite/internal/wire/admin_client_impl.py @@ -24,11 +24,13 @@ TopicPath, BacklogLocation, ) +from google.cloud.pubsublite.types.paths import ReservationPath from google.cloud.pubsublite_v1 import ( Subscription, Topic, AdminServiceClient, TopicPartitions, + Reservation, ) @@ -67,7 +69,7 @@ def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic: def delete_topic(self, topic_path: TopicPath): self._underlying.delete_topic(name=str(topic_path)) - def list_topic_subscriptions(self, topic_path: TopicPath): + def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPath]: subscription_strings = [ x for x in self._underlying.list_topic_subscriptions(name=str(topic_path)) ] @@ -105,3 +107,40 @@ def update_subscription( def delete_subscription(self, subscription_path: SubscriptionPath): self._underlying.delete_subscription(name=str(subscription_path)) + + def create_reservation(self, reservation: Reservation) -> Reservation: + path = ReservationPath.parse(reservation.name) + return self._underlying.create_reservation( + parent=str(path.to_location_path()), + reservation=reservation, + reservation_id=path.name, + ) + + def get_reservation(self, reservation_path: ReservationPath) -> Reservation: + return self._underlying.get_reservation(name=str(reservation_path)) + + def list_reservations(self, location_path: LocationPath) -> List[Reservation]: + return [ + x for x in self._underlying.list_reservations(parent=str(location_path)) + ] + + def update_reservation( + self, reservation: Reservation, update_mask: FieldMask + ) -> Reservation: + return self._underlying.update_reservation( + reservation=reservation, update_mask=update_mask + ) + + def delete_reservation(self, reservation_path: ReservationPath): + self._underlying.delete_reservation(name=str(reservation_path)) + + def list_reservation_topics( + self, reservation_path: ReservationPath + ) -> List[TopicPath]: + subscription_strings = [ + x + for x in self._underlying.list_reservation_topics( + name=str(reservation_path) + ) + ] + return [TopicPath.parse(x) for x in subscription_strings] diff --git a/google/cloud/pubsublite/internal/wire/assigner.py b/google/cloud/pubsublite/internal/wire/assigner.py index c8b25498..c9d01930 100644 --- a/google/cloud/pubsublite/internal/wire/assigner.py +++ b/google/cloud/pubsublite/internal/wire/assigner.py @@ -20,9 +20,9 @@ class Assigner(AsyncContextManager): """ - An assigner will deliver a continuous stream of assignments when called into. Perform all necessary work with the - assignment before attempting to get the next one. - """ + An assigner will deliver a continuous stream of assignments when called into. Perform all necessary work with the + assignment before attempting to get the next one. + """ @abstractmethod async def get_assignment(self) -> Set[Partition]: diff --git a/google/cloud/pubsublite/internal/wire/committer.py b/google/cloud/pubsublite/internal/wire/committer.py index 2f52c8d1..f095a96b 100644 --- a/google/cloud/pubsublite/internal/wire/committer.py +++ b/google/cloud/pubsublite/internal/wire/committer.py @@ -20,8 +20,8 @@ class Committer(AsyncContextManager): """ - A Committer is able to commit subscribers' completed offsets. - """ + A Committer is able to commit subscribers' completed offsets. + """ @abstractmethod async def commit(self, cursor: Cursor) -> None: diff --git a/google/cloud/pubsublite/internal/wire/default_routing_policy.py b/google/cloud/pubsublite/internal/wire/default_routing_policy.py index 5e020182..d1bb34a2 100644 --- a/google/cloud/pubsublite/internal/wire/default_routing_policy.py +++ b/google/cloud/pubsublite/internal/wire/default_routing_policy.py @@ -22,9 +22,9 @@ class DefaultRoutingPolicy(RoutingPolicy): """ - The default routing policy which routes based on sha256 % num_partitions using the key if set or round robin if - unset. - """ + The default routing policy which routes based on sha256 % num_partitions using the key if set or round robin if + unset. + """ _num_partitions: int _current_round_robin: Partition diff --git a/google/cloud/pubsublite/internal/wire/make_publisher.py b/google/cloud/pubsublite/internal/wire/make_publisher.py index 26c2486e..799c22b4 100644 --- a/google/cloud/pubsublite/internal/wire/make_publisher.py +++ b/google/cloud/pubsublite/internal/wire/make_publisher.py @@ -61,22 +61,22 @@ def make_publisher( metadata: Optional[Mapping[str, str]] = None, ) -> Publisher: """ - Make a new publisher for the given topic. + Make a new publisher for the given topic. - Args: - topic: The topic to publish to. - transport: The transport type to use. - per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. - credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. - client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. - metadata: Additional metadata to send with the RPC. + Args: + topic: The topic to publish to. + transport: The transport type to use. + per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases. + credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None. + client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint. + metadata: Additional metadata to send with the RPC. - Returns: - A new Publisher. + Returns: + A new Publisher. - Throws: - GoogleApiCallException on any error determining topic structure. - """ + Throws: + GoogleApiCallException on any error determining topic structure. + """ if per_partition_batching_settings is None: per_partition_batching_settings = DEFAULT_BATCHING_SETTINGS admin_client = AdminClient( diff --git a/google/cloud/pubsublite/internal/wire/merge_metadata.py b/google/cloud/pubsublite/internal/wire/merge_metadata.py index 6ea31c3a..3a811ce4 100644 --- a/google/cloud/pubsublite/internal/wire/merge_metadata.py +++ b/google/cloud/pubsublite/internal/wire/merge_metadata.py @@ -19,8 +19,8 @@ def merge_metadata( a: Optional[Mapping[str, str]], b: Optional[Mapping[str, str]] ) -> Mapping[str, str]: """ - Merge the two sets of metadata if either exists. The second map overwrites the first. - """ + Merge the two sets of metadata if either exists. The second map overwrites the first. + """ result = {} if a: for k, v in a.items(): diff --git a/google/cloud/pubsublite/internal/wire/permanent_failable.py b/google/cloud/pubsublite/internal/wire/permanent_failable.py index 2fcf54a7..7522864e 100644 --- a/google/cloud/pubsublite/internal/wire/permanent_failable.py +++ b/google/cloud/pubsublite/internal/wire/permanent_failable.py @@ -52,13 +52,13 @@ def _failure_task(self) -> asyncio.Future: async def await_unless_failed(self, awaitable: Awaitable[T]) -> T: """ - Await the awaitable, unless fail() is called first. - Args: - awaitable: An awaitable + Await the awaitable, unless fail() is called first. + Args: + awaitable: An awaitable - Returns: The result of the awaitable - Raises: The permanent error if fail() is called or the awaitable raises one. - """ + Returns: The result of the awaitable + Raises: The permanent error if fail() is called or the awaitable raises one. + """ async with _TaskWithCleanup(awaitable) as task: if self._failure_task.done(): raise self._failure_task.exception() @@ -71,11 +71,11 @@ async def await_unless_failed(self, awaitable: Awaitable[T]) -> T: async def run_poller(self, poll_action: Callable[[], Awaitable[None]]): """ - Run a polling loop, which runs poll_action forever unless this is failed. - Args: - poll_action: A callable returning an awaitable to run in a loop. Note that async functions which return once - satisfy this. - """ + Run a polling loop, which runs poll_action forever unless this is failed. + Args: + poll_action: A callable returning an awaitable to run in a loop. Note that async functions which return once + satisfy this. + """ try: while True: await self.await_unless_failed(poll_action()) diff --git a/google/cloud/pubsublite/internal/wire/publisher.py b/google/cloud/pubsublite/internal/wire/publisher.py index 35b581bb..c7352afe 100644 --- a/google/cloud/pubsublite/internal/wire/publisher.py +++ b/google/cloud/pubsublite/internal/wire/publisher.py @@ -20,21 +20,21 @@ class Publisher(AsyncContextManager): """ - A Pub/Sub Lite asynchronous wire protocol publisher. - """ + A Pub/Sub Lite asynchronous wire protocol publisher. + """ @abstractmethod async def publish(self, message: PubSubMessage) -> MessageMetadata: """ - Publish the provided message. + Publish the provided message. - Args: - message: The message to be published. + Args: + message: The message to be published. - Returns: - Metadata about the published message. + Returns: + Metadata about the published message. - Raises: - GoogleAPICallError: On a permanent error. - """ + Raises: + GoogleAPICallError: On a permanent error. + """ raise NotImplementedError() diff --git a/google/cloud/pubsublite/internal/wire/retrying_connection.py b/google/cloud/pubsublite/internal/wire/retrying_connection.py index e1eb041f..634f8fa3 100644 --- a/google/cloud/pubsublite/internal/wire/retrying_connection.py +++ b/google/cloud/pubsublite/internal/wire/retrying_connection.py @@ -77,8 +77,8 @@ async def read(self) -> Response: async def _run_loop(self): """ - Processes actions on this connection and handles retries until cancelled. - """ + Processes actions on this connection and handles retries until cancelled. + """ last_failure: Optional[GoogleAPICallError] = None try: bad_retries = 0 diff --git a/google/cloud/pubsublite/internal/wire/routing_policy.py b/google/cloud/pubsublite/internal/wire/routing_policy.py index 362e04eb..9d7bb40b 100644 --- a/google/cloud/pubsublite/internal/wire/routing_policy.py +++ b/google/cloud/pubsublite/internal/wire/routing_policy.py @@ -24,11 +24,11 @@ class RoutingPolicy(ABC): @abstractmethod def route(self, message: PubSubMessage) -> Partition: """ - Route a message to a given partition. - Args: - message: The message to route + Route a message to a given partition. + Args: + message: The message to route - Returns: The partition to route to + Returns: The partition to route to - """ + """ raise NotImplementedError() diff --git a/google/cloud/pubsublite/internal/wire/serial_batcher.py b/google/cloud/pubsublite/internal/wire/serial_batcher.py index 218e5869..89cda847 100644 --- a/google/cloud/pubsublite/internal/wire/serial_batcher.py +++ b/google/cloud/pubsublite/internal/wire/serial_batcher.py @@ -26,11 +26,11 @@ class BatchTester(Generic[Request]): @abstractmethod def test(self, requests: Iterable[Request]) -> bool: """ - Args: - requests: The current outstanding batch. + Args: + requests: The current outstanding batch. - Returns: Whether that batch must be sent. - """ + Returns: Whether that batch must be sent. + """ raise NotImplementedError() @@ -44,14 +44,14 @@ def __init__(self, tester: BatchTester[Request]): def add(self, request: Request) -> "asyncio.Future[Response]": """Add a new request to this batcher. Callers must always call should_flush() after add, and flush() if that returns - true. + true. - Args: - request: The request to send. + Args: + request: The request to send. - Returns: - A future that will resolve to the response or a GoogleAPICallError. - """ + Returns: + A future that will resolve to the response or a GoogleAPICallError. + """ item = WorkItem[Request, Response](request) self._requests.append(item) return item.response_future diff --git a/google/cloud/pubsublite/internal/wire/subscriber.py b/google/cloud/pubsublite/internal/wire/subscriber.py index 45dfc1c4..86305366 100644 --- a/google/cloud/pubsublite/internal/wire/subscriber.py +++ b/google/cloud/pubsublite/internal/wire/subscriber.py @@ -19,25 +19,25 @@ class Subscriber(AsyncContextManager): """ - A Pub/Sub Lite asynchronous wire protocol subscriber. - """ + A Pub/Sub Lite asynchronous wire protocol subscriber. + """ @abstractmethod async def read(self) -> SequencedMessage: """ - Read the next message off of the stream. + Read the next message off of the stream. - Returns: - The next message. + Returns: + The next message. - Raises: - GoogleAPICallError: On a permanent error. - """ + Raises: + GoogleAPICallError: On a permanent error. + """ raise NotImplementedError() @abstractmethod async def allow_flow(self, request: FlowControlRequest): """ - Allow an additional amount of messages and bytes to be sent to this client. - """ + Allow an additional amount of messages and bytes to be sent to this client. + """ raise NotImplementedError() diff --git a/google/cloud/pubsublite/testing/test_utils.py b/google/cloud/pubsublite/testing/test_utils.py index 7655fcb8..766a5c01 100644 --- a/google/cloud/pubsublite/testing/test_utils.py +++ b/google/cloud/pubsublite/testing/test_utils.py @@ -32,9 +32,9 @@ def make_queue_waiter( started_q: "asyncio.Queue[None]", result_q: "asyncio.Queue[Union[T, Exception]]" ): """ - Given a queue to notify when started and a queue to get results from, return a waiter which - notifies started_q when started and returns from result_q when done. - """ + Given a queue to notify when started and a queue to get results from, return a waiter which + notifies started_q when started and returns from result_q when done. + """ async def waiter(*args, **kwargs): await started_q.put(None) diff --git a/google/cloud/pubsublite/types/location.py b/google/cloud/pubsublite/types/location.py index e24667f3..0ad7ae98 100644 --- a/google/cloud/pubsublite/types/location.py +++ b/google/cloud/pubsublite/types/location.py @@ -23,6 +23,13 @@ class CloudRegion(NamedTuple): def __str__(self): return self.name + @staticmethod + def parse(to_parse: str): + splits = to_parse.split("-") + if len(splits) != 2: + raise InvalidArgument("Invalid region name: " + to_parse) + return CloudRegion(name=splits[0] + "-" + splits[1]) + class CloudZone(NamedTuple): region: CloudRegion diff --git a/google/cloud/pubsublite/types/paths.py b/google/cloud/pubsublite/types/paths.py index 3901afac..9f60e182 100644 --- a/google/cloud/pubsublite/types/paths.py +++ b/google/cloud/pubsublite/types/paths.py @@ -16,12 +16,12 @@ from google.api_core.exceptions import InvalidArgument -from google.cloud.pubsublite.types.location import CloudZone +from google.cloud.pubsublite.types.location import CloudZone, CloudRegion class LocationPath(NamedTuple): project: Union[int, str] - location: CloudZone + location: Union[CloudRegion, CloudZone] def __str__(self): return f"projects/{self.project}/locations/{self.location}" @@ -79,3 +79,30 @@ def parse(to_parse: str) -> "SubscriptionPath": + to_parse ) return SubscriptionPath(splits[1], CloudZone.parse(splits[3]), splits[5]) + + +class ReservationPath(NamedTuple): + project: Union[int, str] + location: CloudRegion + name: str + + def __str__(self): + return f"projects/{self.project}/locations/{self.location}/reservations/{self.name}" + + def to_location_path(self): + return LocationPath(self.project, self.location) + + @staticmethod + def parse(to_parse: str) -> "ReservationPath": + splits = to_parse.split("/") + if ( + len(splits) != 6 + or splits[0] != "projects" + or splits[2] != "locations" + or splits[4] != "reservations" + ): + raise InvalidArgument( + "Reservation path must be formatted like projects/{project_number}/locations/{location}/reservations/{name} but was instead " + + to_parse + ) + return ReservationPath(splits[1], CloudZone.parse(splits[3]), splits[5])