Skip to content

Commit

Permalink
fix: Add admin interfaces for reservations (#159)
Browse files Browse the repository at this point in the history
* fix: Add admin interfaces for reservations

Also run blacken (reformatter)

Also remove default python interpreter for any sessions where we don't care what interpreter is used, allowing `nox -s blacken` and others to work if you're running under 3.7, 3.9, etc.

* 🦉 Updates from OwlBot

* fix: add warning

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dpcollins-google and gcf-owl-bot[bot] committed Jun 11, 2021
1 parent 77db700 commit ad0f3d2
Show file tree
Hide file tree
Showing 26 changed files with 416 additions and 262 deletions.
38 changes: 36 additions & 2 deletions google/cloud/pubsublite/admin_client.py
Expand Up @@ -32,7 +32,13 @@
TopicPath,
BacklogLocation,
)
from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import (
AdminServiceClient,
Subscription,
Topic,
Reservation,
)


class AdminClient(AdminClientInterface, ConstructableFromServiceAccount):
Expand Down Expand Up @@ -98,7 +104,7 @@ def delete_topic(self, topic_path: TopicPath):
return self._impl.delete_topic(topic_path)

@overrides
def list_topic_subscriptions(self, topic_path: TopicPath):
def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPath]:
return self._impl.list_topic_subscriptions(topic_path)

@overrides
Expand Down Expand Up @@ -126,3 +132,31 @@ def update_subscription(
@overrides
def delete_subscription(self, subscription_path: SubscriptionPath):
return self._impl.delete_subscription(subscription_path)

@overrides
def create_reservation(self, reservation: Reservation) -> Reservation:
return self._impl.create_reservation(reservation)

@overrides
def get_reservation(self, reservation_path: ReservationPath) -> Reservation:
return self._impl.get_reservation(reservation_path)

@overrides
def list_reservations(self, location_path: LocationPath) -> List[Reservation]:
return self._impl.list_reservations(location_path)

@overrides
def update_reservation(
self, reservation: Reservation, update_mask: FieldMask
) -> Reservation:
return self._impl.update_reservation(reservation, update_mask)

@overrides
def delete_reservation(self, reservation_path: ReservationPath):
return self._impl.delete_reservation(reservation_path)

@overrides
def list_reservation_topics(
self, reservation_path: ReservationPath
) -> List[TopicPath]:
return self._impl.list_reservation_topics(reservation_path)
51 changes: 49 additions & 2 deletions google/cloud/pubsublite/admin_client_interface.py
Expand Up @@ -22,7 +22,8 @@
SubscriptionPath,
BacklogLocation,
)
from google.cloud.pubsublite_v1 import Topic, Subscription
from google.cloud.pubsublite.types.paths import ReservationPath
from google.cloud.pubsublite_v1 import Topic, Subscription, Reservation
from google.protobuf.field_mask_pb2 import FieldMask


Expand Down Expand Up @@ -60,7 +61,7 @@ def delete_topic(self, topic_path: TopicPath):
"""Delete a topic and all associated messages."""

@abstractmethod
def list_topic_subscriptions(self, topic_path: TopicPath):
def list_topic_subscriptions(self, topic_path: TopicPath) -> List[SubscriptionPath]:
"""List the subscriptions that exist for a given topic."""

@abstractmethod
Expand Down Expand Up @@ -90,3 +91,49 @@ def update_subscription(
@abstractmethod
def delete_subscription(self, subscription_path: SubscriptionPath):
"""Delete a subscription and all associated messages."""

@abstractmethod
def create_reservation(self, reservation: Reservation) -> Reservation:
"""Create a reservation, returns the created reservation.
warning:: This may not be implemented in the backend, it is a pre-release feature.
"""

@abstractmethod
def get_reservation(self, reservation_path: ReservationPath) -> Reservation:
"""Get the reservation object from the server.
warning:: This may not be implemented in the backend, it is a pre-release feature.
"""

@abstractmethod
def list_reservations(self, location_path: LocationPath) -> List[Reservation]:
"""List the Pub/Sub lite reservations that exist for a project in a given location.
warning:: This may not be implemented in the backend, it is a pre-release feature.
"""

@abstractmethod
def update_reservation(
self, reservation: Reservation, update_mask: FieldMask
) -> Reservation:
"""Update the masked fields of the provided reservation.
warning:: This may not be implemented in the backend, it is a pre-release feature.
"""

@abstractmethod
def delete_reservation(self, reservation_path: ReservationPath):
"""Delete a reservation and all associated messages.
warning:: This may not be implemented in the backend, it is a pre-release feature.
"""

@abstractmethod
def list_reservation_topics(
self, reservation_path: ReservationPath
) -> List[TopicPath]:
"""List the subscriptions that exist for a given reservation.
warning:: This may not be implemented in the backend, it is a pre-release feature.
"""
38 changes: 19 additions & 19 deletions google/cloud/pubsublite/cloudpubsub/internal/ack_set_tracker.py
Expand Up @@ -18,39 +18,39 @@

class AckSetTracker(AsyncContextManager):
"""
An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
is aggregated.
"""
An AckSetTracker tracks disjoint acknowledged messages and commits them when a contiguous prefix of tracked offsets
is aggregated.
"""

@abstractmethod
def track(self, offset: int):
"""
Track the provided offset.
Track the provided offset.
Args:
offset: the offset to track.
Args:
offset: the offset to track.
Raises:
GoogleAPICallError: On an invalid offset to track.
"""
Raises:
GoogleAPICallError: On an invalid offset to track.
"""

@abstractmethod
async def ack(self, offset: int):
"""
Acknowledge the message with the provided offset. The offset must have previously been tracked.
Acknowledge the message with the provided offset. The offset must have previously been tracked.
Args:
offset: the offset to acknowledge.
Args:
offset: the offset to acknowledge.
Returns:
GoogleAPICallError: On a commit failure.
"""
Returns:
GoogleAPICallError: On a commit failure.
"""

@abstractmethod
async def clear_and_commit(self):
"""
Discard all outstanding acks and wait for the commit offset to be acknowledged by the server.
Discard all outstanding acks and wait for the commit offset to be acknowledged by the server.
Raises:
GoogleAPICallError: If the committer has shut down due to a permanent error.
"""
Raises:
GoogleAPICallError: If the committer has shut down due to a permanent error.
"""
64 changes: 32 additions & 32 deletions google/cloud/pubsublite/cloudpubsub/internal/make_publisher.py
Expand Up @@ -49,22 +49,22 @@ def make_async_publisher(
metadata: Optional[Mapping[str, str]] = None,
) -> AsyncSinglePublisher:
"""
Make a new publisher for the given topic.
Args:
topic: The topic to publish to.
transport: The transport type to use.
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Returns:
A new AsyncPublisher.
Throws:
GoogleApiCallException on any error determining topic structure.
"""
Make a new publisher for the given topic.
Args:
topic: The topic to publish to.
transport: The transport type to use.
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Returns:
A new AsyncPublisher.
Throws:
GoogleApiCallException on any error determining topic structure.
"""
metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata)

def underlying_factory():
Expand All @@ -89,22 +89,22 @@ def make_publisher(
metadata: Optional[Mapping[str, str]] = None,
) -> SinglePublisher:
"""
Make a new publisher for the given topic.
Args:
topic: The topic to publish to.
transport: The transport type to use.
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Returns:
A new Publisher.
Throws:
GoogleApiCallException on any error determining topic structure.
"""
Make a new publisher for the given topic.
Args:
topic: The topic to publish to.
transport: The transport type to use.
per_partition_batching_settings: Settings for batching messages on each partition. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Returns:
A new Publisher.
Throws:
GoogleApiCallException on any error determining topic structure.
"""
return SinglePublisherImpl(
make_async_publisher(
topic=topic,
Expand Down
34 changes: 17 additions & 17 deletions google/cloud/pubsublite/cloudpubsub/internal/make_subscriber.py
Expand Up @@ -175,23 +175,23 @@ def make_async_subscriber(
metadata: Optional[Mapping[str, str]] = None,
) -> AsyncSingleSubscriber:
"""
Make a Pub/Sub Lite AsyncSubscriber.
Args:
subscription: The subscription to subscribe to.
transport: The transport type to use.
per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these
settings apply to each partition individually, not in aggregate.
nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client.
message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages.
fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Returns:
A new AsyncSubscriber.
"""
Make a Pub/Sub Lite AsyncSubscriber.
Args:
subscription: The subscription to subscribe to.
transport: The transport type to use.
per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these
settings apply to each partition individually, not in aggregate.
nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client.
message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages.
fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.
Returns:
A new AsyncSubscriber.
"""
metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata)
if client_options is None:
client_options = ClientOptions(
Expand Down
54 changes: 27 additions & 27 deletions google/cloud/pubsublite/cloudpubsub/internal/single_publisher.py
Expand Up @@ -19,54 +19,54 @@

class AsyncSinglePublisher(AsyncContextManager):
"""
An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an
async context. Any publish failures are permanent.
An AsyncPublisher publishes messages similar to Google Pub/Sub, but must be used in an
async context. Any publish failures are permanent.
Must be used in an `async with` block or have __aenter__() awaited before use.
"""
Must be used in an `async with` block or have __aenter__() awaited before use.
"""

@abstractmethod
async def publish(
self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]
) -> str:
"""
Publish a message.
Publish a message.
Args:
data: The bytestring payload of the message
ordering_key: The key to enforce ordering on, or "" for no ordering.
**attrs: Additional attributes to send.
Args:
data: The bytestring payload of the message
ordering_key: The key to enforce ordering on, or "" for no ordering.
**attrs: Additional attributes to send.
Returns:
An ack id, which can be decoded using MessageMetadata.decode.
Returns:
An ack id, which can be decoded using MessageMetadata.decode.
Raises:
GoogleApiCallError: On a permanent failure.
"""
Raises:
GoogleApiCallError: On a permanent failure.
"""


class SinglePublisher(ContextManager):
"""
A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent.
A Publisher publishes messages similar to Google Pub/Sub. Any publish failures are permanent.
Must be used in a `with` block or have __enter__() called before use.
"""
Must be used in a `with` block or have __enter__() called before use.
"""

@abstractmethod
def publish(
self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]
) -> "futures.Future[str]":
"""
Publish a message.
Publish a message.
Args:
data: The bytestring payload of the message
ordering_key: The key to enforce ordering on, or "" for no ordering.
**attrs: Additional attributes to send.
Args:
data: The bytestring payload of the message
ordering_key: The key to enforce ordering on, or "" for no ordering.
**attrs: Additional attributes to send.
Returns:
A future completed with an ack id, which can be decoded using MessageMetadata.decode.
Returns:
A future completed with an ack id, which can be decoded using MessageMetadata.decode.
Raises:
GoogleApiCallError: On a permanent failure.
"""
Raises:
GoogleApiCallError: On a permanent failure.
"""

0 comments on commit ad0f3d2

Please sign in to comment.