Using the async iterator is significantly slower compared to using callbacks in a subscription #541

Open
DeltaEpsilon7787 opened this issue Feb 27, 2024 · 0 comments
Labels
defect Suspected defect such as a bug or regression

DeltaEpsilon7787 commented Feb 27, 2024

Observed behavior

As currently implemented, async iterators have an unacceptable drop in throughput compared to callback-based subscriptions. Looking at how they are implemented, `__anext__` appears to do a lot of unnecessary work for each message.
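
To illustrate how per-message work can add up, here is a minimal micro-benchmark that does not touch NATS at all. It compares a bare `await queue.get()` against a loop that creates a task for every `get()` and races it against a stop future with `asyncio.wait`. That second pattern is an assumption about the kind of extra work involved, not a copy of the nats-py source, and all names (`drain_direct`, `drain_with_wait`, `closed`) are hypothetical.

```python
import asyncio
from time import perf_counter

N = 20_000  # small on purpose; raise it for steadier numbers


async def drain_direct(queue: asyncio.Queue, n: int) -> int:
    """Consume n items with a bare `await queue.get()`."""
    seen = 0
    for _ in range(n):
        await queue.get()
        seen += 1
    return seen


async def drain_with_wait(queue: asyncio.Queue, n: int) -> int:
    """Consume n items, racing each get() against a stop future."""
    loop = asyncio.get_running_loop()
    closed = loop.create_future()  # hypothetical stop signal, never resolved here
    seen = 0
    for _ in range(n):
        # One extra task plus one asyncio.wait call per message.
        get_task = loop.create_task(queue.get())
        done, _pending = await asyncio.wait(
            [get_task, closed], return_when=asyncio.FIRST_COMPLETED
        )
        if get_task in done:
            get_task.result()
            seen += 1
    closed.cancel()
    return seen


async def timed(drain, n: int):
    """Pre-fill a queue with n items, then time how long `drain` takes."""
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(n):
        queue.put_nowait(i)
    start = perf_counter()
    seen = await drain(queue, n)
    return seen, perf_counter() - start


async def main(n: int = N):
    direct = await timed(drain_direct, n)
    waited = await timed(drain_with_wait, n)
    return direct, waited


if __name__ == "__main__":
    (d_seen, d_time), (w_seen, w_time) = asyncio.run(main())
    print(f"direct get:  {d_seen} items in {d_time:.3f}s")
    print(f"task + wait: {w_seen} items in {w_time:.3f}s")
```

The absolute numbers depend on the machine and Python version, so treat the output only as a rough indication of relative per-message cost.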

Using the code in Steps to reproduce, on my PC I get 1.29 seconds for callbacks and 4.6 seconds for the async iterator, which translates to roughly 310k msg/sec for callbacks and 87k msg/sec for the built-in async iterator. This is a ~3.5x drop in throughput.
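
For reference, the throughput figures follow directly from the timings and the `TEST_AMOUNT` of 400000 messages used in the reproduction script. The variable names below are only for illustration:

```python
TEST_AMOUNT = 400_000  # messages per subject, as in the reproduction script

cb_seconds = 1.29    # measured time for the callback subscription
iter_seconds = 4.6   # measured time for the built-in async iterator

cb_rate = TEST_AMOUNT / cb_seconds
iter_rate = TEST_AMOUNT / iter_seconds
slowdown = iter_seconds / cb_seconds

print(f"callbacks:      {cb_rate:,.0f} msg/sec")
print(f"async iterator: {iter_rate:,.0f} msg/sec")
print(f"slowdown:       {slowdown:.2f}x")
```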

Expected behavior

This should either be remedied or explicitly documented somewhere, as it is rather unexpected behavior. One would expect the async iterator to be a thin wrapper around the pending queue.

In the code provided in Steps to reproduce, I implement a very basic custom async iterator that uses a callback to fill an internal queue, from which the async iterator then reads its results. Given that the callback and async iterator modes are mutually exclusive in the current API, this does not break any API preconditions to my knowledge.
This implementation takes only about 1.4 seconds, which is less than 10% slower than plain callbacks, although it is missing the boilerplate needed for stopping the iteration.
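
As a rough sketch of what the missing stop handling could look like, the queue-backed iterator can be given a sentinel that ends the `async for` loop. This is one possible approach under my own assumptions, not how nats-py does it; `ClosableAsyncIt`, `_CLOSED`, `queue_cb` and `close` are hypothetical names, and the demo feeds the callback by hand instead of going through a real subscription.

```python
import asyncio

_CLOSED = object()  # hypothetical sentinel marking the end of the stream


class ClosableAsyncIt:
    """Queue-backed async iterator that can be stopped with close()."""

    def __init__(self):
        self._queue = asyncio.Queue()
        self._closed = False

    async def queue_cb(self, msg):
        # Intended to be passed as the subscription callback.
        if not self._closed:
            self._queue.put_nowait(msg)

    def close(self):
        # Messages queued before close() are still delivered first.
        if not self._closed:
            self._closed = True
            self._queue.put_nowait(_CLOSED)

    def __aiter__(self):
        return self

    async def __anext__(self):
        item = await self._queue.get()
        if item is _CLOSED:
            # Re-queue the sentinel so any later __anext__ call also stops.
            self._queue.put_nowait(_CLOSED)
            raise StopAsyncIteration
        return item


async def demo():
    it = ClosableAsyncIt()
    for payload in (b"a", b"b", b"c"):
        await it.queue_cb(payload)
    it.close()
    return [msg async for msg in it]


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a real connection, `queue_cb` would be passed as `cb=` to `nc.subscribe(...)` in the same way as `_queue_cb` in the reproduction script, and `close()` would be called once the subscription is no longer needed. The extra sentinel check per message should be cheap, but I have not benchmarked this variant.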

Server and client version

nats-server: v2.10.5
nats-py: v2.7.0

Host environment

No response

Steps to reproduce

from time import perf_counter

import asyncio as aio
import multiprocessing as mp

import nats

TEST_AMOUNT = 400000


class CbMessageConsumer:
    def __init__(self, target):
        self.counter = 0

        self._target = target
        self._completed = aio.Event()

    async def cb(self, msg):
        self.counter += 1
        if self.counter >= self._target:
            self._completed.set()

    async def completion_observer(self):
        await self._completed.wait()


class IterMessageConsumer:
    def __init__(self, target):
        self.counter = 0

        self._target = target
        self._completed = aio.Event()

    async def do_work(self, source):
        async for msg in source:
            self.counter += 1
            if self.counter >= self._target:
                self._completed.set()
                break

    async def completion_observer(self):
        await self._completed.wait()


# Custom `async for` using callbacks
class FasterAsyncIt:
    def __init__(self):
        self._queue = aio.Queue()

    async def _queue_cb(self, msg):
        self._queue.put_nowait(msg)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Omitted: checking for a closed channel and similar stop conditions
        return await self._queue.get()


def cb_receiver():
    async def _():
        nc = await nats.connect("127.0.0.1:4222")

        cb_message_consumer = CbMessageConsumer(TEST_AMOUNT)
        cb_observer_task = aio.create_task(cb_message_consumer.completion_observer())
        cb_observer_task.add_done_callback(
            lambda _: print(f"CB: {perf_counter() - start_time}")
        )

        start_time = perf_counter()
        await nc.subscribe("test.cb", cb=cb_message_consumer.cb)

        await cb_observer_task
        await nc.close()

    aio.run(_())


def iter_receiver():
    async def _():
        nc = await nats.connect("127.0.0.1:4222")

        it_message_consumer = IterMessageConsumer(TEST_AMOUNT)
        it_observer_task = aio.create_task(it_message_consumer.completion_observer())
        it_observer_task.add_done_callback(
            lambda _: print(f"IT: {perf_counter() - start_time}")
        )

        it_based_sub = await nc.subscribe("test.iter")
        start_time = perf_counter()
        # Keep a reference so the task cannot be garbage-collected mid-run
        work_task = aio.create_task(it_message_consumer.do_work(it_based_sub.messages))

        await it_observer_task
        await nc.close()

    aio.run(_())


def custom_iter_receiver():
    async def _():
        nc = await nats.connect("127.0.0.1:4222")

        custom_it = FasterAsyncIt()

        custom_it_message_consumer = IterMessageConsumer(TEST_AMOUNT)
        custom_it_observer_task = aio.create_task(
            custom_it_message_consumer.completion_observer()
        )
        custom_it_observer_task.add_done_callback(
            lambda _: print(f"CUSTOM IT: {perf_counter() - start_time}")
        )

        await nc.subscribe("test.custom_iter", cb=custom_it._queue_cb)
        start_time = perf_counter()
        # Keep a reference so the task cannot be garbage-collected mid-run
        work_task = aio.create_task(custom_it_message_consumer.do_work(custom_it))
        await custom_it_observer_task
        await nc.close()

    aio.run(_())


def sender_side():
    async def _():
        nc = await nats.connect("127.0.0.1:4222")

        source = iter(range(TEST_AMOUNT))

        try:
            while True:
                for _ in range(30000):
                    next(source)
                    await nc.publish("test.cb", b"")
                    await nc.publish("test.iter", b"")
                    await nc.publish("test.custom_iter", b"")
                # Throttle to avoid slow consumers
                await aio.sleep(0.01)

        except StopIteration:
            pass

        await nc.close()

    aio.run(_())


if __name__ == "__main__":
    sender = mp.Process(target=sender_side)
    receiver_1 = mp.Process(target=cb_receiver)
    receiver_2 = mp.Process(target=iter_receiver)
    receiver_3 = mp.Process(target=custom_iter_receiver)

    receiver_1.start()
    receiver_2.start()
    receiver_3.start()
    sender.start()

    sender.join()
    receiver_1.join()
    receiver_2.join()
    receiver_3.join()
@DeltaEpsilon7787 DeltaEpsilon7787 added the defect Suspected defect such as a bug or regression label Feb 27, 2024