
Parallel Execution #1103

Open
dmadisetti opened this issue Apr 10, 2024 · 5 comments
Labels
enhancement New feature or request

Comments

@dmadisetti
Contributor

Description

I started doing more compute-intensive work in marimo and noticed that the kernel is locked while a cell is executing.

Suggested solution

Marimo could work on the execution graph in parallel since the dependencies are resolved
I'm sure this has already been considered, but I couldn't find an issue for it

I do think the dependency sidebar could be more useful even without parallel execution: color which cells are queued to run, which have already run, and which are currently running.

Alternative

The functional style required for reactivity means that, in theory, this could be extended to data streaming, map-reduce, or distributed computation with little effort on the user's part.

But maybe this is a cloud or enterprise solution :)

Additional context

No response

@akshayka
Contributor

Marimo could work on the execution graph in parallel since the dependencies are resolved
I'm sure this has already been considered, but I couldn't find an issue for it

Yes, thanks for opening an issue. This is something I'd like to experiment with in the coming weeks; it's just too cool to not try it ...!

@akshayka akshayka added the enhancement New feature or request label Apr 11, 2024
@dmadisetti
Contributor Author

I came across an async batching pattern today and thought of this discussion:
https://stackoverflow.com/questions/68137200/python-async-coroutines-in-batches

The referenced library is very small, and probably just worth copying: https://marimo.app/l/cmn6g0

I think my understanding of the runtime is still a little naive, but would something like this work?

In `dataflow`:

def topological_sort_by_batch(
    graph: DirectedGraph, cell_ids: Collection[CellId_t]
) -> list[list[CellId_t]]:
    """Sort `cell_ids` into topological layers for async execution."""
    parents, children = induced_subgraph(graph, cell_ids)
    roots = [cid for cid in cell_ids if not parents[cid]]
    sorted_cell_batch = [copy(roots)]

    # BFS with explicit layers: a cell joins the next layer only once
    # all of its parents in the subgraph have been processed
    while roots:
        roots, layer = [], roots
        for pid in layer:
            for child in children[pid]:
                parents[child].remove(pid)
                if not parents[child]:
                    roots.append(child)
        if roots:
            sorted_cell_batch.append(copy(roots))
    # TODO: check that parents[cid] is empty for every cid;
    # otherwise the subgraph contains a cycle
    return sorted_cell_batch

...

    async def run_cell_async(
        self, cell_id: CellId_t, kwargs: dict[str, Any]
    ) -> tuple[Any, dict[str, Any]]:
        # ...
        execute_batched_async = semaphore_wrap(execute_cell_async)
        for cids in topological_sort_by_batch(graph, ancestor_ids):
            # TODO: Clone globals or explicitly shared refs? See #1142
            await asyncio.gather(
                *[execute_batched_async(graph.cells[cid], glbls) for cid in cids]
            )
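(`semaphore_wrap` isn't defined in the snippet above; a minimal version of the batching pattern from the linked Stack Overflow answer might look like this. The name and signature here are assumptions, not the linked library's API.)

```python
import asyncio


def semaphore_wrap(fn, limit: int = 8):
    """Wrap an async function so that at most `limit` invocations
    run concurrently, gated by a shared semaphore."""
    sem = asyncio.Semaphore(limit)

    async def wrapped(*args, **kwargs):
        async with sem:
            return await fn(*args, **kwargs)

    return wrapped
```

With this, `asyncio.gather` over a whole layer still schedules every cell at once, but only `limit` of them execute at a time.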

@akshayka
Contributor

Very cool. Thanks for sharing!

Because the event loop is single-threaded, the example code would provide concurrency (if multiple cells used top-level await, though I'm not sure that's a common pattern) but not parallelism.

What we really want is parallelism. For that we'd need threads (which give parallelism only when cells drop the GIL, e.g. on dropping into NumPy or PyTorch, both of which release the GIL as soon as they can) or processes (no GIL, but globals would need to be cloned).

Still, the general idea would be the same: sort into layers and execute.
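A sketch of the thread-based variant of "sort into layers and execute" (`run_layers` and its arguments are hypothetical names, not marimo APIs): layers run sequentially, and cells within a layer are submitted to a pool concurrently.

```python
from concurrent.futures import ThreadPoolExecutor


def run_layers(layers, run_cell, max_workers=None):
    """Run each topological layer to completion before starting the
    next; cells within a layer execute concurrently in a thread pool."""
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for layer in layers:
            # pool.map preserves the order of `layer`, so outputs
            # line up with their cell ids
            for cid, out in zip(layer, pool.map(run_cell, layer)):
                results[cid] = out
    return results
```

With processes instead of threads, `run_cell` and its arguments would also need to be picklable, and globals cloned per worker.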

@dmadisetti
Contributor Author

Ridiculous, ipyparallel recommends building and managing your own DAG: https://ipyparallel.readthedocs.io/en/latest/reference/dag_dependencies.html

I think marimo could just build on this, as opposed to trying to manage communication itself.

"marimo cloud" option could be to deploy to GCE or something with a click of a button. Just easy distributed compute would put it ahead of other competition.

@akshayka
Contributor

akshayka commented May 9, 2024

I think marimo could just build on this, as opposed to trying to manage communication itself.

I'll take a look, thanks for the pointer.

"marimo cloud" option could be to deploy to GCE or something with a click of a button. Just easy distributed compute would put it ahead of other competition.

This would be pretty amazing. Been noodling on ideas like this for a bit. We should jam over another video call soon ...
