Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Publisher and subscriber factories #24

Merged
merged 6 commits into from Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -9,7 +9,7 @@
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.partition import Partition

_PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber]
PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber]


class _RunningSubscriber(NamedTuple):
Expand All @@ -19,13 +19,13 @@ class _RunningSubscriber(NamedTuple):

class AssigningSubscriber(AsyncSubscriber, PermanentFailable):
_assigner: Assigner
_subscriber_factory: _PartitionSubscriberFactory
_subscriber_factory: PartitionSubscriberFactory

_subscribers: Dict[Partition, _RunningSubscriber]
_messages: "Queue[Message]"
_assign_poller: Future

def __init__(self, assigner: Assigner, subscriber_factory: _PartitionSubscriberFactory):
def __init__(self, assigner: Assigner, subscriber_factory: PartitionSubscriberFactory):
super().__init__()
self._assigner = assigner
self._subscriber_factory = subscriber_factory
Expand Down
66 changes: 66 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/make_publisher.py
@@ -0,0 +1,66 @@
from typing import Optional, Mapping

from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials

from google.cloud.pubsublite.cloudpubsub.internal.async_publisher_impl import AsyncPublisherImpl
from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher
from google.cloud.pubsublite.internal.wire.make_publisher import make_publisher as make_wire_publisher
from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata
from google.cloud.pubsublite.internal.wire.pubsub_context import pubsub_context
from google.cloud.pubsublite.paths import TopicPath


def make_async_publisher(
topic: TopicPath,
batching_delay_secs: Optional[float] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None
) -> AsyncPublisher:
"""
Make a new publisher for the given topic.

Args:
topic: The topic to publish to.
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.

Returns:
A new AsyncPublisher.

Throws:
GoogleApiCallException on any error determining topic structure.
"""
metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata)
underlying = make_wire_publisher(topic, batching_delay_secs, credentials, client_options, metadata)
return AsyncPublisherImpl(underlying)


def make_publisher(
topic: TopicPath,
batching_delay_secs: Optional[float] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None
) -> Publisher:
"""
Make a new publisher for the given topic.

Args:
topic: The topic to publish to.
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.

Returns:
A new Publisher.

Throws:
GoogleApiCallException on any error determining topic structure.
"""
return PublisherImpl(make_async_publisher(topic, batching_delay_secs, credentials, client_options, metadata))
126 changes: 126 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/make_subscriber.py
@@ -0,0 +1,126 @@
from typing import Optional, Mapping, Set, AsyncIterator
from uuid import uuid4

from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials

from google.cloud.pubsublite.cloudpubsub.flow_control_settings import FlowControlSettings
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker_impl import AckSetTrackerImpl
from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import PartitionSubscriberFactory, \
AssigningSubscriber
from google.cloud.pubsublite.cloudpubsub.internal.single_partition_subscriber import SinglePartitionSubscriber
from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer, DefaultMessageTransformer
from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler, DefaultNackHandler
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.endpoints import regional_endpoint
from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.internal.wire.assigner_impl import AssignerImpl
from google.cloud.pubsublite.internal.wire.committer_impl import CommitterImpl
from google.cloud.pubsublite.internal.wire.fixed_set_assigner import FixedSetAssigner
from google.cloud.pubsublite.internal.wire.gapic_connection import GapicConnectionFactory
from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata
from google.cloud.pubsublite.internal.wire.pubsub_context import pubsub_context
from google.cloud.pubsublite.internal.wire.subscriber_impl import SubscriberImpl
from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite.paths import SubscriptionPath
from google.cloud.pubsublite.routing_metadata import subscription_routing_metadata
from google.cloud.pubsublite_v1 import SubscribeRequest, InitialSubscribeRequest, StreamingCommitCursorRequest, \
PartitionAssignmentRequest, InitialPartitionAssignmentRequest, InitialCommitCursorRequest
from google.cloud.pubsublite_v1.services.subscriber_service.async_client import SubscriberServiceAsyncClient
from google.cloud.pubsublite_v1.services.partition_assignment_service.async_client import \
PartitionAssignmentServiceAsyncClient
from google.cloud.pubsublite_v1.services.cursor_service.async_client import CursorServiceAsyncClient

_DEFAULT_FLUSH_SECONDS = .1


def _make_dynamic_assigner(
subscription: SubscriptionPath,
assignment_client: PartitionAssignmentServiceAsyncClient,
base_metadata: Optional[Mapping[str, str]]) -> Assigner:
def assignment_connection_factory(requests: AsyncIterator[PartitionAssignmentRequest]):
return assignment_client.assign_partitions(requests, metadata=list(base_metadata.items()))

return AssignerImpl(InitialPartitionAssignmentRequest(subscription=str(subscription), client_id=uuid4().bytes),
GapicConnectionFactory(assignment_connection_factory))


def _make_partition_subscriber_factory(
subscription: SubscriptionPath,
subscribe_client: SubscriberServiceAsyncClient,
cursor_client: CursorServiceAsyncClient,
base_metadata: Optional[Mapping[str, str]],
flow_control_settings: FlowControlSettings,
nack_handler: NackHandler,
message_transformer: MessageTransformer
) -> PartitionSubscriberFactory:
def factory(partition: Partition) -> AsyncSubscriber:
final_metadata = merge_metadata(base_metadata, subscription_routing_metadata(subscription, partition))

def subscribe_connection_factory(requests: AsyncIterator[SubscribeRequest]):
return subscribe_client.subscribe(requests, metadata=list(final_metadata.items()))

def cursor_connection_factory(requests: AsyncIterator[StreamingCommitCursorRequest]):
return cursor_client.streaming_commit_cursor(requests, metadata=list(final_metadata.items()))

wire_subscriber = SubscriberImpl(
InitialSubscribeRequest(subscription=str(subscription), partition=partition.value),
_DEFAULT_FLUSH_SECONDS, GapicConnectionFactory(subscribe_connection_factory))
committer = CommitterImpl(
InitialCommitCursorRequest(subscription=str(subscription), partition=partition.value),
_DEFAULT_FLUSH_SECONDS, GapicConnectionFactory(cursor_connection_factory))
ack_set_tracker = AckSetTrackerImpl(committer)
return SinglePartitionSubscriber(wire_subscriber, flow_control_settings, ack_set_tracker, nack_handler,
message_transformer)

return factory


def make_async_subscriber(
subscription: SubscriptionPath,
per_partition_flow_control_settings: FlowControlSettings,
nack_handler: Optional[NackHandler] = None,
message_transformer: Optional[MessageTransformer] = None,
fixed_partitions: Optional[Set[Partition]] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None) -> AsyncSubscriber:
"""
Make a Pub/Sub Lite AsyncSubscriber.

Args:
subscription: The subscription to subscribe to.
per_partition_flow_control_settings: The flow control settings for each partition subscribed to. Note that these
settings apply to each partition individually, not in aggregate.
nack_handler: An optional handler for when nack() is called on a Message. The default will fail the client.
message_transformer: An optional transformer from Pub/Sub Lite messages to Cloud Pub/Sub messages.
fixed_partitions: A fixed set of partitions to subscribe to. If not present, will instead use auto-assignment.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.

Returns:
A new AsyncSubscriber.
"""
metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata)
if client_options is None:
client_options = ClientOptions(api_endpoint=regional_endpoint(subscription.location.region))
assigner: Assigner
if fixed_partitions:
assigner = FixedSetAssigner(fixed_partitions)
else:
assignment_client = PartitionAssignmentServiceAsyncClient(credentials=credentials,
client_options=client_options) # type: ignore
assigner = _make_dynamic_assigner(subscription, assignment_client, metadata)

subscribe_client = SubscriberServiceAsyncClient(credentials=credentials,
client_options=client_options) # type: ignore
cursor_client = CursorServiceAsyncClient(credentials=credentials, client_options=client_options) # type: ignore
if nack_handler is None:
nack_handler = DefaultNackHandler()
if message_transformer is None:
message_transformer = DefaultMessageTransformer()
partition_subscriber_factory = _make_partition_subscriber_factory(subscription, subscribe_client, cursor_client,
metadata, per_partition_flow_control_settings,
nack_handler, message_transformer)
return AssigningSubscriber(assigner, partition_subscriber_factory)
27 changes: 27 additions & 0 deletions google/cloud/pubsublite/internal/wire/fixed_set_assigner.py
@@ -0,0 +1,27 @@
import asyncio
from typing import Set

from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.partition import Partition


class FixedSetAssigner(Assigner):
dpcollins-google marked this conversation as resolved.
Show resolved Hide resolved
_partitions: Set[Partition]
_returned_set: bool

def __init__(self, partitions: Set[Partition]):
self._partitions = partitions
self._returned_set = False

async def get_assignment(self) -> Set[Partition]:
if self._returned_set:
await asyncio.sleep(float('inf'))
raise RuntimeError('Should never happen.')
self._returned_set = True
return self._partitions

async def __aenter__(self):
return self

async def __aexit__(self, exc_type, exc_value, traceback):
pass
3 changes: 2 additions & 1 deletion google/cloud/pubsublite/internal/wire/make_publisher.py
Expand Up @@ -19,7 +19,7 @@

def make_publisher(
topic: TopicPath,
batching_delay_secs: float = .05,
batching_delay_secs: Optional[float] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None) -> Publisher:
Expand All @@ -39,6 +39,7 @@ def make_publisher(
Throws:
GoogleApiCallException on any error determining topic structure.
"""
batching_delay_secs = batching_delay_secs if batching_delay_secs is not None else .05
admin_client = make_admin_client(region=topic.location.region, credentials=credentials, client_options=client_options)
if client_options is None:
client_options = ClientOptions(api_endpoint=regional_endpoint(topic.location.region))
Expand Down
2 changes: 1 addition & 1 deletion google/cloud/pubsublite/internal/wire/pubsub_context.py
Expand Up @@ -29,6 +29,6 @@ def pubsub_context(framework: Optional[str] = None) -> Mapping[str, str]:
version = _version()
context.fields["major_version"] = struct_pb2.Value(number_value=version.major)
context.fields["minor_version"] = struct_pb2.Value(number_value=version.minor)
encoded = b64encode(context.SerializeToString())
encoded = b64encode(context.SerializeToString()).decode("utf-8")
return {"x-goog-pubsub-context": encoded}

@@ -1,17 +1,17 @@
import asyncio
from typing import Callable, Set
from typing import Set

from asynctest.mock import MagicMock, call
import pytest
from google.api_core.exceptions import FailedPrecondition
from google.cloud.pubsub_v1.subscriber.message import Message
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import AssigningSubscriber
from google.cloud.pubsublite.cloudpubsub.internal.assigning_subscriber import AssigningSubscriber, \
PartitionSubscriberFactory
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.partition import Partition
from google.cloud.pubsublite.testing.test_utils import make_queue_waiter, wire_queues
from google.cloud.pubsublite.testing.test_utils import wire_queues

# All test coroutines will be treated as marked.
pytestmark = pytest.mark.asyncio
Expand All @@ -29,7 +29,7 @@ def assigner():

@pytest.fixture()
def subscriber_factory():
return MagicMock(spec=Callable[[Partition], AsyncSubscriber])
return MagicMock(spec=PartitionSubscriberFactory)


@pytest.fixture()
Expand Down