/
base.py
187 lines (146 loc) · 6.27 KB
/
base.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
# Copyright 2017, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import abc
import enum
import six
@six.add_metaclass(abc.ABCMeta)
class Batch(object):
    """The base batching class for Pub/Sub publishing.

    Although the :class:`~.pubsub_v1.publisher.batch.thread.Batch` class, based
    on :class:`threading.Thread`, is fine for most cases, advanced
    users may need to implement something based on a different concurrency
    model.

    This class defines the interface for the Batch implementation;
    subclasses may be passed as the ``batch_class`` argument to
    :class:`~.pubsub_v1.client.PublisherClient`.

    The batching behavior works like this: When the
    :class:`~.pubsub_v1.publisher.client.Client` is asked to publish a new
    message, it requires a batch. The client will see if there is an
    already-opened batch for the given topic; if there is, then the message
    is sent to that batch. If there is not, then a new batch is created
    and the message put there.

    When a new batch is created, it automatically starts a timer counting
    down to the maximum latency before the batch should commit.
    Essentially, if enough time passes, the batch automatically commits
    regardless of how much is in it. However, if either the message count or
    size thresholds are encountered first, then the batch will commit early.
    """

    def __len__(self):
        """Return the number of messages currently in the batch."""
        return len(self.messages)

    @staticmethod
    @abc.abstractmethod
    def make_lock():
        """Return a lock in the chosen concurrency model.

        Returns:
            ContextManager: A newly created lock.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def messages(self):
        """Return the messages currently in the batch.

        Returns:
            Sequence: The messages currently in the batch.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def size(self):
        """Return the total size of all of the messages currently in the batch.

        The size includes any overhead of the actual ``PublishRequest`` that is
        sent to the backend.

        Returns:
            int: The total size of all of the messages currently
                in the batch (including the request overhead), in bytes.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def settings(self):
        """Return the batch settings.

        Returns:
            ~.pubsub_v1.types.BatchSettings: The batch settings. These are
                considered immutable once the batch has been opened.
        """
        raise NotImplementedError

    @property
    @abc.abstractmethod
    def status(self):
        """Return the status of this batch.

        Returns:
            str: The status of this batch. All statuses are human-readable,
                all-lowercase strings. The ones represented in the
                :class:`BaseBatch.Status` enum are special, but other statuses
                are permitted.
        """
        raise NotImplementedError

    def will_accept(self, message):
        """Return True if the batch is able to accept the message.

        In concurrent implementations, the attributes on the current batch
        may be modified by other workers. With this in mind, the caller will
        likely want to hold a lock that will make sure the state remains
        the same after the "will accept?" question is answered.

        Args:
            message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.

        Returns:
            bool: Whether this batch can accept the message.
        """
        # If this batch is not accepting messages generally, return False.
        if self.status != BatchStatus.ACCEPTING_MESSAGES:
            return False

        # If this message will make the batch exceed the ``max_messages``
        # setting, return False.
        if len(self.messages) >= self.settings.max_messages:
            return False

        # Okay, everything is good.
        return True

    # Marked abstract for consistency with the rest of this interface: the
    # body is a bare ``raise NotImplementedError`` and a concrete batch
    # implementation cannot function without it.
    @abc.abstractmethod
    def cancel(self, cancellation_reason):
        """Complete pending futures with an exception.

        This method must be called before publishing starts (ie: while the
        batch is still accepting messages.)

        Args:
            cancellation_reason (BatchCancellationReason): The reason why this
                batch has been cancelled.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def publish(self, message):
        """Publish a single message.

        Add the given message to this object; this will cause it to be
        published once the batch either has enough messages or a sufficient
        period of time has elapsed.

        This method is called by :meth:`~.PublisherClient.publish`.

        Args:
            message (~.pubsub_v1.types.PubsubMessage): The Pub/Sub message.

        Returns:
            ~google.api_core.future.Future: An object conforming to the
                :class:`concurrent.futures.Future` interface.
        """
        raise NotImplementedError
class BatchStatus(str, enum.Enum):
    """Valid statuses for a batch.

    Each member subclasses :class:`str`, so a member compares equal to its
    human-readable, all-lowercase string value.
    """

    ACCEPTING_MESSAGES = "accepting messages"
    STARTING = "starting"
    IN_PROGRESS = "in progress"
    ERROR = "error"
    SUCCESS = "success"
class BatchCancellationReason(str, enum.Enum):
    """Reasons why a batch was cancelled.

    Each member subclasses :class:`str`; its value is the human-readable
    explanation used when completing pending futures with an exception.
    """

    PRIOR_ORDERED_MESSAGE_FAILED = (
        "Batch cancelled because prior ordered message for the same key has "
        "failed. This batch has been cancelled to avoid out-of-order publish."
    )
    CLIENT_STOPPED = "Batch cancelled because the publisher client has been stopped."