Possible race condition between _on_response and close #997

Open
hson2 opened this issue Sep 21, 2023 · 3 comments
Labels
api: pubsub Issues related to the googleapis/python-pubsub API.


@hson2

hson2 commented Sep 21, 2023

Environment details

  • OS type and version: Ubuntu 22.04
  • Python version: 3.11
  • pip version: 23.0.1
  • google-cloud-pubsub version: 2.18.4

Steps to reproduce

  1. Run the code sample indefinitely.
  2. Occasionally it raises an AssertionError in the _on_response method.

Code example

import functools
import logging
from datetime import datetime
from threading import Event

from google.cloud import pubsub_v1


def on_subscribe(subscription, until=None):
    """Decorator factory that passes subscribed messages to the decorated function.

    The decorated function is used as the subscriber callback, so each message
    should be acked or nacked inside it.

    Args:
        subscription (str): Subscription path. Should be
            `projects/{PROJECT_ID}/subscriptions/{SUBSCRIPTION_ID}`.
        until (datetime.datetime): If given, only messages published up to
            this timestamp are passed to the decorated function.
    """

    def _callback_factory(func, finished, subscribe_until, **kwargs):

        def _callback(message):
            """Subscriber callback.

            Sets `finished` once a message published after `subscribe_until` arrives.
            """
            publish_time = datetime.fromtimestamp(
                message.publish_time.timestamp())
            if subscribe_until is None or publish_time <= subscribe_until:
                return func(message, **kwargs)
            if not finished.is_set():
                logging.info('Subscribed all messages published until %s',
                             subscribe_until)
                finished.set()
            # Messages past the cutoff are returned for redelivery.
            message.nack()

        return _callback

    def _wrapper(func):

        @functools.wraps(func)
        def _inner_wrapper(**kwargs):
            # Set once all messages published up to `until` have been received.
            all_subscribed = Event()

            callback = _callback_factory(func=func,
                                         finished=all_subscribed,
                                         subscribe_until=until,
                                         **kwargs)

            # Close the subscriber on exit to prevent memory leaks.
            with pubsub_v1.SubscriberClient() as subscriber:
                future = subscriber.subscribe(
                    subscription=subscription,
                    callback=callback,
                    await_callbacks_on_shutdown=True,
                    flow_control=pubsub_v1.types.FlowControl(max_messages=5000),
                )

                all_subscribed.wait(timeout=60)

                # Cancel the future, then wait for the shutdown to finish.
                # Timeouts and keyboard interrupts are ignored.
                try:
                    future.cancel()
                    future.result(timeout=60)
                except (KeyboardInterrupt, TimeoutError):
                    pass
                except Exception as e:
                    logging.error("Error during subscription to %s: %s",
                                  subscription, str(e))

        return _inner_wrapper

    return _wrapper


@on_subscribe(subscription="SUBSCRIPTION")
def callback(message):
    # Do something with the message, then ack it.
    message.ack()

Stack trace

Traceback (most recent call last):
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/api_core/bidi.py", line 657, in _thread_main
    self._on_response(response)
  File "/layers/google.python.pip/pip/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 1107, in _on_response
    assert self._scheduler is not None

Explanation

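This happens because future.cancel() calls manager.close(), which sets _scheduler to None. If the background consumer thread delivers another response after that, _on_response fails its assert self._scheduler is not None check and raises AssertionError.

Perhaps this access needs to be protected by a threading lock.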
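The lock suggested above could look roughly like the following. This is a minimal sketch using a hypothetical ToyManager class, not the real StreamingPullManager: close() and on_response() take the same threading.Lock, so a response that arrives after close() sees _scheduler is None and is dropped instead of failing an assertion. Whether silently dropping late responses is acceptable inside the real library is an assumption of this sketch.

```python
import threading


class ToyManager:
    """Hypothetical stand-in for a streaming-pull manager (not the library's code)."""

    def __init__(self):
        # One lock guards every read and write of _scheduler.
        self._lock = threading.Lock()
        self._scheduler = []  # placeholder for a real scheduler object
        self.dropped = 0

    def close(self):
        with self._lock:
            self._scheduler = None

    def on_response(self, response):
        """Handle a response; return False if it arrived after close()."""
        with self._lock:
            if self._scheduler is None:
                # Already closed: drop the late response instead of asserting.
                self.dropped += 1
                return False
            self._scheduler.append(response)
            return True


manager = ToyManager()
assert manager.on_response("first") is True
manager.close()
assert manager.on_response("late") is False  # dropped, no AssertionError
```

Because the None check and the use of _scheduler happen under the same lock that close() takes, there is no window in which close() can run between them.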

The product-auto-label bot added the api: pubsub label on Sep 21, 2023.
@liuyunnnn
Contributor

Thank you for the detailed report. I'll try to reproduce this and fix accordingly.
In the meantime, I'm wondering why you chose to cancel first and then wait for the result here:

future.cancel()
future.result(timeout=60)

instead of following the common pattern the sample shows. This could be a mitigation for the issue you are experiencing.
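For comparison, the official Python Pub/Sub subscriber samples block on result() with a timeout first, call cancel() only once that times out, and then call result() again without a timeout to wait for the shutdown. Below is a minimal sketch of that ordering as a helper; wait_then_shutdown is a hypothetical name, and future is assumed to behave like the StreamingPullFuture returned by subscribe().

```python
import concurrent.futures


def wait_then_shutdown(future, timeout):
    """Wait on a streaming-pull future; shut it down if the timeout expires.

    Returns True if the timeout expired and shutdown was requested,
    False if the future finished on its own first. If the stream failed,
    result() re-raises that error to the caller.
    """
    try:
        # Blocks until the stream ends or `timeout` seconds pass.
        future.result(timeout=timeout)
        return False
    except concurrent.futures.TimeoutError:
        future.cancel()  # Trigger the shutdown.
        future.result()  # Block until the shutdown is complete.
        return True
```

In the samples this runs inside a `with subscriber:` block, so the client is closed once the helper returns.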

liuyunnnn self-assigned this on Sep 21, 2023.
@hson2
Author

hson2 commented Sep 28, 2023

Because I wanted to subscribe to messages up to a certain timestamp (which turned out to be a bad idea 😅) and have the callback signal a threading.Event once it had received all of them.

If I call result() first without a timeout, it waits indefinitely; if I set a timeout, the second call to result() raises TimeoutError. I cancelled first simply to avoid raising that error.

liuyunnnn assigned pradn and unassigned pradn and liuyunnnn on Oct 24, 2023.
@chase-peach

We are also seeing this issue.

Python version: 3.11
pip version: 23.3.1
google-cloud-pubsub version: 2.14.1

We are running in Google Kubernetes Engine (GKE) and listening to the following two signals in our worker for scaling:

import signal
signal.signal(signal.SIGINT, self.exit_gracefully)
signal.signal(signal.SIGTERM, self.exit_gracefully)

with exit_gracefully implemented as follows:

def exit_gracefully(self, signum, frame) -> None:
    """Stop the async worker.

    Registered via signal.signal, so Python calls it with (signum, frame).
    """
    self.future.cancel()  # Request shutdown
    try:
        self.future.result(timeout=60 * 5)  # Block until the shutdown is complete (or up to 5min).
    except TimeoutError:
        self.logger.warning("Stop timeout reached, Pod will die.")
    except Exception as e:
        self.logger.error(f"Error while shutting down worker: {e}")
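Python always calls a signal handler as handler(signum, frame), and only ever on the main thread. One common way to keep the handler itself short is sketched below under assumed names (shutdown_requested, handle_signal, and install_handlers are hypothetical): the handler only sets a threading.Event, and the main thread performs the blocking cancel() and result() after the event fires. This is an alternative arrangement, not a confirmed fix for the assertion error.

```python
import signal
import threading

# Hypothetical flag set once SIGINT or SIGTERM is received.
shutdown_requested = threading.Event()


def handle_signal(signum, frame):
    """Signal handler: Python passes the signal number and the current frame."""
    # Keep the handler minimal; the main thread does the actual shutdown.
    shutdown_requested.set()


def install_handlers():
    """Register the handler; signal.signal must be called from the main thread."""
    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)


# Usage on the main thread (sketch):
#   install_handlers()
#   while not shutdown_requested.wait(timeout=1.0):
#       pass  # wake periodically until a shutdown signal arrives
#   future.cancel()
#   future.result(timeout=60 * 5)
```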

with the following setup:

self.subscriber = pubsub_v1.SubscriberClient()

flow_control = pubsub_v1.types.FlowControl(max_messages=self.pub_sub_flow_control_max_messages)
executor = futures.ThreadPoolExecutor(
    max_workers=self.pub_sub_thread_pool_executor_max_workers, thread_name_prefix="TPE-PeachWorker"
)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)

self.future = self.subscriber.subscribe(
    self.subscription_path,
    callback=self.pubsub_callback,
    flow_control=flow_control,
    scheduler=scheduler,
    await_callbacks_on_shutdown=True,
)

We do our main work in a thread separate from the main thread so that Python can still receive signals:

# Execute the wait on the future for streaming pull in a different thread
# because Python's main thread is the only one that will receive signals such as
# SIGTERM for when a pod is requested to die so it can be replaced with a new one.
thread = Thread(target=self.process, args=(), daemon=True)
thread.start()

With the process method implemented as follows:

def process(self):
    with self.subscriber:
        try:
            self.future.result()
        except Cancelled:
            self.logger.info("Cancel request received.")

We are seeing the same scheduler assertion error that @hson2 saw:

AssertionError: null
  File "threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 921, in _shutdown
    assert self._scheduler is not None

However, we are also seeing another error immediately before:

AttributeError: 'NoneType' object has no attribute 'nack'
  File "threading.py", line 982, in run
    self._target(*self._args, **self._kwargs)
  File "/usr/local/lib/python3.11/site-packages/google/cloud/pubsub_v1/subscriber/_protocol/streaming_pull_manager.py", line 948, in _shutdown
    msg.nack()

@liuyunnnn Could you explain why you prefer not to call cancel first in a scenario like this?

Thank you for your help :)!
