diff --git a/google/cloud/pubsub_v1/publisher/client.py b/google/cloud/pubsub_v1/publisher/client.py index 5ab7ce4d4..d97c0d11d 100644 --- a/google/cloud/pubsub_v1/publisher/client.py +++ b/google/cloud/pubsub_v1/publisher/client.py @@ -31,6 +31,8 @@ from google.cloud.pubsub_v1 import types from google.cloud.pubsub_v1.gapic import publisher_client from google.cloud.pubsub_v1.gapic.transports import publisher_grpc_transport +from google.cloud.pubsub_v1.publisher import exceptions +from google.cloud.pubsub_v1.publisher import futures from google.cloud.pubsub_v1.publisher._batch import thread from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer from google.cloud.pubsub_v1.publisher._sequencer import unordered_sequencer @@ -379,7 +381,12 @@ def publish(self, topic, data, ordering_key="", **attrs): # Messages should go through flow control to prevent excessive # queuing on the client side (depending on the settings). - self._flow_controller.add(message) + try: + self._flow_controller.add(message) + except exceptions.FlowControlLimitError as exc: + future = futures.Future() + future.set_exception(exc) + return future def on_publish_done(future): self._flow_controller.release(message) diff --git a/tests/unit/pubsub_v1/publisher/test_publisher_client.py b/tests/unit/pubsub_v1/publisher/test_publisher_client.py index 2d345821c..4e3a3870f 100644 --- a/tests/unit/pubsub_v1/publisher/test_publisher_client.py +++ b/tests/unit/pubsub_v1/publisher/test_publisher_client.py @@ -25,6 +25,7 @@ from google.cloud.pubsub_v1 import publisher from google.cloud.pubsub_v1 import types +from google.cloud.pubsub_v1.publisher import exceptions from google.cloud.pubsub_v1.publisher._sequencer import ordered_sequencer @@ -156,6 +157,30 @@ def test_publish(): ) +def test_publish_error_exceeding_flow_control_limits(): + creds = mock.Mock(spec=credentials.Credentials) + publisher_options = types.PublisherOptions( + flow_control=types.PublishFlowControl( + message_limit=10, + byte_limit=150, + limit_exceeded_behavior=types.LimitExceededBehavior.ERROR, + ) + ) + client = publisher.Client(credentials=creds, publisher_options=publisher_options) + + mock_batch = mock.Mock(spec=client._batch_class) + mock_batch.will_accept.return_value = True + topic = "topic/path" + client._set_batch(topic, mock_batch) + + future1 = client.publish(topic, b"a" * 100) + future2 = client.publish(topic, b"b" * 100) + + future1.result() # no error, still within flow control limits + with pytest.raises(exceptions.FlowControlLimitError): + future2.result() + + def test_publish_data_not_bytestring_error(): creds = mock.Mock(spec=credentials.Credentials) client = publisher.Client(credentials=creds)