Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement Publisher and subscriber factories #24

Merged
merged 6 commits into from Sep 24, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 11 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/flow_control_settings.py
@@ -0,0 +1,11 @@
from typing import NamedTuple


class FlowControlSettings(NamedTuple):
messages_outstanding: int
bytes_outstanding: int


_MAX_INT64 = 0x7FFFFFFFFFFFFFFF

DISABLED_FLOW_CONTROL = FlowControlSettings(_MAX_INT64, _MAX_INT64)
Expand Up @@ -15,6 +15,7 @@ class AckSetTrackerImpl(AckSetTracker):
_acks: "queue.PriorityQueue[int]"

def __init__(self, committer: Committer):
super().__init__()
self._committer = committer
self._receipts = deque()
self._acks = queue.PriorityQueue()
Expand Down
@@ -0,0 +1,73 @@
from asyncio import Future, Queue, ensure_future
from typing import Callable, NamedTuple, Dict, Set

from google.cloud.pubsub_v1.subscriber.message import Message

from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.internal.wait_ignore_cancelled import wait_ignore_cancelled
from google.cloud.pubsublite.internal.wire.assigner import Assigner
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.partition import Partition

PartitionSubscriberFactory = Callable[[Partition], AsyncSubscriber]


class _RunningSubscriber(NamedTuple):
subscriber: AsyncSubscriber
poller: Future


class AssigningSubscriber(AsyncSubscriber, PermanentFailable):
_assigner: Assigner
_subscriber_factory: PartitionSubscriberFactory

_subscribers: Dict[Partition, _RunningSubscriber]
_messages: "Queue[Message]"
_assign_poller: Future

def __init__(self, assigner: Assigner, subscriber_factory: PartitionSubscriberFactory):
super().__init__()
self._assigner = assigner
self._subscriber_factory = subscriber_factory
self._subscribers = {}
self._messages = Queue()

async def read(self) -> Message:
return await self.await_unless_failed(self._messages.get())

async def _subscribe_action(self, subscriber: AsyncSubscriber):
message = await subscriber.read()
await self._messages.put(message)

async def _start_subscriber(self, partition: Partition):
new_subscriber = self._subscriber_factory(partition)
await new_subscriber.__aenter__()
poller = ensure_future(self.run_poller(lambda: self._subscribe_action(new_subscriber)))
self._subscribers[partition] = _RunningSubscriber(new_subscriber, poller)

async def _stop_subscriber(self, running: _RunningSubscriber):
running.poller.cancel()
await wait_ignore_cancelled(running.poller)
await running.subscriber.__aexit__(None, None, None)

async def _assign_action(self):
assignment: Set[Partition] = await self._assigner.get_assignment()
added_partitions = assignment - self._subscribers.keys()
removed_partitions = self._subscribers.keys() - assignment
for partition in added_partitions:
await self._start_subscriber(partition)
for partition in removed_partitions:
await self._stop_subscriber(self._subscribers[partition])
del self._subscribers[partition]

async def __aenter__(self):
await self._assigner.__aenter__()
self._assign_poller = ensure_future(self.run_poller(self._assign_action))
return self

async def __aexit__(self, exc_type, exc_value, traceback):
self._assign_poller.cancel()
await wait_ignore_cancelled(self._assign_poller)
await self._assigner.__aexit__(exc_type, exc_value, traceback)
for running in self._subscribers.values():
await self._stop_subscriber(running)
Expand Up @@ -11,6 +11,7 @@ class AsyncPublisherImpl(AsyncPublisher):
_publisher: Publisher

def __init__(self, publisher: Publisher):
super().__init__()
self._publisher = publisher

async def publish(self, data: bytes, ordering_key: str = "", **attrs: Mapping[str, str]) -> str:
Expand Down
Expand Up @@ -15,7 +15,7 @@ def __init__(self):
def __enter__(self):
self._thread.start()

def __exit__(self, __exc_type, __exc_value, __traceback):
def __exit__(self, exc_type, exc_value, traceback):
self._loop.call_soon_threadsafe(self._loop.stop)
self._thread.join()

Expand Down
Expand Up @@ -10,6 +10,7 @@ class PublisherImpl(Publisher):
_underlying: AsyncPublisher

def __init__(self, underlying: AsyncPublisher):
super().__init__()
self._managed_loop = ManagedEventLoop()
self._underlying = underlying

Expand Down
@@ -0,0 +1,119 @@
import asyncio
from typing import Union, Dict, NamedTuple
import queue

from google.api_core.exceptions import FailedPrecondition, GoogleAPICallError
from google.cloud.pubsub_v1.subscriber.message import Message
from google.pubsub_v1 import PubsubMessage

from google.cloud.pubsublite.cloudpubsub.flow_control_settings import FlowControlSettings
from google.cloud.pubsublite.cloudpubsub.internal.ack_set_tracker import AckSetTracker
from google.cloud.pubsublite.cloudpubsub.message_transformer import MessageTransformer
from google.cloud.pubsublite.cloudpubsub.nack_handler import NackHandler
from google.cloud.pubsublite.cloudpubsub.subscriber import AsyncSubscriber
from google.cloud.pubsublite.internal.wire.permanent_failable import PermanentFailable
from google.cloud.pubsublite.internal.wire.subscriber import Subscriber
from google.cloud.pubsublite_v1 import FlowControlRequest, SequencedMessage
from google.cloud.pubsub_v1.subscriber._protocol import requests


class _SizedMessage(NamedTuple):
message: PubsubMessage
size_bytes: int


class SinglePartitionSubscriber(PermanentFailable, AsyncSubscriber):
_underlying: Subscriber
_flow_control_settings: FlowControlSettings
_ack_set_tracker: AckSetTracker
_nack_handler: NackHandler
_transformer: MessageTransformer

_queue: queue.Queue
_messages_by_offset: Dict[int, _SizedMessage]
_looper_future: asyncio.Future

def __init__(self, underlying: Subscriber, flow_control_settings: FlowControlSettings, ack_set_tracker: AckSetTracker,
nack_handler: NackHandler, transformer: MessageTransformer):
super().__init__()
self._underlying = underlying
self._flow_control_settings = flow_control_settings
self._ack_set_tracker = ack_set_tracker
self._nack_handler = nack_handler
self._transformer = transformer

self._queue = queue.Queue()
self._messages_by_offset = {}

async def read(self) -> Message:
message: SequencedMessage = await self.await_unless_failed(self._underlying.read())
try:
cps_message = self._transformer.transform(message)
offset = message.cursor.offset
self._ack_set_tracker.track(offset)
self._messages_by_offset[offset] = _SizedMessage(cps_message, message.size_bytes)
wrapped_message = Message(cps_message._pb, ack_id=str(offset), delivery_attempt=0, request_queue=self._queue)
return wrapped_message
except GoogleAPICallError as e:
self.fail(e)
raise e

async def _handle_ack(self, message: requests.AckRequest):
offset = int(message.ack_id)
await self._underlying.allow_flow(
FlowControlRequest(allowed_messages=1, allowed_bytes=self._messages_by_offset[offset].size_bytes))
del self._messages_by_offset[offset]
try:
await self._ack_set_tracker.ack(offset)
except GoogleAPICallError as e:
self.fail(e)

def _handle_nack(self, message: requests.NackRequest):
offset = int(message.ack_id)
sized_message = self._messages_by_offset[offset]
try:
self._nack_handler.on_nack(sized_message.message,
lambda: self._queue.put(requests.AckRequest(
ack_id=message.ack_id,
byte_size=0, # Ignored
time_to_ack=0, # Ignored
ordering_key="" # Ignored
)))
except GoogleAPICallError as e:
self.fail(e)

async def _handle_queue_message(self, message: Union[
requests.AckRequest, requests.DropRequest, requests.ModAckRequest, requests.NackRequest]):
if isinstance(message, requests.DropRequest) or isinstance(message, requests.ModAckRequest):
self.fail(FailedPrecondition("Called internal method of google.cloud.pubsub_v1.subscriber.message.Message "
f"Pub/Sub Lite does not support: {message}"))
elif isinstance(message, requests.AckRequest):
await self._handle_ack(message)
else:
self._handle_nack(message)

async def _looper(self):
while True:
try:
queue_message = self._queue.get_nowait()
await self._handle_queue_message(queue_message)
except queue.Empty:
await asyncio.sleep(.1)

async def __aenter__(self):
await self._ack_set_tracker.__aenter__()
await self._underlying.__aenter__()
self._looper_future = asyncio.ensure_future(self._looper())
await self._underlying.allow_flow(FlowControlRequest(
allowed_messages=self._flow_control_settings.messages_outstanding,
allowed_bytes=self._flow_control_settings.bytes_outstanding))
return self

async def __aexit__(self, exc_type, exc_value, traceback):
self._looper_future.cancel()
try:
await self._looper_future
except asyncio.CancelledError:
pass
await self._underlying.__aexit__(exc_type, exc_value, traceback)
await self._ack_set_tracker.__aexit__(exc_type, exc_value, traceback)
66 changes: 66 additions & 0 deletions google/cloud/pubsublite/cloudpubsub/make_publisher.py
@@ -0,0 +1,66 @@
from typing import Optional, Mapping

from google.api_core.client_options import ClientOptions
from google.auth.credentials import Credentials

from google.cloud.pubsublite.cloudpubsub.internal.async_publisher_impl import AsyncPublisherImpl
from google.cloud.pubsublite.cloudpubsub.internal.publisher_impl import PublisherImpl
from google.cloud.pubsublite.cloudpubsub.publisher import AsyncPublisher, Publisher
from google.cloud.pubsublite.internal.wire.make_publisher import make_publisher as make_wire_publisher
from google.cloud.pubsublite.internal.wire.merge_metadata import merge_metadata
from google.cloud.pubsublite.internal.wire.pubsub_context import pubsub_context
from google.cloud.pubsublite.paths import TopicPath


def make_async_publisher(
topic: TopicPath,
batching_delay_secs: Optional[float] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None
) -> AsyncPublisher:
"""
Make a new publisher for the given topic.

Args:
topic: The topic to publish to.
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.

Returns:
A new AsyncPublisher.

Throws:
GoogleApiCallException on any error determining topic structure.
"""
metadata = merge_metadata(pubsub_context(framework="CLOUD_PUBSUB_SHIM"), metadata)
underlying = make_wire_publisher(topic, batching_delay_secs, credentials, client_options, metadata)
return AsyncPublisherImpl(underlying)


def make_publisher(
topic: TopicPath,
batching_delay_secs: Optional[float] = None,
credentials: Optional[Credentials] = None,
client_options: Optional[ClientOptions] = None,
metadata: Optional[Mapping[str, str]] = None
) -> Publisher:
"""
Make a new publisher for the given topic.

Args:
topic: The topic to publish to.
batching_delay_secs: The delay in seconds to batch messages. The default is reasonable for most cases.
credentials: The credentials to use to connect. GOOGLE_DEFAULT_CREDENTIALS is used if None.
client_options: Other options to pass to the client. Note that if you pass any you must set api_endpoint.
metadata: Additional metadata to send with the RPC.

Returns:
A new Publisher.

Throws:
GoogleApiCallException on any error determining topic structure.
"""
return PublisherImpl(make_async_publisher(topic, batching_delay_secs, credentials, client_options, metadata))