Skip to content

Commit

Permalink
fix: Make public API more similar to generated clients (#56)
Browse files Browse the repository at this point in the history
* fix: Make public API more similar to generated clients

* fix: Assorted fixes

* fix: Export clients in google.cloud.pubsublite.cloudpubsub

* fix: Add tests, break out repeated code, and restructure admin client

* fix: Import admin types to __init__.py and add overrides on async clients to copy docs

* fix: Lint issues

* fix: Fix deadlock in streaming pull shutdown

* fix: Ensure client awaitables passed to await_unless_failed are awaited
  • Loading branch information
dpcollins-google committed Nov 5, 2020
1 parent 32bc302 commit 7cf02ae
Show file tree
Hide file tree
Showing 37 changed files with 1,463 additions and 211 deletions.
4 changes: 4 additions & 0 deletions .coveragerc
Expand Up @@ -29,6 +29,10 @@ exclude_lines =
# Ignore abstract methods
raise NotImplementedError
@abstractmethod
# Ignore delegating methods
self._impl.
# Ignore __exit__ "return self"
return self

omit =
*/pubsublite_v1/*.py
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/pubsublite/__init__.py
Expand Up @@ -96,8 +96,14 @@
from google.cloud.pubsublite_v1.types.subscriber import SeekResponse
from google.cloud.pubsublite_v1.types.subscriber import SubscribeRequest
from google.cloud.pubsublite_v1.types.subscriber import SubscribeResponse
from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
from google.cloud.pubsublite.admin_client import AdminClient

__all__ = (
# Manual files
"AdminClient",
"AdminClientInterface",
# Generated files
"AdminServiceAsyncClient",
"AdminServiceClient",
"AttributeValues",
Expand Down
108 changes: 75 additions & 33 deletions google/cloud/pubsublite/admin_client.py
@@ -1,67 +1,109 @@
from abc import ABC, abstractmethod
from typing import List
from typing import Optional, List

from overrides import overrides
from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials
from google.protobuf.field_mask_pb2 import FieldMask

from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
from google.cloud.pubsublite.internal.constructable_from_service_account import (
ConstructableFromServiceAccount,
)
from google.cloud.pubsublite.internal.endpoints import regional_endpoint
from google.cloud.pubsublite.internal.wire.admin_client_impl import AdminClientImpl
from google.cloud.pubsublite.types import (
CloudRegion,
TopicPath,
LocationPath,
SubscriptionPath,
LocationPath,
TopicPath,
)
from google.cloud.pubsublite_v1 import Topic, Subscription
from google.protobuf.field_mask_pb2 import FieldMask
from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic


class AdminClient(AdminClientInterface, ConstructableFromServiceAccount):
"""
An admin client for Pub/Sub Lite. Only operates on a single region.
"""

_impl: AdminClientInterface

def __init__(
self,
region: CloudRegion,
credentials: Optional[Credentials] = None,
transport: Optional[str] = None,
client_options: Optional[ClientOptions] = None,
):
"""
Create a new AdminClient.
Args:
region: The cloud region to connect to.
credentials: The credentials to use when connecting.
transport: The transport to use.
client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`.
"""
if client_options is None:
client_options = ClientOptions(api_endpoint=regional_endpoint(region))
self._impl = AdminClientImpl(
AdminServiceClient(
client_options=client_options,
transport=transport,
credentials=credentials,
),
region,
)

class AdminClient(ABC):
@abstractmethod
@overrides
def region(self) -> CloudRegion:
"""The region this client is for."""
return self._impl.region()

@abstractmethod
@overrides
def create_topic(self, topic: Topic) -> Topic:
"""Create a topic, returns the created topic."""
return self._impl.create_topic(topic)

@abstractmethod
@overrides
def get_topic(self, topic_path: TopicPath) -> Topic:
"""Get the topic object from the server."""
return self._impl.get_topic(topic_path)

@abstractmethod
@overrides
def get_topic_partition_count(self, topic_path: TopicPath) -> int:
"""Get the number of partitions in the provided topic."""
return self._impl.get_topic_partition_count(topic_path)

@abstractmethod
@overrides
def list_topics(self, location_path: LocationPath) -> List[Topic]:
"""List the Pub/Sub lite topics that exist for a project in a given location."""
return self._impl.list_topics(location_path)

@abstractmethod
@overrides
def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic:
"""Update the masked fields of the provided topic."""
return self._impl.update_topic(topic, update_mask)

@abstractmethod
@overrides
def delete_topic(self, topic_path: TopicPath):
"""Delete a topic and all associated messages."""
return self._impl.delete_topic(topic_path)

@abstractmethod
@overrides
def list_topic_subscriptions(self, topic_path: TopicPath):
"""List the subscriptions that exist for a given topic."""
return self._impl.list_topic_subscriptions(topic_path)

@abstractmethod
@overrides
def create_subscription(self, subscription: Subscription) -> Subscription:
"""Create a subscription, returns the created subscription."""
return self._impl.create_subscription(subscription)

@abstractmethod
@overrides
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
"""Get the subscription object from the server."""
return self._impl.get_subscription(subscription_path)

@abstractmethod
@overrides
def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]:
"""List the Pub/Sub lite subscriptions that exist for a project in a given location."""
return self._impl.list_subscriptions(location_path)

@abstractmethod
@overrides
def update_subscription(
self, subscription: Subscription, update_mask: FieldMask
) -> Subscription:
"""Update the masked fields of the provided subscription."""
return self._impl.update_subscription(subscription, update_mask)

@abstractmethod
@overrides
def delete_subscription(self, subscription_path: SubscriptionPath):
"""Delete a subscription and all associated messages."""
return self._impl.delete_subscription(subscription_path)
71 changes: 71 additions & 0 deletions google/cloud/pubsublite/admin_client_interface.py
@@ -0,0 +1,71 @@
from abc import ABC, abstractmethod
from typing import List

from google.cloud.pubsublite.types import (
CloudRegion,
TopicPath,
LocationPath,
SubscriptionPath,
)
from google.cloud.pubsublite_v1 import Topic, Subscription
from google.protobuf.field_mask_pb2 import FieldMask


class AdminClientInterface(ABC):
"""
An admin client for Pub/Sub Lite. Only operates on a single region.
"""

@abstractmethod
def region(self) -> CloudRegion:
"""The region this client is for."""

@abstractmethod
def create_topic(self, topic: Topic) -> Topic:
"""Create a topic, returns the created topic."""

@abstractmethod
def get_topic(self, topic_path: TopicPath) -> Topic:
"""Get the topic object from the server."""

@abstractmethod
def get_topic_partition_count(self, topic_path: TopicPath) -> int:
"""Get the number of partitions in the provided topic."""

@abstractmethod
def list_topics(self, location_path: LocationPath) -> List[Topic]:
"""List the Pub/Sub lite topics that exist for a project in a given location."""

@abstractmethod
def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic:
"""Update the masked fields of the provided topic."""

@abstractmethod
def delete_topic(self, topic_path: TopicPath):
"""Delete a topic and all associated messages."""

@abstractmethod
def list_topic_subscriptions(self, topic_path: TopicPath):
"""List the subscriptions that exist for a given topic."""

@abstractmethod
def create_subscription(self, subscription: Subscription) -> Subscription:
"""Create a subscription, returns the created subscription."""

@abstractmethod
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
"""Get the subscription object from the server."""

@abstractmethod
def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]:
"""List the Pub/Sub lite subscriptions that exist for a project in a given location."""

@abstractmethod
def update_subscription(
self, subscription: Subscription, update_mask: FieldMask
) -> Subscription:
"""Update the masked fields of the provided subscription."""

@abstractmethod
def delete_subscription(self, subscription_path: SubscriptionPath):
"""Delete a subscription and all associated messages."""
13 changes: 13 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/__init__.py
@@ -0,0 +1,13 @@
# flake8: noqa
from .message_transformer import MessageTransformer
from .nack_handler import NackHandler
from .publisher_client_interface import (
PublisherClientInterface,
AsyncPublisherClientInterface,
)
from .publisher_client import PublisherClient, AsyncPublisherClient
from .subscriber_client_interface import (
SubscriberClientInterface,
AsyncSubscriberClientInterface,
)
from .subscriber_client import SubscriberClient, AsyncSubscriberClient
Expand Up @@ -3,21 +3,23 @@

from google.cloud.pubsub_v1.subscriber.message import Message

from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import (
AsyncSingleSubscriber,
)
from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_cancelled
from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.types import Partition

PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber]
PartitionSubscriberFactory = Callable[[Partition], AsyncSingleSubscriber]


class _RunningSubscriber(NamedTuple):
subscriber: AsyncSubscriber
subscriber: AsyncSingleSubscriber
poller: Future


class AssigningSubscriber(AsyncSubscriber, PermanentFailable):
class AssigningSingleSubscriber(AsyncSingleSubscriber, PermanentFailable):
_assigner_factory: Callable[[], Assigner]
_subscriber_factory: PartitionSubscriberFactory

Expand Down Expand Up @@ -47,7 +49,7 @@ def __init__(
async def read(self) -> Message:
return await self.await_unless_failed(self._messages.get())

async def _subscribe_action(self, subscriber: AsyncSubscriber):
async def _subscribe_action(self, subscriber: AsyncSingleSubscriber):
message = await subscriber.read()
await self._messages.put(message)

Expand Down
Expand Up @@ -5,11 +5,13 @@
from google.cloud.pubsublite.cloudpubsub.message_transforms import (
from_cps_publish_message,
)
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher
from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import (
AsyncSinglePublisher,
)
from google.cloud.pubsublite.internal.wire.publisher import Publisher


class AsyncPublisherImpl(AsyncPublisher):
class AsyncSinglePublisherImpl(AsyncSinglePublisher):
_publisher_factory: Callable[[], Publisher]
_publisher: Optional[Publisher]

Expand Down

0 comments on commit 7cf02ae

Please sign in to comment.