Skip to content

Commit

Permalink
Raise publish flow control errors through futures
Browse files Browse the repository at this point in the history
  • Loading branch information
plamut committed May 12, 2020
1 parent 2fe776f commit ac9af40
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 1 deletion.
9 changes: 8 additions & 1 deletion google/cloud/pubsub_v1/publisher/client.py
Expand Up @@ -31,6 +31,8 @@
from google.cloud.pubsub_v1 import types
from google.cloud.pubsub_v1.gapic import publisher_client
from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport
from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher import futures
from google.cloud.pubsub_v1.publisher._batch import thread
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer
from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer
Expand Down Expand Up @@ -379,7 +381,12 @@ def publish(self, topic, data, ordering_key="", **attrs):

# Messages should go through flow control to prevent excessive
# queuing on the client side (depending on the settings).
self._flow_controller.add(message)
try:
self._flow_controller.add(message)
except exceptions.FlowControlLimitError as exc:
future = futures.Future()
future.set_exception(exc)
return future

def on_publish_done(future):
self._flow_controller.release(message)
Expand Down
25 changes: 25 additions & 0 deletions tests/unit/pubsub_v1/publisher/test_publisher_client.py
Expand Up @@ -25,6 +25,7 @@
from google.cloud.pubsub_v1 import publisher
from google.cloud.pubsub_v1 import types

from google.cloud.pubsub_v1.publisher import exceptions
from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer


Expand Down Expand Up @@ -156,6 +157,30 @@ def test_publish():
)


def test_publish_error_exceeding_flow_control_limits():
creds = mock.Mock(spec=credentials.Credentials)
publisher_options = types.PublisherOptions(
flow_control=types.PublishFlowControl(
message_limit=10,
byte_limit=150,
limit_exceeded_behavior=types.LimitExceededBehavior.ERROR,
)
)
client = publisher.Client(credentials=creds, publisher_options=publisher_options)

mock_batch = mock.Mock(spec=client._batch_class)
mock_batch.will_accept.return_value = True
topic = "topic/path"
client._set_batch(topic, mock_batch)

future1 = client.publish(topic, b"a" * 100)
future2 = client.publish(topic, b"b" * 100)

future1.result() # no error, still within flow control limits
with pytest.raises(exceptions.FlowControlLimitError):
future2.result()


def test_publish_data_not_bytestring_error():
creds = mock.Mock(spec=credentials.Credentials)
client = publisher.Client(credentials=creds)
Expand Down

0 comments on commit ac9af40

Please sign in to comment.