fix: Numerous small performance and correctness issues #211
Changes from all commits touch two files. In `google/cloud/pubsublite/cloudpubsub/internal/client_multiplexer.py`, the opened-client factory moves from each call site into the constructor, `get_or_create` now takes only the key, and the separate `create_or_fail` path is removed:

```diff
@@ -16,40 +16,33 @@
 import threading
 from typing import Generic, TypeVar, Callable, Dict, Awaitable

 from google.api_core.exceptions import FailedPrecondition

 _Key = TypeVar("_Key")
 _Client = TypeVar("_Client")


 class ClientMultiplexer(Generic[_Key, _Client]):
-    _OpenedClientFactory = Callable[[], _Client]
+    _OpenedClientFactory = Callable[[_Key], _Client]
     _ClientCloser = Callable[[_Client], None]

+    _factory: _OpenedClientFactory
     _closer: _ClientCloser
     _lock: threading.Lock
     _live_clients: Dict[_Key, _Client]

     def __init__(
-        self, closer: _ClientCloser = lambda client: client.__exit__(None, None, None)
+        self,
+        factory: _OpenedClientFactory,
+        closer: _ClientCloser = lambda client: client.__exit__(None, None, None),
     ):
+        self._factory = factory
         self._closer = closer
         self._lock = threading.Lock()
         self._live_clients = {}

-    def get_or_create(self, key: _Key, factory: _OpenedClientFactory) -> _Client:
+    def get_or_create(self, key: _Key) -> _Client:
         with self._lock:
             if key not in self._live_clients:
-                self._live_clients[key] = factory()
-            return self._live_clients[key]
-
-    def create_or_fail(self, key: _Key, factory: _OpenedClientFactory) -> _Client:
-        with self._lock:
-            if key in self._live_clients:
-                raise FailedPrecondition(
-                    f"Cannot create two clients with the same key. {_Key}"
-                )
-            self._live_clients[key] = factory()
+                self._live_clients[key] = self._factory(key)
             return self._live_clients[key]

     def try_erase(self, key: _Key, client: _Client):
```
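For illustration, the new call pattern configures the factory once and keeps the hot path free of per-call closures. A minimal hedged sketch, where `FakeClient` is a hypothetical stand-in satisfying the default closer's `__exit__` contract:

```python
from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import (
    ClientMultiplexer,
)


class FakeClient:
    """Hypothetical stand-in for an opened client (not part of the library)."""

    def __init__(self, key: str) -> None:
        self.key = key

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        pass  # the default closer calls client.__exit__(None, None, None)


# The factory is supplied once, at construction time...
mux = ClientMultiplexer(lambda key: FakeClient(key))

# ...so callers no longer build a factory closure on every lookup.
client = mux.get_or_create("topic-a")
assert mux.get_or_create("topic-a") is client  # cached, not re-created
```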
`AsyncClientMultiplexer`, in the same file, gets the same factory change and additionally drops its lock, caching in-flight futures instead of resolved clients:

```diff
@@ -75,52 +68,49 @@ def __exit__(self, exc_type, exc_val, exc_tb):


 class AsyncClientMultiplexer(Generic[_Key, _Client]):
-    _OpenedClientFactory = Callable[[], Awaitable[_Client]]
+    _OpenedClientFactory = Callable[[_Key], Awaitable[_Client]]
     _ClientCloser = Callable[[_Client], Awaitable[None]]

+    _factory: _OpenedClientFactory
     _closer: _ClientCloser
-    _lock: asyncio.Lock
-    _live_clients: Dict[_Key, _Client]
+    _live_clients: Dict[_Key, Awaitable[_Client]]

     def __init__(
-        self, closer: _ClientCloser = lambda client: client.__aexit__(None, None, None)
+        self,
+        factory: _OpenedClientFactory,
+        closer: _ClientCloser = lambda client: client.__aexit__(None, None, None),
     ):
+        self._factory = factory
         self._closer = closer
         self._live_clients = {}

-    async def get_or_create(self, key: _Key, factory: _OpenedClientFactory) -> _Client:
-        async with self._lock:
-            if key not in self._live_clients:
-                self._live_clients[key] = await factory()
-            return self._live_clients[key]
-
-    async def create_or_fail(self, key: _Key, factory: _OpenedClientFactory) -> _Client:
-        async with self._lock:
-            if key in self._live_clients:
-                raise FailedPrecondition(
-                    f"Cannot create two clients with the same key. {_Key}"
-                )
-            self._live_clients[key] = await factory()
-            return self._live_clients[key]
+    async def get_or_create(self, key: _Key) -> _Client:
+        if key not in self._live_clients:
+            self._live_clients[key] = asyncio.ensure_future(self._factory(key))
+        return await self._live_clients[key]

     async def try_erase(self, key: _Key, client: _Client):
-        async with self._lock:
-            if key not in self._live_clients:
-                return
-            current_client = self._live_clients[key]
-            if current_client is not client:
-                return
-            del self._live_clients[key]
+        if key not in self._live_clients:
+            return
+        client_future = self._live_clients[key]
+        current_client = await client_future
+        if current_client is not client:
+            return
+        # duplicate check after await that no one raced with us
+        if (
+            key not in self._live_clients
+            or self._live_clients[key] is not client_future
+        ):
+            return
+        del self._live_clients[key]
         await self._closer(client)

     async def __aenter__(self):
-        self._lock = asyncio.Lock()
         return self

     async def __aexit__(self, exc_type, exc_val, exc_tb):
-        live_clients: Dict[_Key, _Client]
-        async with self._lock:
-            live_clients = self._live_clients
-            self._live_clients = {}
+        live_clients: Dict[_Key, Awaitable[_Client]]
+        live_clients = self._live_clients
+        self._live_clients = {}
         for topic, client in live_clients.items():
-            await self._closer(client)
+            await self._closer(await client)
```

Review comment (on the removed `_lock`): a lot of time was spent acquiring the lock in the publish hot path; this change makes the lock unnecessary.
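Dropping the lock is safe because `asyncio` runs these coroutines on the event loop's single thread, and the new `get_or_create` performs its check-and-insert with no intervening `await`, caching the in-flight future rather than the resolved client. A standalone sketch of the pattern (hypothetical names, not the library code):

```python
import asyncio
from typing import Dict

_cache: Dict[str, "asyncio.Future[str]"] = {}


async def _open_client(key: str) -> str:
    await asyncio.sleep(0.01)  # stand-in for an expensive async open
    return f"client-for-{key}"


async def get_or_create(key: str) -> str:
    if key not in _cache:
        # No await between the membership check and the insert, so no
        # other task can interleave here and no lock is needed.
        _cache[key] = asyncio.ensure_future(_open_client(key))
    # A caller arriving while the open is still in flight awaits the
    # same future instead of opening a duplicate client.
    return await _cache[key]


async def main() -> None:
    a, b = await asyncio.gather(get_or_create("k"), get_or_create("k"))
    assert a is b  # both callers resolved the same cached future


asyncio.run(main())
```

The identity re-check in `try_erase` exists for the same reason in reverse: awaiting the cached future yields control, so another task may have erased or replaced the entry in the meantime, and re-checking avoids deleting an entry that no longer corresponds to this client.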
In the module defining `MultiplexedSubscriberClient`, the `ClientMultiplexer` is replaced with a lock-guarded set of live `StreamingPullFuture`s, starting with the imports:

```diff
@@ -14,12 +14,10 @@

 from concurrent.futures.thread import ThreadPoolExecutor
 from typing import Union, Optional, Set
+from threading import Lock

 from google.cloud.pubsub_v1.subscriber.futures import StreamingPullFuture

-from google.cloud.pubsublite.cloudpubsub.internal.client_multiplexer import (
-    ClientMultiplexer,
-)
 from google.cloud.pubsublite.cloudpubsub.internal.single_subscriber import (
     AsyncSubscriberFactory,
 )
```
```diff
@@ -40,22 +38,16 @@ class MultiplexedSubscriberClient(SubscriberClientInterface):
     _executor: ThreadPoolExecutor
     _underlying_factory: AsyncSubscriberFactory

-    _multiplexer: ClientMultiplexer[SubscriptionPath, StreamingPullFuture]
+    _lock: Lock
+    _live_clients: Set[StreamingPullFuture]

     def __init__(
         self, executor: ThreadPoolExecutor, underlying_factory: AsyncSubscriberFactory
     ):
         self._executor = executor
         self._underlying_factory = underlying_factory
-
-        def cancel_streaming_pull_future(fut: StreamingPullFuture):
-            try:
-                fut.cancel()
-                fut.result()
-            except:  # noqa: E722
-                pass
-
-        self._multiplexer = ClientMultiplexer(cancel_streaming_pull_future)
+        self._lock = Lock()
+        self._live_clients = set()

     @overrides
     def subscribe(
```

Review comment (on the replaced `_multiplexer` field): previously, the subscriber enforced that there was only one open subscription stream per subscription per client, but there's actually no need for this.
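One consequence is that a single client can now open multiple streams on the same subscription. A hedged usage sketch against the public `SubscriberClient` wrapper, assuming it delegates to `MultiplexedSubscriberClient`; the resource path and settings below are placeholders:

```python
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import FlowControlSettings

# Placeholder subscription path and flow control settings.
subscription = "projects/1234/locations/us-central1-a/subscriptions/sub"
settings = FlowControlSettings(
    messages_outstanding=1000,
    bytes_outstanding=10 * 1024 * 1024,
)

with SubscriberClient() as client:
    # Before this change the second call would fail with
    # FailedPrecondition; now each call opens an independent stream
    # with its own StreamingPullFuture.
    future1 = client.subscribe(
        subscription, callback=print, per_partition_flow_control_settings=settings
    )
    future2 = client.subscribe(
        subscription, callback=print, per_partition_flow_control_settings=settings
    )
```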
The `subscribe` path then inlines the old factory and tracks futures directly:

```diff
@@ -68,28 +60,40 @@ def subscribe(
         if isinstance(subscription, str):
             subscription = SubscriptionPath.parse(subscription)

-        def create_and_open():
-            underlying = self._underlying_factory(
-                subscription, fixed_partitions, per_partition_flow_control_settings
-            )
-            subscriber = SubscriberImpl(underlying, callback, self._executor)
-            future = StreamingPullFuture(subscriber)
-            subscriber.__enter__()
-            return future
-
-        future = self._multiplexer.create_or_fail(subscription, create_and_open)
-        future.add_done_callback(
-            lambda fut: self._multiplexer.try_erase(subscription, future)
-        )
+        underlying = self._underlying_factory(
+            subscription, fixed_partitions, per_partition_flow_control_settings
+        )
+        subscriber = SubscriberImpl(underlying, callback, self._executor)
+        future = StreamingPullFuture(subscriber)
+        subscriber.__enter__()
+        future.add_done_callback(lambda fut: self._try_remove_client(future))
         return future

+    @staticmethod
+    def _cancel_streaming_pull_future(fut: StreamingPullFuture):
+        try:
+            fut.cancel()
+            fut.result()
+        except:  # noqa: E722
+            pass
+
+    def _try_remove_client(self, future: StreamingPullFuture):
+        with self._lock:
+            if future not in self._live_clients:
+                return
+            self._live_clients.remove(future)
+        self._cancel_streaming_pull_future(future)
+
     @overrides
     def __enter__(self):
         self._executor.__enter__()
-        self._multiplexer.__enter__()
         return self

     @overrides
     def __exit__(self, exc_type, exc_value, traceback):
-        self._multiplexer.__exit__(exc_type, exc_value, traceback)
+        with self._lock:
+            live_clients = self._live_clients
+            self._live_clients = set()
+        for client in live_clients:
+            self._cancel_streaming_pull_future(client)
         self._executor.__exit__(exc_type, exc_value, traceback)
```
Review comment (on the `ClientMultiplexer` constructor change): a lot of "self" time in `get_or_create` callers came from constructing the factory to pass in. This is in the publish hot path, hence this change.
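The cost being avoided is easy to see in isolation. A hedged micro-benchmark sketch (hypothetical functions; absolute numbers will vary) contrasting per-call closure construction with a factory stored once:

```python
import timeit


def open_client(key: str) -> str:
    return key  # trivial stand-in for client construction


stored_factory = open_client  # configured once, as __init__ now does


def old_pattern(key: str) -> str:
    # Old hot path: allocate a fresh closure on every call, then invoke it.
    factory = lambda: open_client(key)  # noqa: E731
    return factory()


def new_pattern(key: str) -> str:
    # New hot path: reuse the factory configured at construction time.
    return stored_factory(key)


print("per-call closure:", timeit.timeit(lambda: old_pattern("k"), number=1_000_000))
print("stored factory:  ", timeit.timeit(lambda: new_pattern("k"), number=1_000_000))
```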