Remove redundant operations in BaseClient #132

Closed
garvenlee opened this issue Sep 23, 2023 · 1 comment
Labels: low-priority (low priority tasks), refactoring (refactoring of code)

garvenlee commented Sep 23, 2023

  1. Make BaseClient._handlers a plain dict
  2. Simplify read operations in the underlying connection

About point 1:
Each client corresponds to one connection, but several publishers or subscribers may share that client, each registering its own PublishConfirm or Deliver callback. When the _listener task dispatches a frame, rstream currently loops over all registered callbacks, yet only one of them actually processes the frame, so the loop is unnecessary. To avoid it, the client needs to keep extra information about the stream topology, i.e. which publisher or subscription ID belongs to which stream or subscriber:

# BaseClient
async def _listener(self):
    …
    handler = self._handlers.get(frame.__class__)
    if handler is not None:
        await handler(frame, self)  # add new arg `Client`

# Producer
async def _get_or_create_publisher(self, stream, publisher_name):
    …
    client = await self._get_or_create_client(stream)
    publisher_id = client.next_publisher_id
    client.publishers_info[publisher_id] = stream

async def _on_publish_confirm(self, frame, client):
    stream = client.publishers_info[frame.publisher_id]
    publisher = self._publishers[stream]
    …

# Consumer
async def _create_subscriber(self, stream):
    …
    client = await self._get_or_create_client(stream)
    subscription_id = client.next_subscription_id
    client.subscriptions_info[subscription_id] = reference

async def _on_deliver(self, frame, client):
    reference = client.subscriptions_info[frame.subscription_id]
    subscriber = self._subscribers[reference]
    …
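
To make the proposal above concrete, here is a minimal, self-contained sketch of the dispatch idea. It is not rstream's actual code: the `Client`, `Producer`, `PublishConfirm`, and `Deliver` classes below are hypothetical stand-ins, and `dispatch` only models the single lookup that `_listener` would perform once `_handlers` is a plain dict keyed by frame class.

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class PublishConfirm:  # hypothetical stand-in for the real frame type
    publisher_id: int


@dataclass
class Deliver:  # hypothetical stand-in for the real frame type
    subscription_id: int


@dataclass
class Client:
    # One handler per frame class instead of a collection of callbacks.
    handlers: dict = field(default_factory=dict)
    # Per-client topology info: publisher id -> stream name.
    publishers_info: dict = field(default_factory=dict)

    async def dispatch(self, frame):
        # Single dict lookup; no loop over every registered callback.
        handler = self.handlers.get(type(frame))
        if handler is not None:
            await handler(frame, self)


class Producer:
    def __init__(self):
        self.confirmed = []

    async def on_publish_confirm(self, frame, client):
        # The client argument lets the handler resolve the stream itself.
        stream = client.publishers_info[frame.publisher_id]
        self.confirmed.append(stream)


async def demo():
    client = Client()
    producer = Producer()
    client.handlers[PublishConfirm] = producer.on_publish_confirm
    client.publishers_info[1] = "orders"
    client.publishers_info[2] = "invoices"
    await client.dispatch(PublishConfirm(publisher_id=2))
    await client.dispatch(Deliver(subscription_id=7))  # no handler: ignored
    return producer.confirmed
```

Running `asyncio.run(demo())` routes the confirm for publisher 2 to the one registered handler, which resolves the stream through the client's own map. The trade-off is that each client must keep its ID maps in sync when publishers or subscribers are added or removed.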

About point 2:
Connection is built on the asyncio Streams API. Currently Connection holds its own buffer and StreamReader holds one as well, so data is copied twice in Connection._read. It could be rewritten like this:

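from asyncio import IncompleteReadError

async def _read(self, n: int) -> bytes:
    try:
        # StreamReader already buffers, so Connection needs no second buffer
        return await self._reader.readexactly(n)
    except IncompleteReadError as exc:
        # EOF arrived before n bytes could be read
        raise ConnectionClosed() from exc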
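
As a rough illustration of how `readexactly` behaves, the sketch below feeds an in-memory `asyncio.StreamReader` and reads a length-prefixed message from it. The `ConnectionClosed` class here is a local stand-in for the library's own exception, `read_exactly` is a hypothetical free-function version of `_read`, and the 4-byte big-endian length prefix is assumed purely for the example.

```python
import asyncio


class ConnectionClosed(Exception):
    """Local stand-in for the library's own exception (assumption)."""


async def read_exactly(reader: asyncio.StreamReader, n: int) -> bytes:
    try:
        # StreamReader buffers internally; no second buffer is needed here.
        return await reader.readexactly(n)
    except asyncio.IncompleteReadError as exc:
        # EOF was reached before n bytes were available.
        raise ConnectionClosed() from exc


async def demo():
    reader = asyncio.StreamReader()
    # Simulate the peer sending one length-prefixed frame, then closing.
    reader.feed_data(b"\x00\x00\x00\x04body")
    reader.feed_eof()

    size = int.from_bytes(await read_exactly(reader, 4), "big")
    body = await read_exactly(reader, size)

    try:
        await read_exactly(reader, 1)  # nothing left: stream is at EOF
    except ConnectionClosed:
        closed = True
    else:
        closed = False
    return size, body, closed
```

Here `asyncio.run(demo())` reads the size, then the body, and the third read surfaces the closed stream as `ConnectionClosed` rather than leaking `IncompleteReadError` to callers.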

garvenlee changed the title from "Maybe redesign BaseClient" to "Remove redundant operations in BaseClient" on Sep 23, 2023
DanielePalaia added the refactoring (refactoring of code) and low-priority (low priority tasks) labels on Nov 3, 2023

DanielePalaia (Collaborator) commented

I'll close this issue, as we are not planning this implementation at the moment. That said, any contribution or PR is welcome!
