omkarprabhu-98/go-ipfs-mapreduce

Go IPFS MapReduce

A simple POC Map Reduce Library for IPFS in Golang

Design

This is a standalone library, so only IPFS nodes that use it to register the MapReduce services can communicate with each other to perform MapReduce.

In the future, the plan is to make this part of the daemon so that all IPFS nodes speak this protocol and we can do p2p MapReduce at a much larger scale. Because IPFS splits a file added to it into 256 KB blocks, we can process those blocks independently.
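As a rough illustration of why fixed-size blocks lend themselves to independent processing, the sketch below cuts an in-memory buffer into 256 KB pieces. It does not use IPFS at all; `splitBlocks` and `blockSize` are hypothetical names, and 256 KB is taken to mean 256 * 1024 bytes.

```go
package main

import "fmt"

// blockSize is the 256 KB block size mentioned above,
// assumed here to mean 256 * 1024 bytes.
const blockSize = 256 * 1024

// splitBlocks cuts data into consecutive blocks of at most size bytes.
// size must be positive. The returned slices share memory with data.
func splitBlocks(data []byte, size int) [][]byte {
	var blocks [][]byte
	for start := 0; start < len(data); start += size {
		end := start + size
		if end > len(data) {
			end = len(data)
		}
		blocks = append(blocks, data[start:end])
	}
	return blocks
}

func main() {
	// 581 KB, the size of the larger demo input in this README.
	data := make([]byte, 581*1024)
	for i, b := range splitBlocks(data, blockSize) {
		fmt.Printf("block %d: %d bytes\n", i, len(b))
	}
}
```

Each block could then be handed to a separate map task. Note that a record (a word or a line, say) may straddle a block boundary, so a map function working on raw blocks has to tolerate partial records at either end.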

Every peer (an IPFS node) registers the map and reduce gorpc services using the library. This sets the required stream handlers for the libp2p protocol "/ipfs/mapreduce".

To run MapReduce, we use the library to get the master struct, which registers the master service and is initialized with the variables listed below. Files are stored in and fetched from IPFS via their Cid identifier.

  • node: the ipfs node used to connect to peers, fetch files, etc.
  • mapFuncFilePath: map golang code built to a ".so" file in plugin mode
  • reduceFuncFilePath: reduce golang code built to a ".so" file in plugin mode
  • noOfReducers: number of reducers
  • dataFileCid: cid string for the data file to process using map reduce.
  • mrOutputFile: file name where final output should be stored
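To make the map and reduce parameters more concrete, here is a minimal word-count sketch in the style of the MIT 6.824 labs that this project cites. The `KeyValue` type and the `Map` and `Reduce` signatures are assumptions for illustration; the signatures this library actually looks up in the ".so" files may differ, so check the examples directory before relying on them.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
	"unicode"
)

// KeyValue is a hypothetical pair type modelled on the 6.824 labs.
type KeyValue struct {
	Key   string
	Value string
}

// Map emits (word, "1") for every word in contents (assumed signature).
func Map(contents string) []KeyValue {
	// Treat any non-letter rune as a word separator.
	words := strings.FieldsFunc(contents, func(r rune) bool {
		return !unicode.IsLetter(r)
	})
	kvs := make([]KeyValue, 0, len(words))
	for _, w := range words {
		kvs = append(kvs, KeyValue{Key: strings.ToLower(w), Value: "1"})
	}
	return kvs
}

// Reduce returns how many values were collected for key (assumed signature).
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	// Group the map output by key, then reduce one key.
	grouped := map[string][]string{}
	for _, kv := range Map("the quick fox and the lazy dog") {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	fmt.Println("the:", Reduce("the", grouped["the"])) // prints "the: 2"
}
```

Go plugins must be in package main and are built with `go build -buildmode=plugin`, which is how a file like this would become the ".so" passed as mapFuncFilePath or reduceFuncFilePath.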

Calling a run method on the master starts the map reduce process.
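One way to picture the role of noOfReducers during a run: each key emitted by a map task is assigned to exactly one reducer. The sketch below uses the hash-modulo scheme from the 6.824 labs. Whether this library partitions keys the same way is an assumption, and `partition` is a hypothetical helper, not part of its API.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partition maps a key to a reducer index in [0, noOfReducers).
// This is the 6.824-style scheme, assumed here for illustration only.
// noOfReducers must be positive.
func partition(key string, noOfReducers int) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	// Mask the sign bit so the result is non-negative on every platform.
	return int(h.Sum32()&0x7fffffff) % noOfReducers
}

func main() {
	const noOfReducers = 3
	buckets := make([][]string, noOfReducers)
	for _, key := range []string{"ipfs", "map", "reduce", "peer", "block"} {
		r := partition(key, noOfReducers)
		buckets[r] = append(buckets[r], key)
	}
	for r, keys := range buckets {
		fmt.Printf("reducer %d: %v\n", r, keys)
	}
}
```

Because the assignment depends only on the key, every map task sends a given key to the same reducer without any coordination between peers.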

Usage

All Peers

import (
    ...
    mapreduce "github.com/omkarprabhu-98/go-ipfs-mapreduce"
    ...
)

fmt.Println("Spawning ephemeral ipfs node")
node, err := spawnEphemeral(ctx)
if err != nil {
    panic(fmt.Errorf("failed to spawn ephemeral node: %s", err))
}
err = mapreduce.RegisterProtocol(node)
if err != nil {
    panic(fmt.Errorf("failed to register map reduce protocol: %s", err))
}

Run Map Reduce

master, err := mapreduce.InitMaster(node, mapFuncFilePath, reduceFuncFilePath,
    noOfReducers, dataFileCid)
if err != nil {
    panic(fmt.Errorf("failed to init master: %s", err))
}
master.RunMapReduce(ctx)

Observe status

ticker := time.NewTicker(5 * time.Second)
quit := make(chan struct{})
go func() {
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            fmt.Println("MapStatus:", master.GetMapStatus())
            redStatus := master.GetReduceStatus()
            fmt.Println("ReduceStatus:", redStatus)
            if redStatus.Complete == redStatus.Total {
                // Close rather than send: a send on the unbuffered quit
                // channel blocks this goroutine if nothing else receives.
                // Closing also releases anyone waiting on <-quit.
                close(quit)
                return
            }
        case <-quit:
            return
        }
    }
}()

See the examples directory for complete examples.

Demo

Recordings of sample local runs:

  1. Small input file (1 KB): trim2.mov
  2. Large input file (581 KB): Screen.Recording.2021-08-13.at.1.26.35.PM.mov

References

Inspiration and code references are taken from MIT's 6.824 course

TODOs for the project

  • Handle errors and timeouts in the current code
  • Write TF-IDF mapreduce for Hadoop testing (may have to be in Java) and get stats
  • Write TF-IDF in Go for our use case
  • Script to spawn up the cluster and distribute the data in CloudLab nodes. Also test the TF-IDF for our framework.
  • TODO in master.go for choosing which peer to get the block from
  • TODO in master.go to avoid locking the whole computation
  • Better way to handle TODO in master.go line 94
  • (can avoid for now) Weird error in utils.go TODO, tmp fix in place, find a better fix.
  • Add tests
  • Integrate into go-ipfs
