From c02060fbbe6e2ca4664bee08d2de10665d41dc0b Mon Sep 17 00:00:00 2001 From: Peter Lamut Date: Wed, 10 Jun 2020 00:24:56 +0200 Subject: [PATCH] fix: PubSub incompatibility with api-core 1.17.0+ (#103) * fix: disable pre-fetching first streaming pull item * Remove api-core version cap, but ban 1.17.0 release * Regenerate gapic layer with synth * Revert "Regenerate gapic layer with synth" This reverts commit 1d24853f1f255d207a9fcf30180290871235bca4. * Retain only the relevant fix in generated code * Exclude multiple incompatible api-core versions * Fix syntax error in synth.py * Ban all bugfix versions of problematic api-core minor versions --- google/cloud/pubsub_v1/gapic/subscriber_client.py | 5 +++++ setup.py | 6 +++--- synth.py | 14 ++++++++++++++ .../pubsub_v1/subscriber/test_subscriber_client.py | 12 ++++++++++++ 4 files changed, 34 insertions(+), 3 deletions(-) diff --git a/google/cloud/pubsub_v1/gapic/subscriber_client.py b/google/cloud/pubsub_v1/gapic/subscriber_client.py index 400bdc3da..12c2a780d 100644 --- a/google/cloud/pubsub_v1/gapic/subscriber_client.py +++ b/google/cloud/pubsub_v1/gapic/subscriber_client.py @@ -1175,6 +1175,11 @@ def streaming_pull( client_info=self._client_info, ) + # Wrappers in api-core should not automatically pre-fetch the first + # stream result, as this breaks the stream when re-opening it. + # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257 + self.transport.streaming_pull._prefetch_first_result_ = False + return self._inner_api_calls["streaming_pull"]( requests, retry=retry, timeout=timeout, metadata=metadata ) diff --git a/setup.py b/setup.py index 414e7620b..25ce7085b 100644 --- a/setup.py +++ b/setup.py @@ -29,10 +29,10 @@ # 'Development Status :: 5 - Production/Stable' release_status = "Development Status :: 5 - Production/Stable" dependencies = [ - # google-api-core[grpc] 1.17.0 causes problems, thus restricting its - # version until the issue gets fixed. + # google-api-core[grpc] 1.17.0 up to 1.19.1 causes problems with stream + # recovery, thus those versions should not be used. # https://github.com/googleapis/python-pubsub/issues/74 - "google-api-core[grpc] >= 1.14.0, < 1.17.0", + "google-api-core[grpc] >= 1.14.0, != 1.17.*, != 1.18.*, != 1.19.*", "grpc-google-iam-v1 >= 0.12.3, < 0.13dev", 'enum34; python_version < "3.4"', ] diff --git a/synth.py b/synth.py index e9547c834..b44cc0acf 100644 --- a/synth.py +++ b/synth.py @@ -185,6 +185,20 @@ def _merge_dict(d1, d2): "from google.iam.v1 import iam_policy_pb2_grpc as iam_policy_pb2", ) +# Monkey patch the streaming_pull() GAPIC method to disable pre-fetching stream +# results. +s.replace( + "google/cloud/pubsub_v1/gapic/subscriber_client.py", + r"return self\._inner_api_calls\['streaming_pull'\]\(.*", + """ + # Wrappers in api-core should not automatically pre-fetch the first + # stream result, as this breaks the stream when re-opening it. + # https://github.com/googleapis/python-pubsub/issues/93#issuecomment-630762257 + self.transport.streaming_pull._prefetch_first_result_ = False + + \g<0>""" +) + # Add missing blank line before Attributes: in generated docstrings # https://github.com/googleapis/protoc-docs-plugin/pull/31 s.replace( diff --git a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py index d8f671157..310485279 100644 --- a/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py +++ b/tests/unit/pubsub_v1/subscriber/test_subscriber_client.py @@ -154,3 +154,15 @@ def test_closes_channel_as_context_manager(): pass mock_transport.channel.close.assert_called() + + +def test_streaming_pull_gapic_monkeypatch(): + transport = mock.NonCallableMock(spec=["streaming_pull"]) + transport.streaming_pull = mock.Mock(spec=[]) + client = subscriber.Client(transport=transport) + + client.streaming_pull(requests=iter([])) + + assert client.api.transport is transport + assert hasattr(transport.streaming_pull, "_prefetch_first_result_") + assert not transport.streaming_pull._prefetch_first_result_