Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Make public API more similar to generated clients #56

Merged
merged 8 commits into from Nov 5, 2020
4 changes: 4 additions & 0 deletions .coveragerc
Expand Up @@ -29,6 +29,10 @@ exclude_lines =
# Ignore abstract methods
raise NotImplementedError
@abstractmethod
# Ignore delegating methods
self._impl.
# Ignore __exit__ "return self"
return self

omit =
*/pubsublite_v1/*.py
Expand Down
6 changes: 6 additions & 0 deletions google/cloud/pubsublite/__init__.py
Expand Up @@ -96,8 +96,14 @@
from google.cloud.pubsublite_v1.types.subscriber import SeekResponse
from google.cloud.pubsublite_v1.types.subscriber import SubscribeRequest
from google.cloud.pubsublite_v1.types.subscriber import SubscribeResponse
from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
from google.cloud.pubsublite.admin_client import AdminClient

__all__ = (
# Manual files
"AdminClient",
"AdminClientInterface",
# Generated files
"AdminServiceAsyncClient",
"AdminServiceClient",
"AttributeValues",
Expand Down
108 changes: 75 additions & 33 deletions google/cloud/pubsublite/admin_client.py
@@ -1,67 +1,109 @@
from abc import ABC, abstractmethod
from typing import List
from typing import Optional, List

from overrides import overrides
from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials
from google.protobuf.field_mask_pb2 import FieldMask

from google.cloud.pubsublite.admin_client_interface import AdminClientInterface
from google.cloud.pubsublite.internal.constructable_from_service_account import (
ConstructableFromServiceAccount,
)
from google.cloud.pubsublite.internal.endpoints import regional_endpoint
from google.cloud.pubsublite.internal.wire.admin_client_impl import AdminClientImpl
from google.cloud.pubsublite.types import (
CloudRegion,
TopicPath,
LocationPath,
SubscriptionPath,
LocationPath,
TopicPath,
)
from google.cloud.pubsublite_v1 import Topic, Subscription
from google.protobuf.field_mask_pb2 import FieldMask
from google.cloud.pubsublite_v1 import AdminServiceClient, Subscription, Topic


class AdminClient(AdminClientInterface, ConstructableFromServiceAccount):
"""
An admin client for Pub/Sub Lite. Only operates on a single region.
"""

_impl: AdminClientInterface

def __init__(
self,
region: CloudRegion,
credentials: Optional[Credentials] = None,
transport: Optional[str] = None,
client_options: Optional[ClientOptions] = None,
):
"""
Create a new AdminClient.

Args:
region: The cloud region to connect to.
credentials: The credentials to use when connecting.
transport: The transport to use.
client_options: The client options to use when connecting. If used, must explicitly set `api_endpoint`.
"""
if client_options is None:
client_options = ClientOptions(api_endpoint=regional_endpoint(region))
self._impl = AdminClientImpl(
AdminServiceClient(
client_options=client_options,
transport=transport,
credentials=credentials,
),
region,
)

class AdminClient(ABC):
@abstractmethod
@overrides
def region(self) -> CloudRegion:
"""The region this client is for."""
return self._impl.region()

@abstractmethod
@overrides
def create_topic(self, topic: Topic) -> Topic:
"""Create a topic, returns the created topic."""
return self._impl.create_topic(topic)

@abstractmethod
@overrides
def get_topic(self, topic_path: TopicPath) -> Topic:
"""Get the topic object from the server."""
return self._impl.get_topic(topic_path)

@abstractmethod
@overrides
def get_topic_partition_count(self, topic_path: TopicPath) -> int:
"""Get the number of partitions in the provided topic."""
return self._impl.get_topic_partition_count(topic_path)

@abstractmethod
@overrides
def list_topics(self, location_path: LocationPath) -> List[Topic]:
"""List the Pub/Sub lite topics that exist for a project in a given location."""
return self._impl.list_topics(location_path)

@abstractmethod
@overrides
def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic:
"""Update the masked fields of the provided topic."""
return self._impl.update_topic(topic, update_mask)

@abstractmethod
@overrides
def delete_topic(self, topic_path: TopicPath):
"""Delete a topic and all associated messages."""
return self._impl.delete_topic(topic_path)

@abstractmethod
@overrides
def list_topic_subscriptions(self, topic_path: TopicPath):
"""List the subscriptions that exist for a given topic."""
return self._impl.list_topic_subscriptions(topic_path)

@abstractmethod
@overrides
def create_subscription(self, subscription: Subscription) -> Subscription:
"""Create a subscription, returns the created subscription."""
return self._impl.create_subscription(subscription)

@abstractmethod
@overrides
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
"""Get the subscription object from the server."""
return self._impl.get_subscription(subscription_path)

@abstractmethod
@overrides
def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]:
"""List the Pub/Sub lite subscriptions that exist for a project in a given location."""
return self._impl.list_subscriptions(location_path)

@abstractmethod
@overrides
def update_subscription(
self, subscription: Subscription, update_mask: FieldMask
) -> Subscription:
"""Update the masked fields of the provided subscription."""
return self._impl.update_subscription(subscription, update_mask)

@abstractmethod
@overrides
def delete_subscription(self, subscription_path: SubscriptionPath):
"""Delete a subscription and all associated messages."""
return self._impl.delete_subscription(subscription_path)
71 changes: 71 additions & 0 deletions google/cloud/pubsublite/admin_client_interface.py
@@ -0,0 +1,71 @@
from abc import ABC, abstractmethod
from typing import List

from google.cloud.pubsublite.types import (
CloudRegion,
TopicPath,
LocationPath,
SubscriptionPath,
)
from google.cloud.pubsublite_v1 import Topic, Subscription
from google.protobuf.field_mask_pb2 import FieldMask


class AdminClientInterface(ABC):
"""
An admin client for Pub/Sub Lite. Only operates on a single region.
"""

@abstractmethod
def region(self) -> CloudRegion:
"""The region this client is for."""

@abstractmethod
def create_topic(self, topic: Topic) -> Topic:
"""Create a topic, returns the created topic."""

@abstractmethod
def get_topic(self, topic_path: TopicPath) -> Topic:
"""Get the topic object from the server."""

@abstractmethod
def get_topic_partition_count(self, topic_path: TopicPath) -> int:
"""Get the number of partitions in the provided topic."""

@abstractmethod
def list_topics(self, location_path: LocationPath) -> List[Topic]:
"""List the Pub/Sub lite topics that exist for a project in a given location."""

@abstractmethod
def update_topic(self, topic: Topic, update_mask: FieldMask) -> Topic:
"""Update the masked fields of the provided topic."""

@abstractmethod
def delete_topic(self, topic_path: TopicPath):
"""Delete a topic and all associated messages."""

@abstractmethod
def list_topic_subscriptions(self, topic_path: TopicPath):
"""List the subscriptions that exist for a given topic."""

@abstractmethod
def create_subscription(self, subscription: Subscription) -> Subscription:
"""Create a subscription, returns the created subscription."""

@abstractmethod
def get_subscription(self, subscription_path: SubscriptionPath) -> Subscription:
"""Get the subscription object from the server."""

@abstractmethod
def list_subscriptions(self, location_path: LocationPath) -> List[Subscription]:
"""List the Pub/Sub lite subscriptions that exist for a project in a given location."""

@abstractmethod
def update_subscription(
self, subscription: Subscription, update_mask: FieldMask
) -> Subscription:
"""Update the masked fields of the provided subscription."""

@abstractmethod
def delete_subscription(self, subscription_path: SubscriptionPath):
"""Delete a subscription and all associated messages."""
13 changes: 13 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/__init__.py
@@ -0,0 +1,13 @@
# flake8: noqa
from .message_transformer import MessageTransformer
from .nack_handler import NackHandler
from .publisher_client_interface import (
PublisherClientInterface,
AsyncPublisherClientInterface,
)
from .publisher_client import PublisherClient, AsyncPublisherClient
from .subscriber_client_interface import (
SubscriberClientInterface,
AsyncSubscriberClientInterface,
)
from .subscriber_client import SubscriberClient, AsyncSubscriberClient
Expand Up @@ -3,21 +3,23 @@

from google.cloud.pubsub_v1.subscriber.message import Message

from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import (
AsyncSingleSubscriber,
)
from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_cancelled
from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.types import Partition

PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber]
PartitionSubscriberFactory = Callable[[Partition], AsyncSingleSubscriber]


class _RunningSubscriber(NamedTuple):
subscriber: AsyncSubscriber
subscriber: AsyncSingleSubscriber
poller: Future


class AssigningSubscriber(AsyncSubscriber, PermanentFailable):
class AssigningSingleSubscriber(AsyncSingleSubscriber, PermanentFailable):
_assigner_factory: Callable[[], Assigner]
_subscriber_factory: PartitionSubscriberFactory

Expand Down Expand Up @@ -47,7 +49,7 @@ def __init__(
async def read(self) -> Message:
return await self.await_unless_failed(self._messages.get())

async def _subscribe_action(self, subscriber: AsyncSubscriber):
async def _subscribe_action(self, subscriber: AsyncSingleSubscriber):
message = await subscriber.read()
await self._messages.put(message)

Expand Down
Expand Up @@ -5,11 +5,13 @@
from google.cloud.pubsublite.cloudpubsub.message_transforms import (
from_cps_publish_message,
)
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher
from google.cloud.pubsublite.cloudpubsub.internal.single_publisher import (
AsyncSinglePublisher,
)
from google.cloud.pubsublite.internal.wire.publisher import Publisher


class AsyncPublisherImpl(AsyncPublisher):
class AsyncSinglePublisherImpl(AsyncSinglePublisher):
_publisher_factory: Callable[[], Publisher]
_publisher: Optional[Publisher]

Expand Down