Ahaan Dabholkar edited this page Jul 12, 2022 · 6 revisions

pipelines -- build async pipelines quick!

Python 3.7

Setting Up a Basic Pipeline

As an example (albeit a poor one, since it doesn't show off the strengths of asyncio), let's set up a pipeline that simply reverses a string.

Define the Coroutine

Let's define it in a class to group related coroutines together.

from pipelines.processor import Processor
from pipelines.plumber import Plumber
import asyncio

class StringFuncs:
    @classmethod
    async def reverse(cls, self: Processor = None, q_elt: tuple = None):
        # q_elt is the tuple holding this node's input; reverse its first element
        return q_elt[0][::-1]

# This coroutine is referenced as StringFuncs.reverse

The coroutine signature must take self and q_elt as its first two arguments (after cls, when it is a classmethod as above). self is a reference to the Processor instance the coroutine is associated with, and q_elt is the tuple containing the input to the coroutine.
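To get a feel for the signature, the coroutine can be exercised on its own, outside any pipeline. The following is a minimal sketch, assuming that passing None for self is acceptable when no Processor is involved; the helper run_once is hypothetical and not part of pipelines, and the Processor type annotation is dropped so the snippet runs without the package installed.

```python
import asyncio


class StringFuncs:
    @classmethod
    async def reverse(cls, self=None, q_elt: tuple = None):
        # q_elt is a tuple of inputs; this coroutine only uses the first one.
        return q_elt[0][::-1]


def run_once(text: str) -> str:
    # Hypothetical helper: there is no Processor here, so None stands in
    # for `self`, and the input is wrapped in a one-element tuple.
    return asyncio.run(StringFuncs.reverse(None, (text,)))


if __name__ == "__main__":
    print(run_once("abc"))  # prints "cba"
```

Calling the coroutine directly like this is handy for unit-testing node logic before wiring it into a graph.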

Adding the InputProcessor Node

We need a way to provide input to the pipeline. This is achieved by creating a node of type InputProcessor. The developer does not need to handle this explicitly; the only difference to keep in mind is the coroutine signature, which takes an output_q in place of q_elt.

Let's define an input coroutine that generates random UUID strings and add it to the StringFuncs class.

class StringFuncs:
    ...
    
    @classmethod
    async def input_coro(cls, self:Processor=None, output_q:asyncio.Queue=None):
        # This coroutine generates 20 random input strings and puts them on
        # the output_q of whatever node it runs on.
        import uuid
        acc = [str(uuid.uuid4()) for _ in range(20)]
        for i in acc:
            await output_q.put(i)
            # not strictly necessary, but yields control to the event loop
            await asyncio.sleep(0)
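An input coroutine of this shape can also be checked in isolation by handing it a plain asyncio.Queue and draining the queue afterwards. This is only a sketch: the functions collect and collect_inputs are hypothetical helpers, the coroutine is written as a free function rather than a classmethod for brevity, and how the library itself creates and consumes output_q is not shown here.

```python
import asyncio
import uuid


async def input_coro(self=None, output_q: asyncio.Queue = None):
    # Same body as the input coroutine in the text: 20 random UUID strings.
    for _ in range(20):
        await output_q.put(str(uuid.uuid4()))
        await asyncio.sleep(0)  # yield control to the event loop


async def collect() -> list:
    # Hypothetical driver: create the queue inside the running loop,
    # let the coroutine fill it, then drain it without blocking.
    q = asyncio.Queue()
    await input_coro(None, q)
    items = []
    while not q.empty():
        items.append(q.get_nowait())
    return items


def collect_inputs() -> list:
    return asyncio.run(collect())


if __name__ == "__main__":
    for item in collect_inputs()[:3]:
        print(item)
```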

Add a node for Output ... cuz why not

class StringFuncs:
    ...
    
    @classmethod
    async def output_coro(cls, self, q_elt):
        print('output~> ', q_elt)

Make the Graph

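The pipeline is represented by a dict with two keys: 'nodes', which maps each node name to its coroutine, and 'graph', which maps each node name to the nodes that receive its output, like so -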

input_d = {
    'nodes': {
        'inp': { 'coro': StringFuncs.input_coro },
        'rev': { 'coro': StringFuncs.reverse },
        'out': { 'coro': StringFuncs.output_coro },
    },
    'graph': {
        'inp': ('rev', 'out'),  # output of node 'inp' ~> 'rev' and 'out'
        'rev': ('out', ),       # and so on...
        'out': None,
    },
}
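Since the 'graph' mapping lists downstream nodes, it can help to invert it and see which nodes feed each node. The sketch below uses a hypothetical helper, upstream_of, which is not part of pipelines; it works only on the plain dict structure shown above.

```python
def upstream_of(graph: dict) -> dict:
    """Invert {node: downstream tuple or None} into {node: [upstream nodes]}."""
    parents = {name: [] for name in graph}
    for src, targets in graph.items():
        # A value of None marks a node with no downstream nodes.
        for dst in (targets or ()):
            parents[dst].append(src)
    return parents


graph = {
    'inp': ('rev', 'out'),
    'rev': ('out',),
    'out': None,
}

if __name__ == "__main__":
    print(upstream_of(graph))
    # {'inp': [], 'rev': ['inp'], 'out': ['inp', 'rev']}
```

Here 'out' has two upstream nodes, 'inp' and 'rev', which is consistent with the two-element tuples in the sample output further down.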

Building the Pipeline using Plumber

Now that the graph defining the pipeline is built, we need to instantiate it using the Plumber. The Plumber takes two arguments: the graph dict and a coro_map, a function that maps the coro value in the nodes dict to the appropriate function object, i.e. it maps

input_d['nodes']['inp']['coro'] ~> StringFuncs.input_coro 

In our toy application, it can be trivially defined as -

coro_map = lambda x: x
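A non-trivial coro_map becomes useful when the nodes dict stores names rather than function objects, for instance when the graph is loaded from a config file. The following is a sketch under that assumption: the 'StringFuncs.reverse' string format and the registry dict are hypothetical conventions chosen for illustration, not something the library prescribes.

```python
import asyncio


class StringFuncs:
    @classmethod
    async def reverse(cls, self=None, q_elt: tuple = None):
        return q_elt[0][::-1]


# Hypothetical registry of classes whose coroutines may be named in a graph.
REGISTRY = {'StringFuncs': StringFuncs}


def coro_map(name: str):
    # Resolve a string such as 'StringFuncs.reverse' to the function object.
    cls_name, _, attr = name.partition('.')
    return getattr(REGISTRY[cls_name], attr)


if __name__ == "__main__":
    coro = coro_map('StringFuncs.reverse')
    print(asyncio.run(coro(None, ('abc',))))  # prints "cba"
```

With such a mapping, a node entry could read { 'coro': 'StringFuncs.reverse' } and the same coro_map would be passed to the Plumber.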

With the graph and the coro_map in place, we can build and run the pipeline as follows -

_t = Plumber(input_d, coro_map=lambda x: x)
_t.create_pipeline()

What it should look like...

output~>  ('afcaae36-213f-46ff-bdb0-ab417fef65c9', '9c56fef714ba-0bdb-ff64-f312-63eaacfa')
output~>  ('81456b84-efb1-4791-baa9-c9555a70bfbd', 'dbfb07a5559c-9aab-1974-1bfe-48b65418')
output~>  ('8a480d0f-6f3c-4733-92f9-ae5cfa1748d9', '9d8471afc5ea-9f29-3374-c3f6-f0d084a8')
...

The example code can be found in demos/readme_demo.py