/
flow_controller.py
145 lines (115 loc) · 5.1 KB
/
flow_controller.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
# Copyright 2020, Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
import warnings
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.publisher import exceptions
_LOGGER = logging.getLogger(__name__)
class FlowController(object):
    """A class used to control the flow of messages passing through it.

    The controller tracks the number and the total size of the messages that
    have been admitted through :meth:`add` and not yet handed back through
    :meth:`release`, and reacts according to the configured
    ``limit_exceeded_behavior`` when admitting another message would exceed
    the configured limits.

    Args:
        settings (~google.cloud.pubsub_v1.types.PublishFlowControl):
            Desired flow control configuration.
    """

    def __init__(self, settings):
        self._settings = settings

        # Load statistics of the messages admitted and not yet released.
        self._message_count = 0
        self._total_bytes = 0

        # Number of threads currently blocked in ``add()``. It lets
        # ``release()`` skip the notification when nobody is waiting.
        self._waiting_count = 0

        # The lock is used to protect the internal state (message and byte count).
        self._operational_lock = threading.Lock()

        # The condition for blocking the flow if capacity is exceeded.
        self._has_capacity = threading.Condition(lock=self._operational_lock)

    def add(self, message):
        """Add a message to flow control.

        Adding a message updates the internal load statistics, and an action is
        taken if these limits are exceeded (depending on the flow control settings).

        A message is only counted towards the load once it has been admitted.
        Messages that are rejected, or that are still waiting for free capacity,
        do not contribute to the load. Counting blocked messages would allow
        several blocked threads to keep the load above the limits even after
        every admitted message has been released, leaving them blocked forever.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message entering the flow control.

        Raises:
            :exception:`~pubsub_v1.publisher.exceptions.FlowControlLimitError`:
                If adding a message exceeds flow control limits and the desired
                action is :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.ERROR`,
                or if the desired action is
                :attr:`~google.cloud.pubsub_v1.types.LimitExceededBehavior.BLOCK`
                and the message exceeds the limits all by itself (it could
                never be admitted).
        """
        behavior = self._settings.limit_exceeded_behavior

        if behavior == types.LimitExceededBehavior.IGNORE:
            return

        with self._operational_lock:
            message_bytes = message.ByteSize()

            if not self._would_overflow(message_bytes):
                self._message_count += 1
                self._total_bytes += message_bytes
                return

            # We have an overflow, react.
            if behavior == types.LimitExceededBehavior.ERROR:
                # The reported load includes the rejected message, which is
                # what pushed the load over the limits.
                msg = (
                    "Flow control limits exceeded "
                    "(messages: {} / {}, bytes: {} / {})."
                ).format(
                    self._message_count + 1,
                    self._settings.message_limit,
                    self._total_bytes + message_bytes,
                    self._settings.byte_limit,
                )
                raise exceptions.FlowControlLimitError(msg)

            assert behavior == types.LimitExceededBehavior.BLOCK

            # Sanity check - a message exceeding the limits all by itself can
            # never be admitted, so waiting for capacity would block forever.
            if (
                message_bytes > self._settings.byte_limit
                or self._settings.message_limit < 1
            ):
                msg = (
                    "Total flow control limits too low for the message, "
                    "would block forever (messages: 1 / {}, bytes: {} / {})."
                ).format(
                    self._settings.message_limit,
                    message_bytes,
                    self._settings.byte_limit,
                )
                raise exceptions.FlowControlLimitError(msg)

            # NOTE: Waiting threads are not served in FIFO order; whichever
            # woken thread finds enough capacity first gets admitted.
            self._waiting_count += 1
            try:
                while self._would_overflow(message_bytes):
                    _LOGGER.debug(
                        "Blocking until there is enough free capacity in the flow."
                    )
                    self._has_capacity.wait()
                    _LOGGER.debug("Woke up from waiting on free capacity in the flow.")
            finally:
                # ``wait()`` re-acquires the lock before returning or raising,
                # thus updating the counter here is safe.
                self._waiting_count -= 1

            self._message_count += 1
            self._total_bytes += message_bytes

    def release(self, message):
        """Release a message from flow control.

        Args:
            message (:class:`~google.cloud.pubsub_v1.types.PubsubMessage`):
                The message leaving the flow control.
        """
        if self._settings.limit_exceeded_behavior == types.LimitExceededBehavior.IGNORE:
            return

        with self._operational_lock:
            self._message_count -= 1
            self._total_bytes -= message.ByteSize()

            if self._message_count < 0 or self._total_bytes < 0:
                warnings.warn(
                    "Releasing a message that was never added or already released.",
                    category=RuntimeWarning,
                    stacklevel=2,
                )
                self._message_count = max(0, self._message_count)
                self._total_bytes = max(0, self._total_bytes)

            # Capacity has been freed. Wake up all waiting threads and let each
            # of them re-check whether its own message fits now.
            if self._waiting_count > 0:
                _LOGGER.debug("Notifying threads waiting to add messages to flow.")
                self._has_capacity.notify_all()

    def _is_overflow(self):
        """Determine if the current message load exceeds flow control limits.

        The method assumes that the caller has obtained ``_operational_lock``.

        Returns:
            bool
        """
        return (
            self._message_count > self._settings.message_limit
            or self._total_bytes > self._settings.byte_limit
        )

    def _would_overflow(self, message_bytes):
        """Determine if admitting one more message would exceed the limits.

        The method assumes that the caller has obtained ``_operational_lock``.

        Args:
            message_bytes (int): The size of the candidate message in bytes.

        Returns:
            bool
        """
        return (
            self._message_count + 1 > self._settings.message_limit
            or self._total_bytes + message_bytes > self._settings.byte_limit
        )