Skip to content

Commit

Permalink
feat: Implement Publisher and subscriber factories (#24)
Browse files Browse the repository at this point in the history
* feat: Implement SinglePartitionSubscriber.

This handles mapping a single partition to a Cloud Pub/Sub-like asynchronous subscriber.

* feat: Add DefaultNackHandler.

* feat: Add AssigningSubscriber.

This handles changing partition assignments and creates AsyncSubscribers per-partition.

* feat: Implement publisher and subscriber factories.

* docs: Add comment.

Co-authored-by: Daniel Collins <dpcollins@google.com>
Co-authored-by: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com>
  • Loading branch information
3 people committed Sep 24, 2020
1 parent 6afd477 commit 4890cae
Show file tree
Hide file tree
Showing 7 changed files with 229 additions and 7 deletions.
Expand Up @@ -9,7 +9,7 @@
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.partition import Partition

_PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber]
PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber]


class _RunningSubscriber(NamedTuple):
Expand All @@ -19,7 +19,7 @@ class _RunningSubscriber(NamedTuple):

class AssigningSubscriber(AsyncSubscriber, PermanentFailable):
_assigner: Assigner
_subscriber_factory: _PartitionSubscriberFactory
_subscriber_factory: PartitionSubscriberFactory

_subscribers: Dict[Partition, _RunningSubscriber]
_messages: "Queue[Message]"
Expand Down
66 changes: 66 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/make_publisher.py
@@ -0,0 +1,66 @@
from typing import Optional, Mapping

from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials

from google.cloud.pubsublite.cloudpubsub.internal.async_publisher_impl import AsyncPublisherImpl
from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher
from google.cloud.pubsublite.internal.wire.make_publisher import make_publisher as make_wire_publisher
from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata
from google.cloud.pubsublite.internal.wire.pubsub_context import pubsub_context
from google.cloud.pubsublite.paths import TopicPath


def make_async_publisher(
        topic: TopicPath,
        batching_delay_secs: Optional[float] = None,
        credentials: Optional[Credentials] = None,
        client_options: Optional[ClientOptions] = None,
        metadata: Optional[Mapping[str, str]] = None
) -> AsyncPublisher:
    """
    Create an AsyncPublisher for the given topic.

    The caller's metadata is merged with a pubsub context entry tagged with the
    "CLOUD_PUBSUB_SHIM" framework, a wire-level publisher is built with the
    merged metadata, and that publisher is wrapped in an AsyncPublisherImpl.

    Args:
      topic: The topic to publish to.
      batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
      credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
      client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
      metadata: Additional metadata to send with the RPC.

    Returns:
      A new AsyncPublisher.

    Throws:
      GoogleApiCallException on any error determining topic structure.
    """
    shim_context = pubsub_context(framework="CLOUD_PUBSUB_SHIM")
    merged_metadata = merge_metadata(shim_context, metadata)
    wire_publisher = make_wire_publisher(
        topic, batching_delay_secs, credentials, client_options, merged_metadata)
    return AsyncPublisherImpl(wire_publisher)


def make_publisher(
        topic: TopicPath,
        batching_delay_secs: Optional[float] = None,
        credentials: Optional[Credentials] = None,
        client_options: Optional[ClientOptions] = None,
        metadata: Optional[Mapping[str, str]] = None
) -> Publisher:
    """
    Create a Publisher for the given topic.

    Builds an AsyncPublisher via make_async_publisher with the same arguments
    and wraps it in a PublisherImpl.

    Args:
      topic: The topic to publish to.
      batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
      credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
      client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
      metadata: Additional metadata to send with the RPC.

    Returns:
      A new Publisher.

    Throws:
      GoogleApiCallException on any error determining topic structure.
    """
    async_publisher = make_async_publisher(
        topic, batching_delay_secs, credentials, client_options, metadata)
    return PublisherImpl(async_publisher)
126 changes: 126 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/make_subscriber.py
@@ -0,0 +1,126 @@
from typing import Optional, Mapping, Set, AsyncIterator
from uuid import uuid4

from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials

from google.cloud.pubsublite.cloudpubsub.flow_control_settings import FlowControlSettings
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import AckSetTrackerImpl
from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import PartitionSubscriberFactory, \
AssigningSubscriber
from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import SinglePartitionSubscriber
from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer, DefaultMessageTransformer
from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler, DefaultNackHandler
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.endpoints import regional_endpoint
from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.internal.wire.assigner_impl import AssignerImpl
from google.cloud.pubsublite.internal.wire.committer_impl import CommitterImpl
from google.cloud.pubsublite.internal.wire.fixed_set_assigner import FixedSetAssigner
from google.cloud.pubsublite.internal.wire.gapic_connection import GapicConnectionFactory
from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata
from google.cloud.pubsublite.internal.wire.pubsub_context import pubsub_context
from google.cloud.pubsublite.internal.wire.subscriber_impl import SubscriberImpl
from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite.paths import SubscriptionPath
from google.cloud.pubsublite.routing_metadata import subscription_routing_metadata
from google.cloud.pubsublite_v1 import SubscribeRequest, InitialSubscribeRequest, StreamingCommitCursorRequest, \
PartitionAssignmentRequest, InitialPartitionAssignmentRequest, InitialCommitCursorRequest
from google.cloud.pubsublite_v1.services.subscriber_service.async_client import SubscriberServiceAsyncClient
from google.cloud.pubsublite_v1.services.partition_assignment_service.async_client import \
PartitionAssignmentServiceAsyncClient
from google.cloud.pubsublite_v1.services.cursor_service.async_client import CursorServiceAsyncClient

# Passed as the second positional argument to both SubscriberImpl and
# CommitterImpl in _make_partition_subscriber_factory; presumably their flush
# period in seconds (inferred from the name -- confirm against those classes).
_DEFAULT_FLUSH_SECONDS = .1


def _make_dynamic_assigner(
        subscription: SubscriptionPath,
        assignment_client: PartitionAssignmentServiceAsyncClient,
        base_metadata: Optional[Mapping[str, str]]) -> Assigner:
    """
    Build an Assigner that obtains partition assignments from the assignment service.

    Args:
      subscription: The subscription to request assignments for; its string form is sent in the initial request.
      assignment_client: The client whose assign_partitions stream is opened for each connection.
      base_metadata: Metadata to attach to the stream. None is treated as no metadata.

    Returns:
      An AssignerImpl whose initial request carries a freshly generated random client id (the bytes of a uuid4).
    """
    def assignment_connection_factory(requests: AsyncIterator[PartitionAssignmentRequest]):
        # base_metadata is declared Optional, so guard against None rather than
        # failing with AttributeError when the connection is opened.
        metadata_items = list(base_metadata.items()) if base_metadata is not None else []
        return assignment_client.assign_partitions(requests, metadata=metadata_items)

    initial_request = InitialPartitionAssignmentRequest(
        subscription=str(subscription), client_id=uuid4().bytes)
    return AssignerImpl(initial_request, GapicConnectionFactory(assignment_connection_factory))


def _make_partition_subscriber_factory(
        subscription: SubscriptionPath,
        subscribe_client: SubscriberServiceAsyncClient,
        cursor_client: CursorServiceAsyncClient,
        base_metadata: Optional[Mapping[str, str]],
        flow_control_settings: FlowControlSettings,
        nack_handler: NackHandler,
        message_transformer: MessageTransformer
) -> PartitionSubscriberFactory:
    """
    Build a factory that creates one AsyncSubscriber per partition.

    Args:
      subscription: The subscription being read; its string form goes into both initial requests.
      subscribe_client: The client used to open the subscribe stream.
      cursor_client: The client used to open the streaming commit cursor stream.
      base_metadata: Metadata merged with the per-partition routing metadata for every stream.
      flow_control_settings: Handed unchanged to each SinglePartitionSubscriber.
      nack_handler: Handed unchanged to each SinglePartitionSubscriber.
      message_transformer: Handed unchanged to each SinglePartitionSubscriber.

    Returns:
      A callable mapping a Partition to a new SinglePartitionSubscriber.
    """
    def build_for_partition(partition: Partition) -> AsyncSubscriber:
        # Routing metadata is specific to this (subscription, partition) pair.
        routing_metadata = subscription_routing_metadata(subscription, partition)
        partition_metadata = merge_metadata(base_metadata, routing_metadata)

        def open_subscribe_stream(requests: AsyncIterator[SubscribeRequest]):
            return subscribe_client.subscribe(
                requests, metadata=list(partition_metadata.items()))

        def open_commit_stream(requests: AsyncIterator[StreamingCommitCursorRequest]):
            return cursor_client.streaming_commit_cursor(
                requests, metadata=list(partition_metadata.items()))

        # Message-delivery side: the wire subscriber for this partition.
        subscribe_initial = InitialSubscribeRequest(
            subscription=str(subscription), partition=partition.value)
        wire_subscriber = SubscriberImpl(
            subscribe_initial, _DEFAULT_FLUSH_SECONDS, GapicConnectionFactory(open_subscribe_stream))

        # Commit side: the committer is wrapped by the ack set tracker.
        commit_initial = InitialCommitCursorRequest(
            subscription=str(subscription), partition=partition.value)
        committer = CommitterImpl(
            commit_initial, _DEFAULT_FLUSH_SECONDS, GapicConnectionFactory(open_commit_stream))
        tracker = AckSetTrackerImpl(committer)

        return SinglePartitionSubscriber(
            wire_subscriber, flow_control_settings, tracker, nack_handler, message_transformer)

    return build_for_partition


def make_async_subscriber(
        subscription: SubscriptionPath,
        per_partition_flow_control_settings: FlowControlSettings,
        nack_handler: Optional[NackHandler] = None,
        message_transformer: Optional[MessageTransformer] = None,
        fixed_partitions: Optional[Set[Partition]] = None,
        credentials: Optional[Credentials] = None,
        client_options: Optional[ClientOptions] = None,
        metadata: Optional[Mapping[str, str]] = None) -> AsyncSubscriber:
    """
    Create a Pub/Sub Lite AsyncSubscriber for a subscription.

    Args:
      subscription: The subscription to subscribe to.
      per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that
        these settings apply to each partition individually, not in aggregate.
      nack_handler: An optional handler for when nack() is called on a Message. A DefaultNackHandler is used when
        omitted; the default will fail the client.
      message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages. A
        DefaultMessageTransformer is used when omitted.
      fixed_partitions: A fixed set of partitions to subscribe to. If None or empty, auto-assignment is used instead.
      credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
      client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
        When None, the regional endpoint for the subscription's region is used.
      metadata: Additional metadata to send with the RPC.

    Returns:
      A new AsyncSubscriber.
    """
    merged_metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata)

    if client_options is None:
        region = subscription.location.region
        client_options = ClientOptions(api_endpoint=regional_endpoint(region))

    # A non-empty fixed set bypasses the assignment service entirely.
    assigner: Assigner
    if not fixed_partitions:
        assignment_client = PartitionAssignmentServiceAsyncClient(
            credentials=credentials, client_options=client_options)  # type: ignore
        assigner = _make_dynamic_assigner(subscription, assignment_client, merged_metadata)
    else:
        assigner = FixedSetAssigner(fixed_partitions)

    subscribe_client = SubscriberServiceAsyncClient(
        credentials=credentials, client_options=client_options)  # type: ignore
    cursor_client = CursorServiceAsyncClient(
        credentials=credentials, client_options=client_options)  # type: ignore

    effective_nack_handler = DefaultNackHandler() if nack_handler is None else nack_handler
    effective_transformer = (
        DefaultMessageTransformer() if message_transformer is None else message_transformer)

    per_partition_factory = _make_partition_subscriber_factory(
        subscription,
        subscribe_client,
        cursor_client,
        merged_metadata,
        per_partition_flow_control_settings,
        effective_nack_handler,
        effective_transformer)
    return AssigningSubscriber(assigner, per_partition_factory)
28 changes: 28 additions & 0 deletions google/cloud/pubsublite/internal/wire/fixed_set_assigner.py
@@ -0,0 +1,28 @@
import asyncio
from typing import Set

from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.partition import Partition


class FixedSetAssigner(Assigner):
    """
    Assigner that hands out one fixed set of partitions exactly once.

    The first call to get_assignment returns the set given at construction.
    Every later call never returns; it stays suspended until the awaiting
    task is cancelled. Entering and exiting the async context do nothing.
    """
    # Length of each finite sleep used while parked in get_assignment.
    _PARK_INTERVAL_SECS = 3600.0

    _partitions: Set[Partition]
    _returned_set: bool

    def __init__(self, partitions: Set[Partition]):
        """
        Args:
          partitions: The set to return from the first get_assignment call. It is stored by reference, not copied.
        """
        self._partitions = partitions
        self._returned_set = False

    async def get_assignment(self) -> Set[Partition]:
        """Return the fixed set on the first call; never return on later calls."""
        if not self._returned_set:
            self._returned_set = True
            return self._partitions
        # Park in repeated finite sleeps rather than a single infinite one.
        # NOTE(review): an infinite delay reaches the event loop's selector
        # timeout, which older loop/selector code may fail to convert.
        # Cancellation still propagates from the pending sleep.
        while True:
            await asyncio.sleep(self._PARK_INTERVAL_SECS)

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_value, traceback):
        pass
3 changes: 2 additions & 1 deletion google/cloud/pubsublite/internal/wire/make_publisher.py
Expand Up @@ -19,7 +19,7 @@

def make_publisher(
topic: TopicPath,
batching_delay_secs: float = .05,
batching_delay_secs: Optional[float] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None) -> Publisher:
Expand All @@ -39,6 +39,7 @@ def make_publisher(
Throws:
GoogleApiCallException on any error determining topic structure.
"""
batching_delay_secs = batching_delay_secs if batching_delay_secs is not None else .05
admin_client = make_admin_client(region=topic.location.region, credentials=credentials, client_options=client_options)
if client_options is None:
client_options = ClientOptions(api_endpoint=regional_endpoint(topic.location.region))
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsublite/internal/wire/pubsub_context.py
Expand Up @@ -29,6 +29,6 @@ def pubsub_context(framework: Optional[str] = None) -> Mapping[str, str]:
version = _version()
context.fields["major_version"] = struct_pb2.Value(number_value=version.major)
context.fields["minor_version"] = struct_pb2.Value(number_value=version.minor)
encoded = b64encode(context.SerializeToString())
encoded = b64encode(context.SerializeToString()).decode("utf-8")
return {"x-goog-pubsub-context": encoded}

Expand Up @@ -7,11 +7,12 @@
from google.cloud.pubsub_v1.subscriber.message import Message
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import AssigningSubscriber
from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import AssigningSubscriber, \
PartitionSubscriberFactory
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite.testing.test_utils import make_queue_waiter, wire_queues
from google.cloud.pubsublite.testing.test_utils import wire_queues

# All test coroutines will be treated as marked.
pytestmark = pytest.mark.asyncio
Expand All @@ -29,7 +30,7 @@ def assigner():

@pytest.fixture()
def subscriber_factory():
return MagicMock(spec=Callable[[Partition], AsyncSubscriber])
return MagicMock(spec=PartitionSubscriberFactory)


@pytest.fixture()
Expand Down

0 comments on commit 4890cae

Please sign in to comment.