diff --git a/google/cloud/pubsub_v1/gapic/subscriber_client.py b/google/cloud/pubsub_v1/gapic/subscriber_client.py index 400bdc3da..12c2a780d 100644 --- a/google/cloud/pubsub_v1/gapic/subscriber_client.py +++ b/google/cloud/pubsub_v1/gapic/subscriber_client.py @@ -1175,6 +1175,11 @@ def streaming_pull( client_info=self._client_info, ) + # Wrappers in api-core should not automatically pre-fetch the first + # stream result, as this breaks the stream when re-opening it. + # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257 + self.transport.streaming_pull._prefetch_first_result_ = False + return self._inner_api_calls["streaming_pull"]( requests, retry=retry, timeout=timeout, metadata=metadata ) diff --git a/setup.py b/setup.py index 414e7620b..25ce7085b 100644 --- a/setup.py +++ b/setup.py @@ -29,10 +29,10 @@ # 'Development Status :: 5 - Production/Stable' release_status = "Development Status :: 5 - Production/Stable" dependencies = [ - # google-api-core[grpc] 1.17.0 causes problems, thus restricting its - # version until the issue gets fixed. + # google-api-core[grpc] 1.17.0 up to 1.19.1 causes problems with stream + # recovery, thus those versions should not be used. # https://github.com/googleapis/python-pubsub/issues/74 - "google-api-core[grpc] >= 1.14.0, < 1.17.0", + "google-api-core[grpc] >= 1.14.0, != 1.17.*, != 1.18.*, != 1.19.*", "grpc-google-iam-v1 >= 0.12.3, < 0.13dev", 'enum34; python_version < "3.4"', ] diff --git a/synth.py b/synth.py index e9547c834..b44cc0acf 100644 --- a/synth.py +++ b/synth.py @@ -185,6 +185,20 @@ def _merge_dict(d1, d2): "from google.iam.v1 import iam_policy_pb2_grpc as iam_policy_pb2", ) +# Monkey patch the streaming_pull() GAPIC method to disable pre-fetching stream +# results. +s.replace( + "google/cloud/pubsub_v1/gapic/subscriber_client.py", + r"return self\._inner_api_calls\['streaming_pull'\]\(.*", + """ + # Wrappers in api-core should not automatically pre-fetch the first + # stream result, as this breaks the stream when re-opening it. + # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257 + self.transport.streaming_pull._prefetch_first_result_ = False + + \g<0>""" +) + # Add missing blank line before Attributes: in generated docstrings # https://github.com/googleapis/protoc-docs-plugin/pull/31 s.replace( diff --git a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py index d8f671157..310485279 100644 --- a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py +++ b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py @@ -154,3 +154,15 @@ def test_closes_channel_as_context_manager(): pass mock_transport.channel.close.assert_called() + + +def test_streaming_pull_gapic_monkeypatch(): + transport = mock.NonCallableMock(spec=["streaming_pull"]) + transport.streaming_pull = mock.Mock(spec=[]) + client = subscriber.Client(transport=transport) + + client.streaming_pull(requests=iter([])) + + assert client.api.transport is transport + assert hasattr(transport.streaming_pull, "_prefetch_first_result_") + assert not transport.streaming_pull._prefetch_first_result_