AioPipe and AioQueue performance #27

Open
kaotika opened this issue Jan 28, 2019 · 4 comments

Hi,

I wanted to measure the time it takes to send/receive some basic values (floats) from one process to another: one test with a pipe, another with a queue.

The code in short:

  • main starts a manager task
  • the manager starts as many worker processes as configured
  • each worker starts a listener task for the queue and the pipe
  • the manager sends/puts the timestamp time.time() to the queue/pipe once per second
  • the listeners react to each send/put, calculate the difference between send and receive time, and print it

On my dev machine (i7-3520M, 3.6 GHz, 16 GB RAM) I got around 0.5-1.9 ms for pipes and 0.9-1.1 ms for queues. I expected pipes and queues to be faster than ~1 ms. Are my expectations wrong, or is my test code?
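For comparison, it can help to measure the raw multiprocessing primitives with no asyncio in the loop, since the event-loop scheduling itself adds latency on top of the pipe. A minimal baseline sketch (illustrative, not from the original test):

```python
# Baseline round-trip latency of a plain multiprocessing.Pipe,
# without asyncio or aioprocessing involved.
import multiprocessing as mp
import time


def echo(conn):
    # Echo every message back until told to stop with None.
    while True:
        msg = conn.recv()
        if msg is None:
            break
        conn.send(msg)


def main():
    parent, child = mp.Pipe()
    proc = mp.Process(target=echo, args=(child,))
    proc.start()

    n = 1000
    start = time.perf_counter()
    for _ in range(n):
        parent.send(time.perf_counter())
        parent.recv()
    elapsed = time.perf_counter() - start

    parent.send(None)  # ask the echo worker to exit
    proc.join()
    print(f"mean round trip: {elapsed / n * 1000:.3f} ms")


if __name__ == "__main__":
    main()
```

Comparing this number against the aioprocessing figures shows how much of the ~1 ms is the IPC itself versus the coroutine machinery around it.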

#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import aioprocessing
import asyncio
from collections import namedtuple
import os
import sys
import time


async def sec_to_ms(timediff):
    return timediff * 1000


async def start_pipe_listener(exit_event, pipe):
    pid = os.getpid()
    while not exit_event.is_set():
        send_time = await pipe.coro_recv()
        recv_time = time.time()
        timediff = await sec_to_ms(recv_time - send_time)
        print(f"Received from manager pipe, took {timediff} ms on pid {pid}")


async def start_queue_listener(exit_event, queue):
    pid = os.getpid()
    while not exit_event.is_set():
        send_time = await queue.coro_get()
        recv_time = time.time()
        timediff = await sec_to_ms(recv_time - send_time)
        print(f"Received from manager queue, took {timediff} ms on pid {pid}")


def worker(exit_event, pipe, queue):
    pid = os.getpid()

    # define new loop inside this worker
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    print(f"Worker started as {pid} with loop {hex(id(loop))}")

    pipe_task = asyncio.ensure_future(start_pipe_listener(exit_event, pipe))
    queue_task = asyncio.ensure_future(start_queue_listener(exit_event, queue))

    try:
        loop.run_until_complete(asyncio.wait([pipe_task, queue_task]))
    except KeyboardInterrupt:
        print(f"\nGracefully shutting down worker {pid}")
    finally:
        loop.close()


async def _pipe_sender(pipe):
    # coro_send is a coroutine; without await the send never runs reliably
    await pipe.manager_pipe.coro_send(time.time())


async def _queue_sender(queue):
    await queue.coro_put(time.time())


async def process_manager(exit_event, worker_count=1):
    pid = os.getpid()
    print(f"Manager started as {pid}")

    DuplexPipe = namedtuple('DuplexPipe', ['manager_pipe', 'worker_pipe'])

    pipelines = [
        DuplexPipe(*aioprocessing.AioPipe())
        for w in range(worker_count)
    ]
    queues = [
        aioprocessing.AioQueue()
        for w in range(worker_count)
    ]
    processes = [
        aioprocessing.AioProcess(
            target=worker, args=(exit_event, pipelines[i].worker_pipe, queues[i]))
        for i in range(worker_count)
    ]
    for p in processes:
        p.start()

    while not exit_event.is_set():
        print(f"Manager Ticker {pid}")
        await asyncio.gather(*[_pipe_sender(pipe) for pipe in pipelines])
        await asyncio.gather(*[_queue_sender(queue) for queue in queues])
        await asyncio.sleep(1)

    # print("shutting down processes and manager")
    await asyncio.wait([p.coro_join() for p in processes])


def main():
    pid = os.getpid()
    loop = asyncio.get_event_loop()

    print(f"Main started as {pid} with loop {hex(id(loop))}")

    worker_count = 1
    exit_event = aioprocessing.AioEvent()
    manager_task = asyncio.ensure_future(
        process_manager(exit_event, worker_count))

    try:
        done, pending = loop.run_until_complete(asyncio.wait([manager_task]))
    except KeyboardInterrupt:
        print("\nCtrl-C received")
        exit_event.set()
        manager_task.cancel()
        done, pending = loop.run_until_complete(asyncio.wait([manager_task]))
    finally:
        loop.close()


if __name__ == "__main__":
    main()
@dmgolembiowski

I noticed the line recv_time = time.time() near the top and I'm wondering if that might be causing your problem (but I'm not 100% sure). I thought Raymond Hettinger said (at the 2017 San Francisco Bay Python event) that most of the standard library doesn't support asyncio because it's written entirely with blocking calls. Perhaps you can work around this by taking the timestamp through a coroutine, and maybe wrap it in an async with aioprocessing.AioLock().


Alternatively, an (unlikely) source could be the collections.namedtuple: https://lwn.net/Articles/731423/

@Joshuaalbert

@kaotika Any update on this? I'd like to understand the latency with aioprocessing before making the decision to use it for a project. Cheers

@mahinoresearch

mahinoresearch commented Dec 20, 2021

@kaotika, @Joshuaalbert To identify the performance of the queue mechanism, you may find it helpful to minimise the impact of your test process. Rather than doing some processing on each data item sent via the queues, try sending 1,000,000 items from a generator and time how long that takes overall. Compare that to the time the generator alone takes to deliver the same data to your timing system without queues; the difference is the queue overhead.

On a round trip from main() to an echo worker, via one queue in each direction, I have no problem getting 25k round-trip messages sent. The rate-limiting factor in my case is still the infrastructure, not the queues. The latency you see in practice will depend on how long the queue is (i.e. how many items are in it).

You can accelerate things further by short-cutting the send and receive paths, presuming the queues are neither empty nor full. For example, you might insert items into the queue like this:

from queue import Full  # multiprocessing queues raise the stdlib queue.Full

try:
    queue.put_nowait(item)
except Full:
    await queue.coro_put(item)
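The receive side can take the same shortcut. A sketch, assuming the queue object exposes the non-blocking get_nowait() of the underlying multiprocessing.Queue (which raises the stdlib queue.Empty) alongside aioprocessing's coro_get():

```python
import queue  # stdlib module that defines the Empty/Full exceptions


async def fast_get(q):
    # Try the non-blocking path first and pay for the coroutine
    # machinery only when the queue is actually empty.
    try:
        return q.get_nowait()
    except queue.Empty:
        return await q.coro_get()
```

When the queue is busy, most items take the cheap synchronous path and only the occasional empty-queue case suspends the task.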

In general, for a processing chain using queues, the framework of your application will have more impact on throughput and latency than the performance of the queues themselves. Options for improving performance are fairly well covered in the literature: ensure that producer and consumer tasks use "pull" rather than "push" techniques, wait somewhere sensible, and hand off the asyncio loop cooperatively, even using await asyncio.sleep(0) if required. If the queues really do become the rate-limiting factor in your system, consider batching items rather than sending them one by one, or use multiple queues so that the data rate reaches the pace you seek.
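The batching idea above can be sketched like this (helper names are illustrative, not part of aioprocessing; any queue with put()/get() works):

```python
def send_batches(q, items, batch_size=100):
    # Amortize per-message queue overhead by sending lists of
    # items instead of one put() per item.
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) >= batch_size:
            q.put(batch)
            batch = []
    if batch:
        q.put(batch)  # flush the final partial batch


def recv_batches(q, n_batches):
    # Unpack the batched messages back into individual items.
    for _ in range(n_batches):
        for item in q.get():
            yield item
```

With a batch size of 100, the per-message pickling and locking cost is paid once per 100 items, at the price of up to one batch of added latency.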
