imranariffin/aiotaskq

aiotaskq

A simple asynchronous task queue

Motivation

Popular asynchronous worker libraries like Celery don't support asyncio and are hard to use for advanced use cases. aiotaskq aims to help users compose tasks in a native async-await manner.

It is also fully typed, for better productivity and correctness.

Give it a try and let us know if you like it. For questions or feedback, feel free to file issues on this repository.

Example Usage

Install aiotaskq

python -m pip install --upgrade pip
pip install aiotaskq

Define a simple app like the following:

tree .
.
└── app
    └── app.py

Where app.py contains the following:

import asyncio

import aiotaskq


@aiotaskq.task
def some_task(b: int) -> int:
    # Some task with high cpu usage
    def _naive_fib(n: int) -> int:
        if n <= 2:
            return 1
        return _naive_fib(n - 1) + _naive_fib(n - 2)
    return _naive_fib(b)


async def main():
    async_result = await some_task.apply_async(42)
    sync_result = some_task(42)
    assert async_result == sync_result
    print(f"sync_result == async_result == {sync_result}. Awesome!")


if __name__ == "__main__":
    asyncio.run(main())

Start Redis

docker run --publish 127.0.0.1:6379:6379 redis

In a different terminal, start the aiotaskq worker

python -m aiotaskq worker app.app

Then, in a third terminal, run your app

python ./app.py
# Output: sync_result == async_result == 267914296. Awesome!

Advanced usage example

Let's say we want to compose a workflow where we want to break up some of the tasks and run them in parallel:

                    |-- task_2 --> |
                    |-- task_2 --> |     | task_3 --> |
START -> task_1 --> |-- task_2 --> | --> | task_3 --> | --> task_4 --> FINISH
                    |-- task_2 --> |     | task_3 --> |
                    |-- task_2 --> |

Using Celery, we might end up with this:

from celery import Celery, chord

app = Celery()


@app.task
def task_1(*args, **kwargs):
    pass


@app.task
def task_2(*args, **kwargs):
    pass


@app.task
def task_3(*args, **kwargs):
    pass


@app.task
def task_4(*args, **kwargs):
    pass


if __name__ == "__main__":
    step_1 = task_1.si(some_arg="a")
    step_2 = [task_2.si(some_arg=f"{i}") for i in range(5)]
    step_3 = [task_3.si(some_arg=f"{i}") for i in range(3)]
    step_4 = task_4.si(some_arg="b")
    workflow = chord(
        header=step_1,
        body=chord(
            header=step_2,
            body=chord(
                header=step_3,
                body=step_4,
            ),
        ),
    )
    output = workflow.apply_async().get()
    print(output)

Using aiotaskq we may end up with the following:

import asyncio

from aiotaskq import task


@task
def task_1(*args, **kwargs):
    pass


@task
def task_2(*args, **kwargs):
    pass


@task
def task_3(*args, **kwargs):
    pass


@task
def task_4(*args, **kwargs):
    pass


# So far the same as celery

# And now the workflow is just native python, and you're free
# to use any `asyncio` library of your choice to help with composing
# your workflow e.g. `trio` to handle more advanced scenarios like
# error propagation, task cancellation etc.
async def main():
    # Each stage is awaited before the next one starts; the calls
    # within a stage run concurrently via `asyncio.gather`.
    out_1 = await task_1.apply_async()
    out_2 = await asyncio.gather(*(task_2.apply_async(arg=f"{i}") for i in range(5)))
    out_3 = await asyncio.gather(*(task_3.apply_async(arg=f"{i}") for i in range(3)))
    out_4 = await task_4.apply_async()
    print([out_1, out_2, out_3, out_4])


if __name__ == "__main__":
    asyncio.run(main())
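
The shape of this workflow can be tried without a worker or Redis by swapping the tasks for plain coroutines. The following is a minimal sketch under that assumption: `fake_task`, `run_workflow` and `run_with_timeout` are hypothetical names used only for illustration and are not part of aiotaskq. It shows stages running one after another, calls inside a stage running concurrently, and a timeout on the awaiting side using `asyncio.wait_for`.

```python
import asyncio


async def fake_task(name: str, arg: str = "", delay: float = 0.01) -> str:
    # Hypothetical stand-in for `some_task.apply_async(...)`:
    # no worker or Redis is involved here.
    await asyncio.sleep(delay)
    return f"{name}({arg})"


async def run_workflow() -> list:
    # Stages run one after another; the calls inside a stage run concurrently.
    out_1 = await fake_task("task_1")
    out_2 = await asyncio.gather(*(fake_task("task_2", str(i)) for i in range(5)))
    out_3 = await asyncio.gather(*(fake_task("task_3", str(i)) for i in range(3)))
    out_4 = await fake_task("task_4")
    return [out_1, out_2, out_3, out_4]


async def run_with_timeout(seconds: float) -> str:
    # `asyncio.wait_for` cancels the awaited call when the timeout expires.
    try:
        return await asyncio.wait_for(fake_task("slow_task", delay=1.0), timeout=seconds)
    except asyncio.TimeoutError:
        return "timed out"


if __name__ == "__main__":
    print(asyncio.run(run_workflow()))
    print(asyncio.run(run_with_timeout(0.05)))
```

`asyncio.gather` returns results in the order the awaitables were passed in, so each stage's output lines up with its inputs. The timeout here only covers the awaiting side; whether it also stops work already picked up by a real worker is not something this sketch demonstrates.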

Install

pip install aiotaskq

Development

source ./activate.sh

Tests

In another terminal

./docker.sh

In the main terminal

source ./activate.sh
./test.sh

Links