
Resolve Incompatibility with API Core >= 1.17.0 #93

Closed
crwilcox opened this issue Apr 29, 2020 · 7 comments · Fixed by #103
Labels
api: pubsub Issues related to the googleapis/python-pubsub API. priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.

Comments

@crwilcox
Contributor

Changes in api-core 1.17.0 are causing an incompatibility with Pub/Sub.

Related Issues:

googleapis/python-api-core#25
#74

Suspected changes that caused the behavior change:

googleapis/python-api-core@2b103b6#diff-d97e81006eaaf29e33270a85aead6aafR146-R148

Repro:


```python
from google.cloud import pubsub_v1
import threading
from time import sleep

import logging


def main():
    logging.basicConfig(level=logging.DEBUG)
    logger = logging.getLogger("sample")
    logger.setLevel(logging.DEBUG)
    PROJECT_ID = "crwilcox-test"
    TOPIC_PATH = "my-topic"
    SUBSCRIPTION_PATH = "my-sub"

    publisher_thread = threading.Thread(
        target=publisher, args=(PROJECT_ID, TOPIC_PATH)
    )
    publisher_thread.daemon = True
    publisher_thread.start()

    subscriber(PROJECT_ID, SUBSCRIPTION_PATH)


def publisher(project_id, topic_name):
    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_name}`.
    topic_path = publisher.topic_path(project_id, topic_name)
    # publisher.create_topic(topic_path)

    for n in range(1, 100000):
        data = u"Message number {}".format(n)
        # Data must be a bytestring.
        data = data.encode("utf-8")
        # When you publish a message, the client returns a future.
        future = publisher.publish(topic_path, data=data)
        print(future.result())
        sleep(5)

    print("Published messages.")


def subscriber(project_id, subscription_name, timeout=None):
    subscriber = pubsub_v1.SubscriberClient()
    # The `subscription_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/subscriptions/{subscription_name}`.
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name
    )

    def callback(message):
        print("Received message: {}".format(message.data))
        message.ack()

    streaming_pull_future = subscriber.subscribe(
        subscription_path, callback=callback
    )
    print("Listening for messages on {}..\n".format(subscription_path))

    # Wrap the subscriber in a 'with' block to automatically call close() when done.
    with subscriber:
        try:
            # When `timeout` is not set, result() will block indefinitely,
            # unless an exception is encountered first.
            streaming_pull_future.result(timeout=timeout)
        except:  # noqa
            streaming_pull_future.cancel()


main()
```

Result:

Eventually, the subscription will fail and stop processing messages.

The failure occurs in `bidi` when getting the next item via `__iter__`:

```
DEBUG:google.cloud.pubsub_v1.publisher._batch.thread:gRPC Publish took 0.034303903579711914 seconds.
1162512773129262
DEBUG:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Sent request(s) over unary RPC.
DEBUG:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Released held message, scheduling callback for it, still on hold 0 (bytes 0).
DEBUG:google.api_core.bidi:waiting for recv.
Received message: b'Message number 117'
DEBUG:google.cloud.pubsub_v1.subscriber._protocol.dispatcher:Handling 1 batched requests
DEBUG:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Sent request(s) over unary RPC.
iter except: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        details = "Deadline Exceeded"
        debug_error_string = "{"created":"@1588195797.686297000","description":"Error received from peer ipv4:216.58.217.42:443","file":"src/core/lib/surface/call.cc","file_line":1056,"grpc_message":"Deadline Exceeded","grpc_status":4}"
>
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed non-terminating stream error 504 Deadline Exceeded
INFO:google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager:Observed recoverable stream error 504 Deadline Exceeded
DEBUG:google.api_core.bidi:Re-opening stream from gRPC callback.
DEBUG:google.api_core.bidi:Empty queue and inactive call, exiting request generator.
DEBUG:google.cloud.pubsub_v1.subscriber._protocol.leaser:The current deadline value is 10 seconds.
PYTHON_PATH/lib/python3.8/threading.py(890)_bootstrap()
-> self._bootstrap_inner()
  PYTHON_PATH/lib/python3.8/threading.py(932)_bootstrap_inner()
-> self.run()
  PYTHON_PATH/lib/python3.8/threading.py(870)run()
-> self._target(*self._args, **self._kwargs)
  /workspace/googleapis/python-pubsub/venv/lib/python3.8/site-packages/google/api_core/bidi.py(655)_thread_main()
-> response = self._bidi_rpc.recv()
  /workspace/googleapis/python-pubsub/venv/lib/python3.8/site-packages/google/api_core/bidi.py(562)recv()
-> return self._recoverable(self._recv)
  /workspace/googleapis/python-pubsub/venv/lib/python3.8/site-packages/google/api_core/bidi.py(505)_recoverable()
-> return method(*args, **kwargs)
  /workspace/googleapis/python-pubsub/venv/lib/python3.8/site-packages/google/api_core/bidi.py(559)_recv()
-> return next(call)
> /workspace/googleapis/python-pubsub/venv/lib/python3.8/site-packages/google/api_core/grpc_helpers.py(109)next()
-> six.raise_from(exceptions.from_grpc_error(exc), exc)
```
product-auto-label bot added the api: pubsub label Apr 29, 2020
plamut added the priority: p2 and type: bug labels Apr 30, 2020
plamut self-assigned this Apr 30, 2020
@plamut
Contributor

plamut commented Apr 30, 2020

I'll treat this as a bug, since Firestore works with the new API Core just fine.

@plamut
Contributor

plamut commented May 15, 2020

@crwilcox When running the script, how long is "eventually" in a typical case? Minutes? Hours? Was the issue consistently reproducible?

For some reason the reproduction is much more difficult this time, maybe because the topic I used back then had many more messages posted. Trying various things at the moment...

Edit: OK, finally reproduced, it took less than 15 minutes in this final run.

@plamut
Contributor

plamut commented May 18, 2020

A few notes:

  • The issue always happens at the same point: after ~10 minutes have elapsed. This time can be greatly reduced by lowering the streaming pull timeout settings in `subscriber_client_config.py`.
  • When the stream is broken and an error occurs while the consumer thread tries to obtain the next message, the `_recoverable()` wrapper kicks in, but the thread then blocks when it tries to acquire the lock.
    That, in turn, also blocks the heartbeater thread the next time it tries to send a heartbeat.
  • Commenting out the line in `_StreamingResponseIterator` that stores the initial result in the constructor makes the locking issue disappear, for reasons not yet obvious.
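For reference, the ~10-minute figure matches a 600000 ms default timeout for StreamingPull. A local edit along the following lines makes the DEADLINE_EXCEEDED fire within seconds instead (the key layout below follows the usual gapic client config shape and is an assumption; check your copy of `subscriber_client_config.py`):

```python
# Hypothetical local edit for faster reproduction -- key names follow the
# typical gapic client config layout and may differ between releases.
config = {
    "interfaces": {
        "google.pubsub.v1.Subscriber": {
            "methods": {
                "StreamingPull": {
                    "timeout_millis": 15000,  # default: 600000 (10 minutes)
                }
            }
        }
    }
}
```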

@plamut
Contributor

plamut commented May 19, 2020

@crwilcox I think I've figured it out. When a DeadlineExceeded error occurs (after 10 minutes by default), both the background consumer thread and one of the gRPC threads detect this and try to recover the stream:

```
INFO     [2020-05-19 11:56:34,722] Thread-1                          [google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager] [streaming_pull_manager.py:659][_should_recover] Observed recoverable stream error 504 Deadline Exceeded
DEBUG    [2020-05-19 11:56:34,722] Thread-ConsumeBidirectionalStream [google.api_core.grpc_helpers] [grpc_helpers.py:118][next]  <google.api_core.grpc_helpers._StreamingResponseIterator object at 0x7f5f5472b400>: RpcError when trying to obtain next() item from <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        ...
DEBUG    [2020-05-19 11:56:34,723] Thread-1                          [google.api_core.bidi] [bidi.py:458][_on_call_done]  Re-opening stream from gRPC callback.
...
DEBUG    [2020-05-19 11:56:34,724] Thread-ConsumeBidirectionalStream [google.api_core.bidi] [bidi.py:537][_recoverable] Re-opening stream from retryable <bound method ResumableBidiRpc._recv of <google.api_core.bidi.ResumableBidiRpc object at 0x7f5f567df828>>.
```

The gRPC thread calls ResumableBidiRpc._reopen() and acquires _operational_lock. Right after it the BiDi consumer thread (which was blocked in _recv()) enters the except block in ResumableBidiRpc._recoverable(), tries to acquire the same _operational_lock and blocks.

The gRPC thread then calls self._start_rpc() (i.e. subscriber_client.streaming_pull) in BidiRpc.open(), which wraps the result of the gRPC call in a new _StreamingResponseIterator instance.

As of api-core 1.17.0, _StreamingResponseIterator tries to retrieve the first result in __init__(), but that blocks the gRPC thread while the latter is still holding the lock, consequently blocking the BiDi consumer thread as well.
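The interaction can be reduced to a minimal sketch (illustrative names only, not the actual api-core code): one thread takes the operational lock and then blocks waiting for a first stream item that never arrives, while the other thread blocks on the same lock.

```python
import queue
import threading
import time

operational_lock = threading.Lock()  # stands in for ResumableBidiRpc._operational_lock
responses = queue.Queue()            # stands in for the gRPC stream; stays empty

blocked = []


def grpc_callback_thread():
    # Like _on_call_done() -> _reopen(): take the lock, then (as of
    # api-core 1.17.0) block waiting for the stream's first response.
    with operational_lock:
        try:
            responses.get(timeout=1.0)  # simulated prefetch; never yields
        except queue.Empty:
            pass


def consumer_thread():
    # Like _recoverable(): try to take the same lock, and block.
    got_it = operational_lock.acquire(timeout=0.2)
    blocked.append(not got_it)
    if got_it:
        operational_lock.release()


a = threading.Thread(target=grpc_callback_thread)
b = threading.Thread(target=consumer_thread)
a.start()
time.sleep(0.1)  # let the "gRPC" thread grab the lock first
b.start()
b.join()
a.join()
print("consumer blocked:", blocked[0])
```

In the sketch the prefetch times out after a second; in the real stream recovery it keeps waiting, so the consumer thread never gets the lock back.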

[screenshot: the two threads blocked on _operational_lock]

Is there a way to fix that Firestore bug without blocking on instance creation? Otherwise we might need to overhaul the BiDi stream reopening / locking logic, which would come with its own risks.

@crwilcox
Contributor Author

@plamut and I chatted quickly. The plan is to add a flag so that Pub/Sub can elect not to start streaming from `__init__`.

I have some concern that the way Pub/Sub and Firestore are currently initializing and resuming isn't compatible though and we may want to bring them into alignment with one another...

googleapis/google-cloud-python#10206
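For illustration, the opt-out could look roughly like this in api-core's streaming response wrapper (the class shape and the `prefetch_first_result` name are assumptions here, not the final API):

```python
# Illustrative sketch only, not the shipped api-core code: make the eager
# first-item fetch optional so a BiDi client like Pub/Sub can skip it,
# while Firestore keeps the pre-fetching default.
class StreamingResponseIterator:
    def __init__(self, wrapped, prefetch_first_result=True):
        self._wrapped = wrapped
        self._stored_first_result = None
        self._has_stored_result = False
        if prefetch_first_result:
            # This is the blocking call introduced in api-core 1.17.0;
            # skipping it avoids blocking while _operational_lock is held.
            self._stored_first_result = next(wrapped)
            self._has_stored_result = True

    def __iter__(self):
        return self

    def __next__(self):
        if self._has_stored_result:
            self._has_stored_result = False
            return self._stored_first_result
        return next(self._wrapped)
```

With `prefetch_first_result=False`, construction returns immediately and the first `next()` call is the one that may block.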

@meredithslota
Contributor

@crwilcox Agreed - are you the right person to work with re: Firestore stuff?

@plamut
Contributor

plamut commented May 22, 2020

@meredithslota We discussed it and decided that the api-core default behavior (pre-fetching the first result) will suit Firestore, thus no changes will be needed there. The flag will be unset on the Pub/Sub side.

It could have been the other way around, of course, but it just happened that I had more capacity and already had all the Pub/Sub context, so the change can be made more quickly this way.

P.S.: Unless you were referring to refactoring the general stream recovery logic; that's a different beast...

gcf-merge-on-green bot pushed a commit to googleapis/python-api-core that referenced this issue Jun 9, 2020
Closes #25.

This PR adds the ability to disable automatically pre-fetching the first item of a stream returned by `*-Stream` gRPC callables. This hook will be used in PubSub to fix the [stalled stream issue](googleapis/python-pubsub#93), while also not affecting Firestore, since the default behavior is preserved.

I realize the fix is far from ideal, but it's the least ugly among the approaches I tried, e.g. somehow passing the flag through `ResumableBidiRpc` (it's a messy rabbit hole). On the PubSub side monkeypatching the generated SubscriberClient will be needed, but it's a (relatively) clean one-liner:
```patch
diff --git google/cloud/pubsub_v1/gapic/subscriber_client.py google/cloud/pubsub_v1/gapic/subscriber_client.py
index e98a686..1d6c058 100644
--- google/cloud/pubsub_v1/gapic/subscriber_client.py
+++ google/cloud/pubsub_v1/gapic/subscriber_client.py
@@ -1169,6 +1169,8 @@ class SubscriberClient(object):
                 default_timeout=self._method_configs["StreamingPull"].timeout,
                 client_info=self._client_info,
             )
+            # TODO: explain this monkeypatch!
+            self.transport.streaming_pull._prefetch_first_result_ = False
 
         return self._inner_api_calls["streaming_pull"](
             requests, retry=retry, timeout=timeout, metadata=metadata
```

If/when we merge this, we should also release it, and then we can add `!= 1.17.0` to the `google-api-core` version pin in PubSub.

### PR checklist
- [x] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-api-core/issues/new/choose) before writing your code!  That way we can discuss the change, evaluate designs, and agree on the general idea
- [x] Ensure the tests and linter pass
- [x] Code coverage does not decrease (if any source code was changed)
- [x] Appropriate docs were updated (if necessary)