
Welcome to the zmq-collectives-rs wiki!

An Oversimplified Description of PRAM and SPMD Models

The most straightforward mechanism for parallelizing an application - without multithreading - is to execute multiple instances of the program on a machine or set of machines. Each instance of the program is assigned a portion of the total data to be processed. Program instances are assigned a logical identifier, a rank, which determines which subset of the data each instance works on.

As an example, with 10 data elements and 5 instances - or ranks - of the program, each rank consumes 2 data elements: Rank 0 consumes elements 0 and 1, Rank 1 consumes elements 2 and 3, and so on. Adding multithreading and accelerators to this paradigm, one can see both the simplicity of the model and the natural hierarchy of parallelism associated with it. At the same time, it should be equally clear why programming in this model is challenging. SPMD mirrors several of the existing challenges of programming with threads and extends those issues over a distributed network of machines. In the threaded model, users define or select a function to be executed by several hardware or software threads. In SPMD programming, users define a program that is executed multiple times as separate processes running concurrently (simultaneously).
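The sketch below illustrates the block partitioning described above. It is purely illustrative; the block_range helper is not part of this crate.

fn block_range(rank: usize, nranks: usize, total: usize) -> (usize, usize) {
    // Half-open index range [start, end) owned by `rank` when `total`
    // elements are split evenly across `nranks` ranks (assumes even division).
    let chunk = total / nranks;
    (rank * chunk, (rank + 1) * chunk)
}

fn main() {
    // 10 data elements split across 5 ranks -> 2 elements per rank
    for rank in 0..5 {
        let (start, end) = block_range(rank, 5, 10);
        println!("Rank {} consumes elements {}..{}", rank, start, end);
    }
}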

At some point, a user will want instances of the program to share state and communicate with each other. In threaded application development, this can be done through shared memory, using the thread's rank and the indices it is responsible for processing. In SPMD applications, this typically involves marshalling data and sending it to a program instance, identified by rank, across a network. There are a couple of mechanisms for sharing state available; in 0MQ the options are shared memory (inproc), pipes (ipc), and network sockets (tcp). This Rust library/crate provides a distributed-memory, message-passing implementation of the PRAM and SPMD models.
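As an illustration of the marshalling step, the sketch below serializes a value with serde before it would be handed to a 0MQ socket. bincode is used here only as an example serde format, not necessarily the wire format used by this crate; the endpoint strings in the comments show the three 0MQ transports mentioned above.

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct Partition {
    rank: u64,
    values: Vec<f64>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // 0MQ endpoint forms for the three transports:
    //   "inproc://collectives"   - shared memory, same process
    //   "ipc:///tmp/collectives" - pipes, same machine
    //   "tcp://127.0.0.1:5555"   - network sockets, across machines
    let part = Partition { rank: 1, values: vec![2.0, 3.0] };

    // Marshal the value into bytes that could be sent over a 0MQ socket...
    let bytes: Vec<u8> = bincode::serialize(&part)?;

    // ...and unmarshal it on the receiving rank.
    let received: Partition = bincode::deserialize(&bytes)?;
    assert_eq!(part, received);
    Ok(())
}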

Library/Crate Interface

  • Params - structure that stores environment variable information used to initialize the implementation.
  • Backend - trait representing the basic set of operations necessary to perform the collective communication algorithms.
  • Collectives - trait defining the collective communication algorithms.
  • TcpBackend - structure implementing the Backend and Collectives traits.

Collectives Interface

Below is the Collectives trait definition. Comments are provided for each method defined in the trait. Additional documentation is available to further explain each method. The documentation will include use cases, limitations, etc.

pub trait Collectives {

        // Data type (scalar, vector, etc) at Rank 0 is sent to all ranks
        //
        fn broadcast<Data>(&self, data : &mut Data)
           where Data : serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Copy;

        // Each machine has a partition of data; this method performs a local
        // reduction, or fold, and sends the result to a rank for further
        // reduction.
        //
        // init : DataItem - initial value to 'fold' into the reduction
        // f : F - reduction function
        // d : Data - data type consisting of DataItems to perform the reduction over
        //
        // Returns a Result<DataItem, std::io::Error>; Rank 0 gets the final result
        //
        fn reduce<DataItem, Data, F>(&self, init : DataItem, f : F, data : Data ) -> Result<DataItem, std::io::Error>
            where DataItem : serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Copy,
                  Data : std::iter::IntoIterator<Item = DataItem> + serde::ser::Serialize + serde::de::DeserializeOwned + Clone,
                  F : Fn(DataItem, Data::Item) -> DataItem + Clone;

        // Each rank will block at this point in the program
        //
        fn barrier(&self);

        // An iterator of std::slice::Iter<DataItem> named 'in_beg' with a length/size 'in_size',
        // is segmented into even blocks that are then sent to other ranks. The data is stored
        // into out : std::slice::IterMut<DataItem> on each rank. Note, users are required to
        // allocate space in the container type associated with 'out' prior to calling scatter.
        // The size of the output container type should be straightforward to compute.
        //
        // Example: A program is running on 2 Ranks. Rank 0 has 4 elements and performs a scatter.
        // When the scatter completes, Rank 0 will have 2 DataItems stored in 'out' and Rank 1 will
        // have 2 DataItems stored in its copy of 'out'.
        //
        fn scatter<'a, DataItem>(&self, in_beg : std::slice::Iter<'a, DataItem>, in_size : usize, out : &mut std::slice::IterMut<DataItem> )
            where DataItem : serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Copy;

        // An iterator of std::slice::Iter<DataItem> named 'in_beg' with a length/size 'in_size',
        // stored on each Rank, is consolidated into 'out' on Rank 0. Note, users
        // are required to allocate space in the container type associated with 'out' prior
        // to calling gather.
        //
        // Example: A program is running on 2 Ranks. Rank 0 and Rank 1 each have 2 elements and perform
        // a gather. When the gather completes, Rank 0 will have 4 DataItems stored in 'out'.
        //
        fn gather<'a, DataItem>(&self, in_beg : std::slice::Iter<'a, DataItem>, in_size : usize, out : &mut std::slice::IterMut<DataItem> )
            where DataItem : serde::ser::Serialize + serde::de::DeserializeOwned + Clone + Copy;
}
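
Below is a minimal sketch of how the trait methods above might be called. It assumes a value 'backend' implementing Collectives has already been constructed (for example a TcpBackend built from Params); construction is omitted because it is not part of the trait, and the import path is a guess at the crate's module layout.

// Hypothetical import path; adjust to the crate's actual module layout.
use zmq_collectives::Collectives;

fn run<C: Collectives>(backend: &C, rank: u64) -> Result<(), std::io::Error> {
    // broadcast: Rank 0's value of `seed` overwrites every other rank's copy
    let mut seed: i32 = if rank == 0 { 42 } else { 0 };
    backend.broadcast(&mut seed);

    // reduce: each rank folds its local partition; Rank 0 receives the
    // final result in the Ok(...) value
    let local: Vec<i32> = vec![1, 2, 3];
    let total = backend.reduce(0i32, |acc, x| acc + x, local)?;
    if rank == 0 {
        println!("global sum = {}", total);
    }

    // scatter: Rank 0's 4 elements are split evenly across 2 ranks, so each
    // rank receives 2 DataItems in its pre-allocated 'out' buffer
    let input: Vec<i32> = vec![10, 20, 30, 40];
    let mut out = vec![0i32; 2];
    backend.scatter(input.iter(), input.len(), &mut out.iter_mut());

    // barrier: every rank blocks here until all ranks have arrived
    backend.barrier();
    Ok(())
}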