
Gevent greenlet stuck when using stream recognize request in a separate thread. #12535

Open
dongzeli95 opened this issue Apr 1, 2024 · 3 comments

@dongzeli95


import logging
import gevent
from google.cloud import speech_v1
import queue
import sentry_sdk

from gevent import spawn

# https://github.com/googleapis/google-cloud-python/blob/main/packages/google-cloud-speech/samples/generated_samples/speech_v1_generated_speech_streaming_recognize_async.py

logger = logging.getLogger('streaming_session')
class StreamingSession:
    END_OF_STREAM = object()
    def __init__(self, sample_rate=16000):
        # Initialize the Google Speech-to-Text client
        self.client = speech_v1.SpeechClient()

        # Configure the streaming request
        self.streaming_config = speech_v1.StreamingRecognitionConfig(
            config=speech_v1.RecognitionConfig(
                encoding=speech_v1.RecognitionConfig.AudioEncoding.LINEAR16,
                sample_rate_hertz=sample_rate,
                language_code="en-US",
                enable_automatic_punctuation=True,
                enable_spoken_punctuation=True,
            ),
            interim_results=False
        )

        # Initialize the request generator
        self.__start_session()

    def reset(self):
        """Reset the streaming session."""
        self.__start_session()

    def __start_session(self):
        """Resets the streaming session to start a new audio stream."""
        self.queue = queue.Queue()
        self.stream_recognize_task = spawn(self.stream_recognize)
        # gevent.sleep(0)
        self.transcription = ""
        self.closed = False

    def request_generator(self):
        """Generator function that yields streaming requests."""
        # Then, continuously send audio chunks as they arrive
        while not self.closed:
            try:
                # Use a blocking get() to wait for the first chunk
                chunk = self.queue.get()
                if chunk is self.END_OF_STREAM:
                    print("return here: END OF STREAM")
                    return
                data = [chunk]

                # Drain the queue of any additional data
                while True:
                    try:
                        chunk = self.queue.get(block=False)
                        if chunk is self.END_OF_STREAM:
                            self.closed = True
                            print("set closed to true")
                            break
                        data.append(chunk)
                    except queue.Empty:
                        break

                yield speech_v1.StreamingRecognizeRequest(audio_content=b"".join(data))
            except Exception as e:
                sentry_sdk.capture_exception(e)
                break

    def add_chunk(self, chunk):
        """Send an audio chunk to the streaming session in a non-blocking manner."""
        self.queue.put(chunk, block=False)

    def stream_recognize(self):
        return self.client.streaming_recognize(
            config=self.streaming_config,
            requests=self.request_generator(),
        )

    def get_transcription(self):
        """Finalize the session and return the complete transcription."""
        self.queue.put(self.END_OF_STREAM)
        self.stream_recognize_task.join()
        responses = self.stream_recognize_task.get()

        # Process responses from the stream
        try:
            for response in responses:
                # Check for the presence of results in the response
                if not response.results:
                    continue

                # Iterate over the results in the response
                for result in response.results:
                    # Check if the result is a final result
                    if result.is_final:
                        # Get the best transcription from the final result
                        self.transcription += result.alternatives[0].transcript + ' '
                    else:
                        sentry_sdk.capture_message("GCS streaming result is not final")
        except Exception as e:
            self.client = speech_v1.SpeechClient()
            raise e

        # Capture sentry alert.
        if len(self.transcription) == 0:
            sentry_sdk.capture_message("GCS streaming transcription is empty, recreate speech client...")
            self.client = speech_v1.SpeechClient()

        return self.transcription

This is my code for running each recognize task in a separate gevent greenlet, and it causes the greenlet to get stuck:

+--- <Greenlet "Greenlet-2" at 0xffff6c59d860: spawn_greenlets>
 :          Parent: <Hub '' at 0xffff819ffd60 epoll default pending=0 ref=116 fileno=7 resolver=<gevent.resolver.thread.Resolver at 0xffff75a3ad90 pool=<ThreadPool at 0xffff819c86d0 tasks=11 size=10 maxsize=10 hub=<Hub at 0xffff819ffd60 thread_ident=0xffff83b5b020>>> threadpool=<ThreadPool at 0xffff819c86d0 tasks=11 size=10 maxsize=10 hub=<Hub at 0xffff819ffd60 thread_ident=0xffff83b5b020>> thread_ident=0xffff83b5b020>
 :          Running:
 :            File "/usr/local/lib/python3.8/site-packages/gevent/pool.py", line 161, in apply
 :              return self.spawn(func, *args, **kwds).get()
 :          Spawned at:
 :            File "/app/app/audiohub/audiohub_client.py", line 48, in add_chunk
 :              self.__init_states(session_id, sample_rate=sample_rate)
 :            File "/app/app/audiohub/audiohub_client.py", line 190, in __init_states
 :              self.streaming_sessions[session_id] = StreamingSession(sample_rate=sample_rate)
 :            File "/app/app/audiohub/gcp/streaming_session.py", line 15, in __init__
 :              self.client = speech_v1.SpeechClient()
 :            File "/usr/local/lib/python3.8/site-packages/google/cloud/speech_v1/services/speech/client.py", line 461, in __init__
 :              self._transport = Transport(
 :            File "/usr/local/lib/python3.8/site-packages/google/cloud/speech_v1/services/speech/transports/grpc.py", line 162, in __init__
 :              self._grpc_channel = type(self).create_channel(
 :            File "/usr/local/lib/python3.8/site-packages/google/cloud/speech_v1/services/speech/transports/grpc.py", line 217, in create_channel
 :              return grpc_helpers.create_channel(
 :            File "/usr/local/lib/python3.8/site-packages/google/api_core/grpc_helpers.py", line 386, in create_channel
 :              return grpc.secure_channel(
 :            File "/usr/local/lib/python3.8/site-packages/grpc/__init__.py", line 2119, in secure_channel
 :              return _channel.Channel(
 :            File "/usr/local/lib/python3.8/site-packages/grpc/_channel.py", line 2056, in __init__
 :              cygrpc.gevent_increment_channel_count()
 :            File "/usr/local/lib/python3.8/site-packages/gevent/pool.py", line 392, in spawn
 :              greenlet = self.greenlet_class(*args, **kwargs)

I've also applied the gevent monkey patch and the gRPC gevent patch in my application:

from gevent import monkey
monkey.patch_all()

import grpc.experimental.gevent as grpc_gevent
grpc_gevent.init_gevent()

After the above greenlet gets stuck, my whole application basically hangs.

Note that if I do the stream recognition synchronously, without a gevent greenlet, it works fine. However, I would still prefer to run it in a separate greenlet to improve latency. I wonder if this is another gRPC incompatibility issue with gevent.
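For reference, the synchronous variant I mean is roughly the following method on StreamingSession, with no spawn in __start_session (a sketch; get_transcription_sync is just an illustrative name):

    def get_transcription_sync(self):
        """Hypothetical synchronous variant: the stream is created and
        consumed on the calling greenlet after all audio has been queued."""
        self.queue.put(self.END_OF_STREAM)
        responses = self.client.streaming_recognize(
            config=self.streaming_config,
            requests=self.request_generator(),
        )
        # Same response handling as get_transcription, just inline.
        for response in responses:
            for result in response.results:
                if result.is_final:
                    self.transcription += result.alternatives[0].transcript + ' '
        return self.transcription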

@dongzeli95 (Author)

Upon load testing, I found that with 30 concurrent streaming recognize sessions, all of the threads get stuck. I have tried both gevent greenlets and a native thread pool; neither helps past this limit.

On the other hand, concurrency under 25 seems to work fine. I've also seen a post arguing about gRPC's max-concurrency settings. I assume this speech recognition package uses gRPC underneath. Is there a way we can increase concurrency on the speech client?
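For what it's worth, the only knob I've found so far is building the gRPC channel myself so I can pass channel options through a custom transport. This is only a sketch under my own assumptions: I don't know whether the option below actually lifts the 25-30 ceiling, the idea is just to give each client its own connection instead of the shared global subchannel pool.

from google.cloud import speech_v1
from google.cloud.speech_v1.services.speech.transports import SpeechGrpcTransport

# Sketch only: build the channel ourselves so we can pass gRPC channel args.
# "grpc.use_local_subchannel_pool" is my assumption about the relevant knob,
# not a confirmed fix.
channel = SpeechGrpcTransport.create_channel(
    options=[("grpc.use_local_subchannel_pool", 1)],
)
transport = SpeechGrpcTransport(channel=channel)
client = speech_v1.SpeechClient(transport=transport)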

Could someone help take a look? Thank you so much!

@parthea (Contributor) commented Apr 7, 2024

Hi @dongzeli95,

Thanks for reporting this issue. This is potentially related to grpc/grpc#36265, googleapis/python-bigtable#949 and #12423. To confirm, can you try downgrading to grpcio==1.58.0?
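For example, pinning it in your environment (the usual pip invocation):

pip install grpcio==1.58.0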

@parthea parthea added type: bug Error or flaw in code with unintended results or allowing sub-optimal usage patterns. priority: p2 Moderately-important priority. Fix may not be included in next release. needs more info This issue needs more information from the customer to proceed. external This issue is blocked on a bug with the actual product. labels Apr 7, 2024
@parthea parthea self-assigned this Apr 7, 2024
@dongzeli95 (Author)

@parthea Just confirmed that downgrading to grpcio==1.58.0 didn't help with this issue. Hope that helps.
