
Poor termination handling in bidi.BackgroundConsumer resulting in error logs #619

Open
dhendry opened this issue Feb 21, 2024 · 3 comments · May be fixed by #620
Labels
priority: p2 Moderately-important priority. Fix may not be included in next release. type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns.


dhendry commented Feb 21, 2024

Overview

  • I have been trying to debug error logs that keep appearing in our production infrastructure, and I believe they are
    caused by poor termination handling logic in the bidi.BackgroundConsumer class.
  • The issues occur when using the BigQuery Storage Write API, but I don't believe the underlying issue has anything
    to do with the Storage Write client; instead, it is related to the bidi.BackgroundConsumer class.

Environment details

  • OS type and version: macOS 14.2.1 (also occurs within the Google App Engine flexible environment)
  • Python version: 3.9.18
  • pip version: 23.3.1
  • google-api-core version: 2.17.1

Error 1: bidi.BackgroundConsumer error log on shutdown using a very simple client/server implementation

  • simple_api_core_repro.py: this script contains an extremely simple ping server and a client
    call using the bidi.BidiRpc and bidi.BackgroundConsumer classes, which results in an error being logged.
  • The error is logged every single time the client is shut down.

Stack trace

14:38:29 - DEBUG - google.api_core.bidi - Thread-3 - Cleanly exiting request generator.
14:38:29 - ERROR - google.api_core.bidi - Thread-ConsumeBidirectionalStream - Thread-ConsumeBidirectionalStream caught unexpected exception <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.CANCELLED
	details = "Locally cancelled by application!"
	debug_error_string = "None"
> and will exit.
Traceback (most recent call last):
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/google/api_core/bidi.py", line 663, in _thread_main
    response = self._bidi_rpc.recv()
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/google/api_core/bidi.py", line 346, in recv
    return next(self.call)
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 540, in __next__
    return self._next()
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 966, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.CANCELLED
	details = "Locally cancelled by application!"
	debug_error_string = "None"
>
14:38:29 - INFO - google.api_core.bidi - Thread-ConsumeBidirectionalStream - Thread-ConsumeBidirectionalStream exiting
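Why the error shows up on every shutdown can be modelled without gRPC. Presumably, stopping the consumer closes the BidiRpc, which cancels the underlying call while the consumer thread is still blocked inside recv(), so the cancellation surfaces as an exception on that thread rather than as a clean end of stream. The stdlib-only sketch below assumes exactly that; FakeStreamingCall and FakeCancelledError are hypothetical stand-ins, not grpc or api_core classes.

```python
import queue
import threading


class FakeCancelledError(Exception):
    """Hypothetical stand-in for the grpc error raised when a call is cancelled."""


class FakeStreamingCall:
    """Hypothetical stand-in for a bidi call: next() blocks until a response
    arrives or the call is cancelled."""

    _CANCEL = object()  # sentinel pushed by cancel()

    def __init__(self):
        self._responses = queue.Queue()

    def send_response(self, response):
        self._responses.put(response)

    def cancel(self):
        self._responses.put(self._CANCEL)

    def __iter__(self):
        return self

    def __next__(self):
        item = self._responses.get()  # blocks, like recv() on a live stream
        if item is self._CANCEL:
            raise FakeCancelledError("Locally cancelled by application!")
        return item


def consume(call, received, errors):
    """Simplified consumer loop: call next() until something is raised."""
    try:
        while True:
            received.append(next(call))
    except Exception as exc:  # the real consumer logs this at ERROR level
        errors.append(exc)


call = FakeStreamingCall()
received, errors = [], []
consumer = threading.Thread(target=consume, args=(call, received, errors))
consumer.start()

call.send_response("pong")
call.cancel()  # models closing the client while the consumer is blocked
consumer.join(timeout=5)

print(received)  # ['pong']
print(type(errors[0]).__name__, errors[0])
```

Note that a `while is_active` check like the one in the real loop cannot help here: it runs before the thread blocks, and the cancellation arrives while the thread is already waiting.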

Error 2: Debug-level exception in bidi.BackgroundConsumer on shutdown when using the BigQuery Storage Write API

  • See repro here: https://github.com/perpetua1/tmp-grpc-client-and-bq-storage-write-errors-repro/blob/master/bq_storage_write_repro.py
  • The same error from above is present in the logs when using clients built on top of BidiRpc/BackgroundConsumer,
    such as the BigQuery Storage Write API, but it shows up differently.
  • Specifically, in the official Google clients the streaming call gets wrapped in
    google.api_core.grpc_helpers._StreamingResponseIterator, which converts the _MultiThreadedRendezvous into a
    GoogleAPICallError (here google.api_core.exceptions.Cancelled).
  • As a result, the message is logged at DEBUG level instead of ERROR, but a full stack trace is still present in the
    logs because that log call passes exc_info=True.

Stack trace

...
14:41:08 - DEBUG - google.api_core.bidi - MainThread - Started helper thread Thread-ConsumeBidirectionalStream
14:41:13 - DEBUG - google.auth.transport.requests - Thread-3 - Making request: POST https://oauth2.googleapis.com/token
14:41:13 - DEBUG - urllib3.connectionpool - Thread-3 - Starting new HTTPS connection (1): oauth2.googleapis.com:443
14:41:13 - DEBUG - urllib3.connectionpool - Thread-3 - https://oauth2.googleapis.com:443 "POST /token HTTP/1.1" 200 None
14:41:14 - DEBUG - google.api_core.bidi - Thread-ConsumeBidirectionalStream - waiting for recv.
14:41:14 - DEBUG - google.api_core.bidi - Thread-ConsumeBidirectionalStream - recved response.
14:41:14 - DEBUG - google.api_core.bidi - Thread-ConsumeBidirectionalStream - waiting for recv.
14:41:14 - DEBUG - google.cloud.bigquery_storage_v1.writer - MainThread - Stopping consumer.
14:41:14 - DEBUG - google.api_core.bidi - Thread-2 - Cleanly exiting request generator.
14:41:14 - DEBUG - google.api_core.bidi - Thread-ConsumeBidirectionalStream - Thread-ConsumeBidirectionalStream caught error 499 Locally cancelled by application! and will exit. Generally this is due to the RPC itself being cancelled and the error will be surfaced to the calling code.
Traceback (most recent call last):
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/google/api_core/grpc_helpers.py", line 116, in __next__
    return next(self._wrapped)
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 540, in __next__
    return self._next()
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 966, in _next
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.CANCELLED
	details = "Locally cancelled by application!"
	debug_error_string = "None"
>

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/google/api_core/bidi.py", line 663, in _thread_main
    response = self._bidi_rpc.recv()
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/google/api_core/bidi.py", line 346, in recv
    return next(self.call)
  File "/Users/dhendry/code/tmp-grpc-client-and-bq-storage-write-errors-repro/.venv/lib/python3.9/site-packages/google/api_core/grpc_helpers.py", line 119, in __next__
    raise exceptions.from_grpc_error(exc) from exc
google.api_core.exceptions.Cancelled: 499 Locally cancelled by application!
14:41:14 - INFO - google.cloud.bigquery_storage_v1.writer - Thread-1 - RPC termination has signaled streaming pull manager shutdown.
14:41:14 - INFO - google.api_core.bidi - Thread-ConsumeBidirectionalStream - Thread-ConsumeBidirectionalStream exiting
14:41:14 - DEBUG - google.cloud.bigquery_storage_v1.writer - MainThread - Finished stopping manager.
14:41:14 - DEBUG - root - MainThread - Stream closed, insertion success
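The DEBUG-level variant comes from the extra wrapping layer. The sketch below is a simplified, hypothetical model of what the trace shows google.api_core.grpc_helpers._StreamingResponseIterator doing (`raise exceptions.from_grpc_error(exc) from exc`): the grpc error is translated into a GoogleAPICallError subclass and chained with `from`, which is why the log contains two tracebacks joined by "The above exception was the direct cause of the following exception". All class names are stand-ins, and `convert()` is a hypothetical substitute for from_grpc_error().

```python
class FakeRpcError(Exception):
    """Hypothetical stand-in for grpc's _MultiThreadedRendezvous."""

    def __init__(self, code, details):
        super().__init__(details)
        self._code = code

    def code(self):
        return self._code


class FakeGoogleAPICallError(Exception):
    """Stand-in for google.api_core.exceptions.GoogleAPICallError."""


class FakeCancelled(FakeGoogleAPICallError):
    """Stand-in for google.api_core.exceptions.Cancelled (499)."""


def convert(exc):
    # Hypothetical mapping; the real library uses exceptions.from_grpc_error().
    if exc.code() == "CANCELLED":
        return FakeCancelled(f"499 {exc}")
    return FakeGoogleAPICallError(str(exc))


class WrappedStream:
    """Simplified model of a wrapping response iterator."""

    def __init__(self, wrapped):
        self._wrapped = wrapped

    def __iter__(self):
        return self

    def __next__(self):
        try:
            return next(self._wrapped)
        except FakeRpcError as exc:
            # "from exc" sets __cause__, producing the chained traceback.
            raise convert(exc) from exc


def cancelled_stream():
    """Yields one response, then fails the way a locally cancelled call does."""
    yield "response"
    raise FakeRpcError("CANCELLED", "Locally cancelled by application!")


stream = WrappedStream(cancelled_stream())
print(next(stream))  # response
try:
    next(stream)
except FakeGoogleAPICallError as err:
    print(type(err).__name__, err, "| caused by", type(err.__cause__).__name__)
```

Because the converted exception is a GoogleAPICallError subclass, it lands in the `except exceptions.GoogleAPICallError` branch of _thread_main, which logs at DEBUG with exc_info=True.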

Error 3: Uncaught StopIteration logged by bidi.BackgroundConsumer when using the BigQuery Storage Write API

  • This is the primary reason I am opening this bug.
  • I have not been able to craft a reproducible example for this error; however, the relevant stack trace is below. It
    occurs in our production environment (Google App Engine flexible) 10-100 times per day.
  • It seems to occur along a very similar code path to the examples above, but the internal state of the gRPC
    channel seems to be a bit different, and the result is an uncaught StopIteration exception.
  • It is logged at ERROR level.

Stack trace

Thread-ConsumeBidirectionalStream caught unexpected exception  and will exit.
Traceback (most recent call last):
  File "/.venv/lib/python3.9/site-packages/google/api_core/bidi.py", line 663, in _thread_main
    response = self._bidi_rpc.recv()
  File "/.venv/lib/python3.9/site-packages/google/api_core/bidi.py", line 346, in recv
    return next(self.call)
  File "/.venv/lib/python3.9/site-packages/google/api_core/grpc_helpers.py", line 119, in __next__
    return next(self._wrapped)
  File "/.venv/lib/python3.9/site-packages/ddtrace/contrib/grpc/client_interceptor.py", line 162, in __next__
    return self._next()
  File "/.venv/lib/python3.9/site-packages/ddtrace/contrib/grpc/client_interceptor.py", line 144, in _next
    return next(self.__wrapped__)
  File "/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 540, in __next__
    return self._next()
  File "/.venv/lib/python3.9/site-packages/grpc/_channel.py", line 964, in _next
    raise StopIteration()
StopIteration
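Two properties of StopIteration explain why Error 3 is logged at ERROR level with an odd-looking message: it derives from Exception, so it falls through to the catch-all `except Exception` branch of _thread_main, and its string form is empty, which leaves the double space in "caught unexpected exception  and will exit." The sketch below reproduces only the logging behaviour with the stdlib; the logger name and the exhausted iterator are assumptions standing in for the real consumer and stream.

```python
import logging


class ListHandler(logging.Handler):
    """Collects (level, message) pairs in memory."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append((record.levelname, record.getMessage()))


logger = logging.getLogger("stopiteration-demo")  # hypothetical logger name
logger.propagate = False
handler = ListHandler()
logger.addHandler(handler)

# StopIteration derives from Exception, so "except Exception" catches it.
print(issubclass(StopIteration, Exception))  # True
# Its string form is empty, which produces the double space in the log line.
print(repr(str(StopIteration())))  # ''

exhausted = iter([])  # models a stream that has already finished
try:
    next(exhausted)
except Exception as exc:
    logger.error("%s caught unexpected exception %s and will exit.",
                 "Thread-ConsumeBidirectionalStream", exc)

print(handler.messages[0])
```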

Proposed fix

I think a bit of extra logic in the bidi.BackgroundConsumer class to catch and handle the StopIteration and
Cancelled exceptions would be sufficient to fix this issue for the second and third examples above. Specifically, update
the google.api_core.bidi.BackgroundConsumer._thread_main() implementation with the following block:

        except (exceptions.Cancelled, StopIteration):
            pass
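One detail matters for where this clause goes: in google.api_core.exceptions, Cancelled is a subclass of ClientError, which is a subclass of GoogleAPICallError. Python picks the first matching `except` clause, so the new clause has to sit before the existing `except exceptions.GoogleAPICallError` clause, or it would never be reached for Cancelled. The classes in this sketch are local stand-ins that only reproduce that hierarchy; `classify()` is a hypothetical helper that mirrors the proposed clause order.

```python
class GoogleAPICallError(Exception):
    """Stand-in for google.api_core.exceptions.GoogleAPICallError."""


class ClientError(GoogleAPICallError):
    """Stand-in for google.api_core.exceptions.ClientError."""


class Cancelled(ClientError):
    """Stand-in for google.api_core.exceptions.Cancelled (499)."""


def classify(exc):
    """Mirror the order of the except clauses in the proposed _thread_main."""
    try:
        raise exc
    except (Cancelled, StopIteration):
        return "ignored"
    except GoogleAPICallError:
        return "debug"
    except Exception:
        return "error"


print(classify(Cancelled("499 Locally cancelled by application!")))  # ignored
print(classify(StopIteration()))  # ignored
print(classify(GoogleAPICallError("503 unavailable")))  # debug
print(classify(RuntimeError("boom")))  # error
```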

Catching _MultiThreadedRendezvous could also be done, although this is admittedly very strange, since it is a private grpc class (grpc._channel._MultiThreadedRendezvous):

        except _MultiThreadedRendezvous as exc:
            if exc.code() != grpc.StatusCode.CANCELLED:
                _LOGGER.exception(
                    "%s caught unexpected exception %s and will exit.",
                    _BIDIRECTIONAL_CONSUMER_NAME,
                    exc,
                )
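An alternative to naming the private class is to check the status code by duck typing. As far as I know, the object grpc raises here (_MultiThreadedRendezvous) also implements the public grpc.Call interface, whose code() method returns a grpc.StatusCode. The sketch below models that with stand-ins; StatusCode and FakeRendezvous are hypothetical, and is_local_cancellation() is an illustrative helper, not part of api_core.

```python
import enum


class StatusCode(enum.Enum):
    """Stand-in for grpc.StatusCode (only the members used here)."""
    OK = 0
    CANCELLED = 1
    UNAVAILABLE = 14


class FakeRendezvous(Exception):
    """Hypothetical stand-in for an RPC error object that exposes code()."""

    def __init__(self, code):
        super().__init__(code.name)
        self._code = code

    def code(self):
        return self._code


def is_local_cancellation(exc):
    """True if exc looks like an RPC error whose status is CANCELLED.

    Duck-typed on a callable code() attribute, so no private grpc class
    has to be imported.
    """
    code = getattr(exc, "code", None)
    return callable(code) and code() == StatusCode.CANCELLED


print(is_local_cancellation(FakeRendezvous(StatusCode.CANCELLED)))  # True
print(is_local_cancellation(FakeRendezvous(StatusCode.UNAVAILABLE)))  # False
print(is_local_cancellation(ValueError("no code attribute")))  # False
```

The callable() check is deliberate: api_core's own exception classes expose `code` as a plain integer attribute rather than a method, so they are not mistaken for grpc call objects.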

A better approach could be to change grpc itself so that grpc._channel._MultiThreadedRendezvous._next() raises
StopIteration on StatusCode.CANCELLED in addition to StatusCode.OK.
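The upstream alternative can be illustrated with a toy iterator. This is not grpc's actual `_next()`; it is a hypothetical model in which a stream ends with StopIteration for the statuses in TERMINATE_QUIETLY, and the proposal amounts to adding CANCELLED to that set. StatusCode, RpcFailed and ModelStream are stand-ins.

```python
import enum


class StatusCode(enum.Enum):
    """Stand-in for grpc.StatusCode (only the members used here)."""
    OK = 0
    CANCELLED = 1
    UNAVAILABLE = 14


class RpcFailed(Exception):
    """Hypothetical stand-in for the error raised on any other final status."""

    def __init__(self, code):
        super().__init__(code.name)
        self.code = code


# Final statuses that end iteration quietly; adding CANCELLED is the proposed change.
TERMINATE_QUIETLY = {StatusCode.OK, StatusCode.CANCELLED}


class ModelStream:
    """Toy response iterator: yields buffered responses, then checks the final status."""

    def __init__(self, responses, final_code):
        self._responses = list(responses)
        self._final_code = final_code

    def __iter__(self):
        return self

    def __next__(self):
        if self._responses:
            return self._responses.pop(0)
        if self._final_code in TERMINATE_QUIETLY:
            raise StopIteration()
        raise RpcFailed(self._final_code)


print(list(ModelStream(["a", "b"], StatusCode.CANCELLED)))  # ['a', 'b']
```

One consequence worth weighing: a caller iterating such a stream could no longer tell a cancelled stream from a completed one by the exception alone and would need to check the call's status code instead. The change would also affect every grpc user, not just api_core.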

So, in full, the proposed contents of bidi.BackgroundConsumer._thread_main() would be:

    def _thread_main(self, ready):
        try:
            ready.set()
            self._bidi_rpc.add_done_callback(self._on_call_done)
            self._bidi_rpc.open()

            while self._bidi_rpc.is_active:
                # Do not allow the paused status to change at all during this
                # section. There is a condition where we could be resumed
                # between checking if we are paused and calling wake.wait(),
                # which means that we will miss the notification to wake up
                # (oops!) and wait for a notification that will never come.
                # Keeping the lock throughout avoids that.
                # In the future, we could use `Condition.wait_for` if we drop
                # Python 2.7.
                # See: https://github.com/googleapis/python-api-core/issues/211
                with self._wake:
                    while self._paused:
                        _LOGGER.debug("paused, waiting for waking.")
                        self._wake.wait()
                        _LOGGER.debug("woken.")

                _LOGGER.debug("waiting for recv.")
                response = self._bidi_rpc.recv()
                _LOGGER.debug("recved response.")
                self._on_response(response)
        ######################################## 
        # ADD THIS:
        except (exceptions.Cancelled, StopIteration):
            pass
        # AND MAYBE THIS, although it's weird; perhaps there is a better way.
        # (Needs `import grpc` and `from grpc._channel import _MultiThreadedRendezvous`.)
        except _MultiThreadedRendezvous as exc:
            if exc.code() != grpc.StatusCode.CANCELLED:
                _LOGGER.exception(
                    "%s caught unexpected exception %s and will exit.",
                    _BIDIRECTIONAL_CONSUMER_NAME,
                    exc,
                )
        ######################################## 
        except exceptions.GoogleAPICallError as exc:
            _LOGGER.debug(
                "%s caught error %s and will exit. Generally this is due to "
                "the RPC itself being cancelled and the error will be "
                "surfaced to the calling code.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
                exc_info=True,
            )

        except Exception as exc:
            _LOGGER.exception(
                "%s caught unexpected exception %s and will exit.",
                _BIDIRECTIONAL_CONSUMER_NAME,
                exc,
            )

        _LOGGER.info("%s exiting", _BIDIRECTIONAL_CONSUMER_NAME)
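To check the proposed clauses without a live stream, the loop can be exercised against a fake RPC while recording log levels. The sketch below is a stripped-down copy of the proposed _thread_main (no pause/wake handling, no _MultiThreadedRendezvous branch); FakeBidiRpc, the stand-in exception classes and the logger name are hypothetical.

```python
import logging

_LOGGER = logging.getLogger("bidi-sketch")  # hypothetical logger name
_LOGGER.propagate = False
_NAME = "Thread-ConsumeBidirectionalStream"


class GoogleAPICallError(Exception):
    """Stand-in for google.api_core.exceptions.GoogleAPICallError."""


class Cancelled(GoogleAPICallError):
    """Stand-in for google.api_core.exceptions.Cancelled."""


class FakeBidiRpc:
    """Hypothetical stand-in for bidi.BidiRpc: recv() returns queued items,
    then deactivates and raises the configured terminal exception."""

    def __init__(self, responses, terminal_exc):
        self._responses = list(responses)
        self._terminal_exc = terminal_exc
        self.is_active = True

    def recv(self):
        if self._responses:
            return self._responses.pop(0)
        self.is_active = False
        raise self._terminal_exc


def thread_main(rpc, on_response):
    """Simplified _thread_main with the proposed except clauses."""
    try:
        while rpc.is_active:
            on_response(rpc.recv())
    except (Cancelled, StopIteration):
        pass  # expected termination: nothing logged
    except GoogleAPICallError as exc:
        _LOGGER.debug("%s caught error %s and will exit.", _NAME, exc, exc_info=True)
    except Exception as exc:
        _LOGGER.exception("%s caught unexpected exception %s and will exit.", _NAME, exc)
    _LOGGER.info("%s exiting", _NAME)


class LevelRecorder(logging.Handler):
    """Records the level name of every log record it receives."""

    def __init__(self):
        super().__init__(level=logging.DEBUG)
        self.levels = []

    def emit(self, record):
        self.levels.append(record.levelname)


recorder = LevelRecorder()
_LOGGER.addHandler(recorder)
_LOGGER.setLevel(logging.DEBUG)

for terminal in (Cancelled("499"), StopIteration(), RuntimeError("boom")):
    recorder.levels.clear()
    received = []
    thread_main(FakeBidiRpc(["pong"], terminal), received.append)
    print(type(terminal).__name__, received, recorder.levels)
# Cancelled ['pong'] ['INFO']
# StopIteration ['pong'] ['INFO']
# RuntimeError ['pong'] ['ERROR', 'INFO']
```

With the proposed clauses, Cancelled and StopIteration produce only the INFO "exiting" line, while an unrelated exception is still logged at ERROR.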

dhendry commented Feb 21, 2024

I have created a PR to handle the most obvious and important case: #620

vchudnov-g commented

Thanks for reporting these bugs and crafting the PR! We'll review these shortly.

One question: I can't access your link to simple_api_core_repro.py. Is there a typo or an access issue?

vchudnov-g added the type: bug and priority: p2 labels Feb 29, 2024

dhendry commented Mar 5, 2024

@vchudnov-g So sorry, I forgot to make the repository public. You should be able to access it now.
