diff --git a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py index f3798c056..7a08a6388 100644 --- a/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py +++ b/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py @@ -542,6 +542,13 @@ def _on_response(self, response): After the messages have all had their ack deadline updated, execute the callback for each message using the executor. """ + if response is None: + _LOGGER.debug( + "Response callback invoked with None, likely due to a " + "transport shutdown." + ) + return + _LOGGER.debug( "Processing %s received message(s), currenty on hold %s (bytes %s).", len(response.received_messages), diff --git a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py index 1732ec6cd..8bb53f150 100644 --- a/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py +++ b/tests/unit/pubsub_v1/subscriber/test_streaming_pull_manager.py @@ -721,6 +721,21 @@ def test__on_response_with_leaser_overload(): assert msg.message_id in ("2", "3") +def test__on_response_none_data(caplog): + caplog.set_level(logging.DEBUG) + + manager, _, dispatcher, leaser, _, scheduler = make_running_manager() + manager._callback = mock.sentinel.callback + + # adjust message bookkeeping in leaser + fake_leaser_add(leaser, init_msg_count=0, assumed_msg_size=10) + + manager._on_response(response=None) + + scheduler.schedule.assert_not_called() + assert "callback invoked with None" in caplog.text + + def test_retryable_stream_errors(): # Make sure the config matches our hard-coded tuple of exceptions. interfaces = subscriber_client_config.config["interfaces"]