Simple distributed data manipulation and processing routines in Julia

LCSB-BioCore/DistributedData.jl

DistributedData.jl

Simple distributed data manipulation and processing routines for Julia.

Acknowledgements

This package was originally developed for GigaSOM.jl; the DistributedData.jl package contains the separated-out lightweight distributed-processing framework that was used in GigaSOM.jl.

DistributedData.jl was developed at the Luxembourg Centre for Systems Biomedicine of the University of Luxembourg (uni.lu/lcsb). The development was supported by the European Union ELIXIR Staff Exchange programme 2020 (elixir-europe.org) and by the European Union's Horizon 2020 Programme under the PerMedCoE project (permedcoe.eu), agreement no. 951773.

Why?

DistributedData.jl provides a very simple, imperative and straightforward way to move your data around a cluster of Julia processes created by the Distributed package, and to run computation on the distributed data pieces. The main aim of the package is to avoid anything complicated: the first version used in GigaSOM had just under 500 lines of relatively straightforward code (including the doc-comments).

Compared to the plain Distributed API, you get more straightforward data manipulation primitives, some extra control over the precise place where code is executed, and a few high-level functions. These include a distributed version of mapreduce, a simpler work-alike of the DistributedArrays functionality, and easy-to-use distributed dataset saving and loading.

Most importantly, the main motivation behind the package is that distributed processing should be simple and accessible.

Brief how-to

The package provides a few very basic primitives that lightly wrap the Distributed package functions remotecall and fetch. The most basic one is save_at, which takes a worker ID, variable name and variable content, and saves the content to the variable on the selected worker. get_from works the same way, but takes the data back from the worker.

You can thus send some random array to a few distributed workers:

julia> using Distributed, DistributedData

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

julia> @everywhere using DistributedData

julia> save_at(2, :x, randn(10,10))
Future(2, 1, 4, nothing)

The Future returned from save_at is the normal Julia Future from Distributed; you can even fetch it to wait until the operation is really done on the other side. Fetching the data is done the same way:

julia> get_from(2,:x)
Future(2, 1, 15, nothing)

julia> get_val_from(2,:x) # auto-fetch()ing variant
10×10 Array{Float64,2}:
 -0.850788    0.946637     1.78006    …
 -0.49596     0.497829    -2.03013
   ⋮

All commands support full quoting, which allows you to easily distinguish between code parts that are executed locally and remotely:

julia> save_at(3, :x, randn(1000,1000))     # generates a matrix locally and sends it to the remote worker

julia> save_at(3, :x, :(randn(1000,1000)))  # generates a matrix right on the remote worker and saves it there

julia> get_val_from(3, :x)                  # retrieves the generated matrix and fetches it


julia> get_val_from(3, :(randn(1000,1000))) # generates the matrix on the worker and fetches the data

Notably, this is different from the approach taken by DistributedArrays and similar packages: all data manipulation is explicit, and any data type is supported as long as it can be moved among workers by the Distributed package. This helps with various highly non-array-ish data, such as large text corpora and graphs.
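
To make the explicit style and the local/remote quoting distinction more concrete, here is a minimal sketch of how such primitives could be layered over remotecall and fetch using only the standard Distributed library. The names sketch_save_at and sketch_get_from, as well as the variables counts and remote_vec, are hypothetical and exist only for this illustration; the sketch is not claimed to match the package's actual implementation.

```julia
using Distributed

# Make sure at least one worker process exists; Julia assigns its ID.
nprocs() == 1 && addprocs(1)
w = first(workers())

# Hypothetical helper: assign `val` to the global `name` in Main on worker `w`.
# The assignment expression is built and evaluated on the worker, so a quoted
# `val` is computed remotely, while a plain value is serialized and sent over.
function sketch_save_at(w, name::Symbol, val)
    remotecall(() -> Core.eval(Main, :(begin
        $name = $val
        nothing   # avoid sending the stored value back with the Future
    end)), w)
end

# Hypothetical helper: evaluate a symbol or quoted expression on worker `w`.
sketch_get_from(w, val) = remotecall(() -> Core.eval(Main, val), w)

# Non-array data: the Dict is built locally and shipped to the worker.
fetch(sketch_save_at(w, :counts, Dict("a" => 1, "b" => 2)))

# Quoted expression: the vector is created on the worker, nothing is shipped.
fetch(sketch_save_at(w, :remote_vec, :(collect(1:5))))

# Only the small result of the quoted expression travels back.
total = fetch(sketch_get_from(w, :(sum(values(counts)))))
```

Both helpers return an ordinary Future, so the caller decides when to block with fetch, which is the same explicit control the how-to above relies on.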

There are various goodies for easy work with matrix-style data, namely scattering, gathering and running distributed algorithms:

julia> x = randn(1000,3)
1000×3 Array{Float64,2}:
 -0.992481   0.551064     1.67424
 -0.751304  -0.845055     0.105311
 -0.712687   0.165619    -0.469055
  ⋮

julia> dataset = scatter_array(:myDataset, x, workers())  # sends slices of the array to workers
Dinfo(:myDataset, [2, 3])   # a helper for holding the variable name and the used workers together

julia> get_val_from(3, :(size(myDataset)))
(500, 3)    # there's really only half of the data

julia> dmapreduce(dataset, sum, +) # MapReduce-style sum of all data
-51.64369103751014

julia> dstat(dataset, [1,2,3]) # get means and sdevs in individual columns
([-0.030724038974465212, 0.007300925745200863, -0.028220577808245786],
 [0.9917470012495775, 0.9975120525455358, 1.000243845434252])

julia> dmedian(dataset, [1,2,3]) # distributed iterative median in columns
3-element Array{Float64,1}:
  0.004742259615849834
  0.039043266340824986
 -0.05367799062404967

julia> dtransform(dataset, x -> 2 .^ x) # exponentiate all data (medians should now be around 1)
Dinfo(:myDataset, [2, 3])

julia> gather_array(dataset) # download the data from workers to a single array
1000×3 Array{Float64,2}:
 0.502613  1.46517   3.1915
 0.594066  0.55669   1.07573
 0.610183  1.12165   0.722438
  ⋮
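
For intuition, the scatter, map-reduce and gather steps in the session above can be imitated with the standard Distributed library alone. This is only a rough sketch under stated assumptions: sketch_scatter, sketch_mapreduce and the variable name sketch_data are hypothetical, rows are split into contiguous blocks, and transfers run one after another; the package's own functions may work differently.

```julia
using Distributed

# Make sure some workers exist; Julia assigns their IDs.
nprocs() == 1 && addprocs(2)
ws = workers()

# Hypothetical helper: split the rows of `x` into contiguous blocks and store
# block i in the global `name` in Main on the i-th worker.
function sketch_scatter(name::Symbol, x::AbstractMatrix, ws)
    n = size(x, 1)
    bounds = [div(i * n, length(ws)) for i in 0:length(ws)]
    for (i, w) in enumerate(ws)
        part = x[(bounds[i] + 1):bounds[i + 1], :]
        # fetch() blocks until the worker has stored its block
        fetch(remotecall(() -> Core.eval(Main, :(begin
            $name = $part
            nothing
        end)), w))
    end
end

# Hypothetical helper: apply `map_f` to each worker's block remotely, then
# combine the fetched partial results locally with `reduce_f`.
function sketch_mapreduce(name::Symbol, map_f, reduce_f, ws)
    futures = [remotecall(() -> map_f(Core.eval(Main, name)), w) for w in ws]
    mapreduce(fetch, reduce_f, futures)
end

x = randn(1000, 3)
sketch_scatter(:sketch_data, x, ws)

# Distributed sum: per-worker sums are added up on the calling process
total = sketch_mapreduce(:sketch_data, sum, +, ws)

# Gathering is the same pattern with identity as the map and vcat as the reduce
gathered = sketch_mapreduce(:sketch_data, identity, vcat, ws)
```

Expressing the gather step as a map-reduce shows why a single primitive covers many of the higher-level operations: only the mapped results ever leave the workers.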
  

Using DistributedData.jl in HPC environments

You can use the ClusterManagers package to add distributed workers from many different workload managers and task scheduling environments, such as Slurm, PBS, LSF, and others.

See the documentation for an example of using Slurm to run DistributedData.
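
As a rough sketch of what such a setup might look like, the following script assumes it is started inside a Slurm allocation (for example from an sbatch script), where Slurm sets the SLURM_NTASKS environment variable, and that ClusterManagers and DistributedData are installed in the active environment. The dataset name and sizes are placeholders taken from the how-to above; the example in the documentation remains the authoritative reference.

```julia
using Distributed, ClusterManagers, DistributedData

# Assumption: SLURM_NTASKS is set by Slurm for the current allocation.
# addprocs_slurm starts one Julia worker per requested task.
addprocs_slurm(parse(Int, ENV["SLURM_NTASKS"]))

# Load the package on all newly added workers
@everywhere using DistributedData

# From here on, the workers are used exactly as in the how-to above
dataset = scatter_array(:myDataset, randn(1000, 3), workers())
println(dmapreduce(dataset, sum, +))
```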