Skip to content

Commit

Permalink
feat(pubsub): ordering keys (#26)
Browse files Browse the repository at this point in the history
  • Loading branch information
pradn committed Feb 5, 2020
1 parent da96526 commit cc3093a
Show file tree
Hide file tree
Showing 25 changed files with 2,406 additions and 327 deletions.
32 changes: 25 additions & 7 deletions google/cloud/pubsub_v1/publisher/_batch/base.py
Expand Up @@ -15,6 +15,7 @@
from __future__ import absolute_import

import abc
import enum

import six

Expand Down Expand Up @@ -134,6 +135,18 @@ def will_accept(self, message):
# Okay, everything is good.
return True

def cancel(self, cancellation_reason):
    """Complete pending futures with an exception.

    This method must be called before publishing starts (i.e. while the
    batch is still accepting messages).

    Args:
        cancellation_reason (BatchCancellationReason): The reason why this
            batch has been cancelled.

    Raises:
        NotImplementedError: Always, on this base class; concrete batch
            implementations must override this method.
    """
    # Abstract hook: subclasses are expected to set an exception (built
    # from ``cancellation_reason``) on every pending publish future.
    raise NotImplementedError

@abc.abstractmethod
def publish(self, message):
"""Publish a single message.
Expand All @@ -154,16 +167,21 @@ def publish(self, message):
raise NotImplementedError


class BatchStatus(object):
"""An enum-like class representing valid statuses for a batch.
It is acceptable for a class to use a status that is not on this
class; this represents the list of statuses where the existing
library hooks in functionality.
"""
class BatchStatus(str, enum.Enum):
    """An enum-like class representing valid statuses for a batch.

    Inheriting from ``str`` keeps members comparable to (and usable as)
    the plain string values that predate this enum.
    """

    # The batch is open: ``publish()`` may still add messages to it.
    ACCEPTING_MESSAGES = "accepting messages"
    # A commit has been requested; the batch no longer accepts messages.
    STARTING = "starting"
    # The publish request for this batch is being sent.
    IN_PROGRESS = "in progress"
    # Publishing failed or the batch was cancelled; futures carry the error.
    ERROR = "error"
    SUCCESS = "success"


class BatchCancellationReason(str, enum.Enum):
    """An enum-like class representing reasons why a batch was cancelled.

    Member values are human-readable messages; ``Batch.cancel`` uses the
    member's ``value`` as the text of the ``RuntimeError`` set on each
    pending future.
    """

    PRIOR_ORDERED_MESSAGE_FAILED = (
        "Batch cancelled because prior ordered message for the same key has "
        "failed. This batch has been cancelled to avoid out-of-order publish."
    )
    CLIENT_STOPPED = "Batch cancelled because the publisher client has been stopped."
96 changes: 64 additions & 32 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Expand Up @@ -62,15 +62,23 @@ class Batch(base.Batch):
settings (~.pubsub_v1.types.BatchSettings): The settings for batch
publishing. These should be considered immutable once the batch
has been opened.
autocommit (bool): Whether to autocommit the batch when the time
has elapsed. Defaults to True unless ``settings.max_latency`` is
inf.
batch_done_callback (Callable[[bool], Any]): Callback called when the
response for a batch publish has been received. Called with one
boolean argument: successfully published or a permanent error
occurred. Temporary errors are not surfaced because they are retried
at a lower level.
commit_when_full (bool): Whether to commit the batch when the batch
is full.
"""

def __init__(self, client, topic, settings, autocommit=True):
def __init__(
self, client, topic, settings, batch_done_callback=None, commit_when_full=True
):
self._client = client
self._topic = topic
self._settings = settings
self._batch_done_callback = batch_done_callback
self._commit_when_full = commit_when_full

self._state_lock = threading.Lock()
# These members are all communicated between threads; ensure that
Expand All @@ -87,15 +95,6 @@ def __init__(self, client, topic, settings, autocommit=True):
self._base_request_size = types.PublishRequest(topic=topic).ByteSize()
self._size = self._base_request_size

# If max latency is specified, start a thread to monitor the batch and
# commit when the max latency is reached.
self._thread = None
if autocommit and self.settings.max_latency < float("inf"):
self._thread = threading.Thread(
name="Thread-MonitorBatchPublisher", target=self.monitor
)
self._thread.start()

@staticmethod
def make_lock():
"""Return a threading lock.
Expand Down Expand Up @@ -148,6 +147,27 @@ def status(self):
"""
return self._status

def cancel(self, cancellation_reason):
    """Complete pending futures with an exception.

    This method must be called before publishing starts (i.e. while the
    batch is still accepting messages).

    Args:
        cancellation_reason (BatchCancellationReason): The reason why this
            batch has been cancelled.
    """
    with self._state_lock:
        # Cancelling is only legal while the batch is still open; once a
        # commit has started, the futures belong to the commit path.
        assert (
            self._status == base.BatchStatus.ACCEPTING_MESSAGES
        ), "Cancel should not be called after sending has started."

        cancel_error = RuntimeError(cancellation_reason.value)
        for pending_future in self._futures:
            pending_future.set_exception(cancel_error)
        self._status = base.BatchStatus.ERROR

def commit(self):
"""Actually publish all of the messages on the active batch.
Expand All @@ -162,6 +182,7 @@ def commit(self):
If the current batch is **not** accepting messages, this method
does nothing.
"""

# Set the status to "starting" synchronously, to ensure that
# this batch will necessarily not accept new messages.
with self._state_lock:
Expand All @@ -170,7 +191,11 @@ def commit(self):
else:
return

# Start a new thread to actually handle the commit.
self._start_commit_thread()

def _start_commit_thread(self):
"""Start a new thread to actually handle the commit."""

commit_thread = threading.Thread(
name="Thread-CommitBatchPublisher", target=self._commit
)
Expand All @@ -195,7 +220,10 @@ def _commit(self):
# If, in the intervening period between when this method was
# called and now, the batch started to be committed, or
# completed a commit, then no-op at this point.
_LOGGER.debug("Batch is already in progress, exiting commit")
_LOGGER.debug(
"Batch is already in progress or has been cancelled, "
"exiting commit"
)
return

# Once in the IN_PROGRESS state, no other thread can publish additional
Expand All @@ -215,16 +243,24 @@ def _commit(self):
# Log how long the underlying request takes.
start = time.time()

batch_transport_succeeded = True
try:
# Performs retries for errors defined in retry_codes.publish in the
# publisher_client_config.py file.
response = self._client.api.publish(self._topic, self._messages)
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, set the exception on all futures and
# exit.
# We failed to publish, even after retries, so set the exception on
# all futures and exit.
self._status = base.BatchStatus.ERROR

for future in self._futures:
future.set_exception(exc)

batch_transport_succeeded = False
if self._batch_done_callback is not None:
# Failed to publish batch.
self._batch_done_callback(batch_transport_succeeded)

_LOGGER.exception("Failed to publish %s messages.", len(self._futures))
return

Expand All @@ -250,26 +286,17 @@ def _commit(self):
for future in self._futures:
future.set_exception(exception)

# Unknown error -> batch failed to be correctly transported/
batch_transport_succeeded = False

_LOGGER.error(
"Only %s of %s messages were published.",
len(response.message_ids),
len(self._futures),
)

def monitor(self):
    """Commit this batch after sufficient time has elapsed.

    This simply sleeps for ``self.settings.max_latency`` seconds,
    and then calls commit unless the batch has already been committed.
    """
    # NOTE: This blocks; it is up to the calling code to call it
    # in a separate thread.

    # Sleep for however long we should be waiting.
    time.sleep(self.settings.max_latency)

    _LOGGER.debug("Monitor is waking up")
    # _commit() no-ops if another thread already moved the batch out of
    # the ACCEPTING_MESSAGES state, so this is safe to call unconditionally.
    return self._commit()
if self._batch_done_callback is not None:
self._batch_done_callback(batch_transport_succeeded)

def publish(self, message):
"""Publish a single message.
Expand All @@ -294,13 +321,18 @@ def publish(self, message):
pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
the ``message`` would exceed the max size limit on the backend.
"""

# Coerce the type, just in case.
if not isinstance(message, types.PubsubMessage):
message = types.PubsubMessage(**message)

future = None

with self._state_lock:
assert (
self._status != base.BatchStatus.ERROR
), "Publish after stop() or publish error."

if not self.will_accept(message):
return future

Expand Down Expand Up @@ -333,7 +365,7 @@ def publish(self, message):

# Try to commit, but it must be **without** the lock held, since
# ``commit()`` will try to obtain the lock.
if overflow:
if self._commit_when_full and overflow:
self.commit()

return future
Empty file.
70 changes: 70 additions & 0 deletions google/cloud/pubsub_v1/publisher/_sequencer/base.py
@@ -0,0 +1,70 @@
# Copyright 2019, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import abc

import six


@six.add_metaclass(abc.ABCMeta)
class Sequencer(object):
    """The base class for sequencers for Pub/Sub publishing. A sequencer
    sequences messages to be published.

    All abstract methods below are instance methods: every declaration
    takes ``self`` and concrete sequencers must override them as such.
    (The previous ``@staticmethod`` decorators contradicted the ``self``
    parameter and have been removed.)
    """

    @abc.abstractmethod
    def is_finished(self):
        """Whether the sequencer is finished and should be cleaned up.

        Returns:
            bool: Whether the sequencer is finished and should be cleaned up.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def unpause(self, message):
        """Unpause this sequencer.

        Args:
            message: NOTE(review): the base signature accepts ``message``
                but the original docstring did not describe it — confirm
                against concrete sequencer implementations.

        Raises:
            RuntimeError:
                If called when the sequencer has not been paused.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def publish(self, message):
        """Publish message for this ordering key.

        Args:
            message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.

        Returns:
            A class instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class). The future might return immediately with a
            `pubsub_v1.publisher.exceptions.PublishToPausedOrderingKeyException`
            if the ordering key is paused. Otherwise, the future tracks the
            lifetime of the message publish.

        Raises:
            RuntimeError:
                If called after this sequencer has been stopped, either by
                a call to stop() or after all batches have been published.
        """
        raise NotImplementedError

0 comments on commit cc3093a

Please sign in to comment.