Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Add workaround for https://github.com/grpc/grpc/issues/25364 #213

Merged
merged 1 commit into from Aug 16, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 56 additions & 4 deletions google/cloud/pubsublite/cloudpubsub/internal/managed_event_loop.py
Expand Up @@ -14,24 +14,76 @@

from asyncio import AbstractEventLoop, new_event_loop, run_coroutine_threadsafe
from concurrent.futures import Future
from threading import Thread
from typing import ContextManager
from threading import Thread, Lock
from typing import ContextManager, Generic, TypeVar, Optional, Callable

_T = TypeVar("_T")

class ManagedEventLoop(ContextManager):

class _Lazy(Generic[_T]):
_Factory = Callable[[], _T]

_lock: Lock
_factory: _Factory
_impl: Optional[_T]

def __init__(self, factory: _Factory):
self._lock = Lock()
self._factory = factory
self._impl = None

def get(self) -> _T:
with self._lock:
if self._impl is None:
self._impl = self._factory()
return self._impl


class _ManagedEventLoopImpl(ContextManager):
_loop: AbstractEventLoop
_thread: Thread

def __init__(self, name=None):
self._loop = new_event_loop()
self._thread = Thread(target=lambda: self._loop.run_forever(), name=name)
self._thread = Thread(
target=lambda: self._loop.run_forever(), name=name, daemon=True
)

def __enter__(self):
self._thread.start()
return self

def __exit__(self, exc_type, exc_value, traceback):
self._loop.call_soon_threadsafe(self._loop.stop)
self._thread.join()

def submit(self, coro) -> Future:
return run_coroutine_threadsafe(coro, self._loop)


# TODO(dpcollins): Remove when underlying issue is fixed.
# This is a workaround for https://github.com/grpc/grpc/issues/25364, a grpc
# issue which prevents grpc-asyncio working with multiple event loops in the
# same process. This workaround enables multiple topic publishing as well as
# publish/subscribe from the same process, but does not enable use with other
# grpc-asyncio clients. Once this issue is fixed, roll back the PR which
# introduced this to return to a single event loop per client for isolation.
_global_event_loop: _Lazy[_ManagedEventLoopImpl] = _Lazy(
lambda: _ManagedEventLoopImpl(name="PubSubLiteEventLoopThread").__enter__()
)


class ManagedEventLoop(ContextManager):
_loop: _ManagedEventLoopImpl

def __init__(self, name=None):
self._loop = _global_event_loop.get()

def __enter__(self):
return self

def __exit__(self, exc_type, exc_value, traceback):
pass

def submit(self, coro) -> Future:
return self._loop.submit(coro)