Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pubsub): ordering keys #26

Merged
merged 6 commits into from Feb 5, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
32 changes: 25 additions & 7 deletions google/cloud/pubsub_v1/publisher/_batch/base.py
Expand Up @@ -15,6 +15,7 @@
from __future__ import absolute_import

import abc
import enum

import six

Expand Down Expand Up @@ -134,6 +135,18 @@ def will_accept(self, message):
# Okay, everything is good.
return True

def cancel(self, cancellation_reason):
    """Fail all pending futures because the batch was cancelled.

    Each not-yet-completed future is completed with an exception that
    carries the cancellation reason.  Callers must invoke this only
    before publishing has started, i.e. while the batch is still in the
    accepting-messages state.

    Args:
        cancellation_reason (BatchCancellationReason): The reason why this
            batch has been cancelled.
    """
    raise NotImplementedError

@abc.abstractmethod
def publish(self, message):
"""Publish a single message.
Expand All @@ -154,16 +167,21 @@ def publish(self, message):
raise NotImplementedError


class BatchStatus(object):
"""An enum-like class representing valid statuses for a batch.

It is acceptable for a class to use a status that is not on this
class; this represents the list of statuses where the existing
library hooks in functionality.
"""
class BatchStatus(str, enum.Enum):
    """An enum-like class representing valid statuses for a batch."""

    # The batch is open: publish() may still add messages to it.
    ACCEPTING_MESSAGES = "accepting messages"
    # A commit has been requested; the batch no longer accepts messages,
    # but the publish RPC has not been issued yet.
    STARTING = "starting"
    # The publish RPC is in flight; no other thread may alter the batch.
    IN_PROGRESS = "in progress"
    # The batch failed (publish error) or was cancelled; pending futures
    # have been completed with an exception.
    ERROR = "error"
    # Terminal success state — presumably set once all messages in the
    # batch were published (the transition is outside this view; confirm).
    SUCCESS = "success"


class BatchCancellationReason(str, enum.Enum):
    """An enum-like class representing reasons why a batch was cancelled."""

    # A prior batch for the same ordering key failed, so this batch is
    # cancelled rather than risk publishing its messages out of order.
    # The enum value doubles as the human-readable exception message.
    PRIOR_ORDERED_MESSAGE_FAILED = (
        "Batch cancelled because prior ordered message for the same key has "
        "failed. This batch has been cancelled to avoid out-of-order publish."
    )
    # The publisher client was stopped before this batch could be sent.
    CLIENT_STOPPED = "Batch cancelled because the publisher client has been stopped."
96 changes: 64 additions & 32 deletions google/cloud/pubsub_v1/publisher/_batch/thread.py
Expand Up @@ -62,15 +62,23 @@ class Batch(base.Batch):
settings (~.pubsub_v1.types.BatchSettings): The settings for batch
publishing. These should be considered immutable once the batch
has been opened.
autocommit (bool): Whether to autocommit the batch when the time
has elapsed. Defaults to True unless ``settings.max_latency`` is
inf.
batch_done_callback (Callable[[bool], Any]): Callback called when the
response for a batch publish has been received. Called with one
boolean argument: successfully published or a permanent error
occurred. Temporary errors are not surfaced because they are retried
at a lower level.
commit_when_full (bool): Whether to commit the batch when the batch
is full.
"""

def __init__(self, client, topic, settings, autocommit=True):
def __init__(
self, client, topic, settings, batch_done_callback=None, commit_when_full=True
):
self._client = client
self._topic = topic
self._settings = settings
self._batch_done_callback = batch_done_callback
self._commit_when_full = commit_when_full

self._state_lock = threading.Lock()
# These members are all communicated between threads; ensure that
Expand All @@ -87,15 +95,6 @@ def __init__(self, client, topic, settings, autocommit=True):
self._base_request_size = types.PublishRequest(topic=topic).ByteSize()
self._size = self._base_request_size

# If max latency is specified, start a thread to monitor the batch and
# commit when the max latency is reached.
self._thread = None
if autocommit and self.settings.max_latency < float("inf"):
self._thread = threading.Thread(
name="Thread-MonitorBatchPublisher", target=self.monitor
)
self._thread.start()

@staticmethod
def make_lock():
"""Return a threading lock.
Expand Down Expand Up @@ -148,6 +147,27 @@ def status(self):
"""
return self._status

def cancel(self, cancellation_reason):
    """Fail every pending future with a cancellation exception.

    Must be invoked before publishing starts, i.e. while the batch is
    still accepting messages; the assertion below enforces this.

    Args:
        cancellation_reason (BatchCancellationReason): The reason why this
            batch has been cancelled.
    """
    # Hold the state lock so no concurrent publish()/commit() can race
    # with the status transition.
    with self._state_lock:
        assert (
            self._status == base.BatchStatus.ACCEPTING_MESSAGES
        ), "Cancel should not be called after sending has started."

        # The enum value is the human-readable reason string.
        cancellation_error = RuntimeError(cancellation_reason.value)
        for pending_future in self._futures:
            pending_future.set_exception(cancellation_error)
        self._status = base.BatchStatus.ERROR

def commit(self):
"""Actually publish all of the messages on the active batch.

Expand All @@ -162,6 +182,7 @@ def commit(self):
If the current batch is **not** accepting messages, this method
does nothing.
"""

# Set the status to "starting" synchronously, to ensure that
# this batch will necessarily not accept new messages.
with self._state_lock:
Expand All @@ -170,7 +191,11 @@ def commit(self):
else:
return

# Start a new thread to actually handle the commit.
self._start_commit_thread()

def _start_commit_thread(self):
"""Start a new thread to actually handle the commit."""

commit_thread = threading.Thread(
name="Thread-CommitBatchPublisher", target=self._commit
)
Expand All @@ -195,7 +220,10 @@ def _commit(self):
# If, in the intervening period between when this method was
# called and now, the batch started to be committed, or
# completed a commit, then no-op at this point.
_LOGGER.debug("Batch is already in progress, exiting commit")
_LOGGER.debug(
"Batch is already in progress or has been cancelled, "
"exiting commit"
)
return

# Once in the IN_PROGRESS state, no other thread can publish additional
Expand All @@ -215,16 +243,24 @@ def _commit(self):
# Log how long the underlying request takes.
start = time.time()

batch_transport_succeeded = True
try:
# Performs retries for errors defined in retry_codes.publish in the
# publisher_client_config.py file.
response = self._client.api.publish(self._topic, self._messages)
except google.api_core.exceptions.GoogleAPIError as exc:
# We failed to publish, set the exception on all futures and
# exit.
# We failed to publish, even after retries, so set the exception on
# all futures and exit.
self._status = base.BatchStatus.ERROR

for future in self._futures:
future.set_exception(exc)

batch_transport_succeeded = False
if self._batch_done_callback is not None:
# Failed to publish batch.
self._batch_done_callback(batch_transport_succeeded)

_LOGGER.exception("Failed to publish %s messages.", len(self._futures))
return

Expand All @@ -250,26 +286,17 @@ def _commit(self):
for future in self._futures:
future.set_exception(exception)

# Unknown error -> batch failed to be correctly transported/
batch_transport_succeeded = False

_LOGGER.error(
"Only %s of %s messages were published.",
len(response.message_ids),
len(self._futures),
)

def monitor(self):
"""Commit this batch after sufficient time has elapsed.

This simply sleeps for ``self.settings.max_latency`` seconds,
and then calls commit unless the batch has already been committed.
"""
# NOTE: This blocks; it is up to the calling code to call it
# in a separate thread.

# Sleep for however long we should be waiting.
time.sleep(self.settings.max_latency)

_LOGGER.debug("Monitor is waking up")
return self._commit()
if self._batch_done_callback is not None:
self._batch_done_callback(batch_transport_succeeded)

def publish(self, message):
"""Publish a single message.
Expand All @@ -294,13 +321,18 @@ def publish(self, message):
pubsub_v1.publisher.exceptions.MessageTooLargeError: If publishing
the ``message`` would exceed the max size limit on the backend.
"""

# Coerce the type, just in case.
if not isinstance(message, types.PubsubMessage):
message = types.PubsubMessage(**message)

future = None

with self._state_lock:
assert (
self._status != base.BatchStatus.ERROR
), "Publish after stop() or publish error."

if not self.will_accept(message):
return future

Expand Down Expand Up @@ -333,7 +365,7 @@ def publish(self, message):

# Try to commit, but it must be **without** the lock held, since
# ``commit()`` will try to obtain the lock.
if overflow:
if self._commit_when_full and overflow:
self.commit()

return future
Empty file.
70 changes: 70 additions & 0 deletions google/cloud/pubsub_v1/publisher/_sequencer/base.py
@@ -0,0 +1,70 @@
# Copyright 2019, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import absolute_import

import abc

import six


@six.add_metaclass(abc.ABCMeta)
class Sequencer(object):
    """The base class for sequencers for Pub/Sub publishing. A sequencer
    sequences messages to be published.
    """

    # NOTE: These are instance methods (they take ``self``).  The previous
    # ``@staticmethod`` decorators contradicted the signatures and the
    # "this sequencer" wording of the docstrings, so they were removed.

    @abc.abstractmethod
    def is_finished(self):
        """Whether the sequencer is finished and should be cleaned up.

        Returns:
            bool: Whether the sequencer is finished and should be cleaned up.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def unpause(self, message):
        """Unpause this sequencer.

        Args:
            message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.

        Raises:
            RuntimeError:
                If called when the sequencer has not been paused.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def publish(self, message):
        """Publish message for this ordering key.

        Args:
            message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.

        Returns:
            A class instance that conforms to Python Standard library's
            :class:`~concurrent.futures.Future` interface (but not an
            instance of that class). The future might return immediately with a
            `pubsub_v1.publisher.exceptions.PublishToPausedOrderingKeyException`
            if the ordering key is paused. Otherwise, the future tracks the
            lifetime of the message publish.

        Raises:
            RuntimeError:
                If called after this sequencer has been stopped, either by
                a call to stop() or after all batches have been published.
        """
        raise NotImplementedError