
samikama/yampl

 
 


YAMPL

YAMPL (Yet Another Message Passing Library) provides a simple abstraction of inter-process (local or distributed) and inter-thread communication channels.

A channel lets you send and receive data. Each end of a channel is attached to a socket:

  • ClientSocket: can be connected to at most one ServerSocket through a channel
  • ServerSocket: can be connected to zero or more ClientSockets through a channel

YAMPL can send and receive:

  • objects of trivially copyable types
  • arrays of trivially copyable types

A trivially copyable type

  • has no non-trivial copy or move constructors (which rules out virtual functions and virtual bases)
  • has no non-trivial copy or move assignment operators
  • has a trivial destructor

In other words, a trivially copyable type is any type for which the underlying bytes can be copied to an array of char or unsigned char and into a new object of the same type, and the resulting object would have the same value as the original.
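The definition above can be checked at compile time with the standard `std::is_trivially_copyable` trait. The following is a minimal sketch that does not use YAMPL itself; `Sample` and `roundTrip` are hypothetical names introduced only for illustration.

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <type_traits>

// Hypothetical message type used only for illustration.
struct Sample {
  int id;
  double value;
};

// Plain data members only, so the object can be copied byte by byte.
static_assert(std::is_trivially_copyable<Sample>::value,
              "Sample can be copied as raw bytes");

// Arrays of trivially copyable types are trivially copyable as well.
static_assert(std::is_trivially_copyable<Sample[4]>::value,
              "arrays of Sample can be copied as raw bytes");

// std::string manages its own storage and has a non-trivial copy constructor.
static_assert(!std::is_trivially_copyable<std::string>::value,
              "std::string cannot be copied as raw bytes");

// Copy an object into a byte buffer and back into a new object of the
// same type, which is the property the definition above describes.
template <typename T>
T roundTrip(const T &original) {
  static_assert(std::is_trivially_copyable<T>::value,
                "T must be trivially copyable");
  unsigned char bytes[sizeof(T)];
  std::memcpy(bytes, &original, sizeof(T));
  T copy;
  std::memcpy(&copy, bytes, sizeof(T));
  return copy;
}
```

For example, `roundTrip(Sample{7, 2.5})` returns a `Sample` whose members compare equal to those of the original.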

YAMPL offers different communication strategies so that each use case gets the best performance:

  • Inter-thread: Lock Free Queues
  • Inter-process (local):
    • "small" messages: Lock Free Queues over POSIX Shared Memory, optimized for latency
    • "big" messages: UNIX Pipes (vmsplice), optimized for bandwidth
  • Inter-process (distributed): POSIX Sockets
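To give an idea of what a lock-free queue looks like, here is a minimal single-producer/single-consumer ring buffer built on `std::atomic`. It is an illustrative sketch only, not YAMPL's implementation; `SpscQueue`, `tryPush` and `tryPop` are hypothetical names.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Illustrative single-producer/single-consumer ring buffer.
// Not YAMPL's implementation; all names are hypothetical.
template <typename T, std::size_t Capacity>
class SpscQueue {
 public:
  // Producer thread only. Returns false when the queue is full.
  bool tryPush(const T &item) {
    const std::size_t tail = tail_.load(std::memory_order_relaxed);
    const std::size_t next = (tail + 1) % (Capacity + 1);
    if (next == head_.load(std::memory_order_acquire)) return false;  // full
    slots_[tail] = item;
    tail_.store(next, std::memory_order_release);  // publish the item
    return true;
  }

  // Consumer thread only. Returns false when the queue is empty.
  bool tryPop(T &out) {
    const std::size_t head = head_.load(std::memory_order_relaxed);
    if (head == tail_.load(std::memory_order_acquire)) return false;  // empty
    out = slots_[head];
    head_.store((head + 1) % (Capacity + 1), std::memory_order_release);
    return true;
  }

 private:
  T slots_[Capacity + 1];             // one slot stays unused to tell full from empty
  std::atomic<std::size_t> head_{0};  // index of the next slot to read
  std::atomic<std::size_t> tail_{0};  // index of the next slot to write
};
```

Neither side ever takes a lock: the producer only writes `tail_` and the consumer only writes `head_`, and the acquire/release pairs make each stored item visible before its index is published. The sketch is only safe with exactly one producer thread and one consumer thread.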

Build, Test & Install

git clone https://github.com/vitillo/yampl
cd yampl
./configure --prefix=INSTALL_PATH
make
make check
make install

To compile and link against the YAMPL library:

g++ foo.cpp `pkg-config yampl --cflags --libs`

Examples

The examples subdirectory provides binaries that demonstrate some use cases.

Local Client

Each client pings a server process and receives a reply from it.

#include <unistd.h>
#include <iostream>
#include <string>

#include "yampl.h"

using namespace std;
using namespace yampl;

int main(int argc, char *argv[]){
  char pong[100];  // buffer for the server's reply
  const string ping = "ping from " + to_string(getpid());

  // Client end of the "service" channel over local shared memory
  Channel channel("service", LOCAL_SHM);
  ISocketFactory *factory = new SocketFactory();
  ISocket *socket = factory->createClientSocket(channel);

  // Send a ping, receive the reply and print it, forever
  while(true){
    socket->send(ping);
    socket->recv(pong);
    cout << pong << endl;
  }
}

Local Server

The server process replies to the pings of the clients.

#include <unistd.h>
#include <iostream>

#include "yampl.h"

using namespace std;
using namespace yampl;

int main(int argc, char *argv[]){
  char ping[100];  // buffer for the client's message

  // Server end of the "service" channel over local shared memory
  Channel channel("service", LOCAL_SHM);
  ISocketFactory *factory = new SocketFactory();
  ISocket *socket = factory->createServerSocket(channel);

  // Receive a ping, reply with "pong" and print the ping, once per second
  while(true){
    socket->recv(ping);
    socket->send("pong");
    cout << ping << endl;
    sleep(1);
  }
}

Distributed Producer

#include <unistd.h>
#include <iostream>
#include <string>

#include "yampl.h"

using namespace yampl;
using namespace std;

// No-op deallocator handed to the socket factory
void deallocator(void *, void*){}

int main(int argc, char *argv[]){
  string message = "Hello from " +  to_string(getpid());

  // Client end of a distributed channel to 127.0.0.1, port 3333
  Channel channel("127.0.0.1:3333", DISTRIBUTED);
  ISocketFactory *factory = new SocketFactory();
  ISocket *socket = factory->createClientSocket(channel, MOVE_DATA, deallocator);

  // Send the message once per second
  while(true){
    socket->send(message);
    cout << "Message sent" <<  endl;
    sleep(1);
  }
}

Distributed Consumer

#include <iostream>

#include "yampl.h"

using namespace std;
using namespace yampl;

int main(int argc, char *argv[]){
  char *message = 0;  // set by recv() below

  // Server end of the distributed channel on 127.0.0.1, port 3333
  Channel channel("127.0.0.1:3333", DISTRIBUTED);
  ISocketFactory *factory = new SocketFactory();
  ISocket *socket = factory->createServerSocket(channel, MOVE_DATA);

  // Print every message received from the producer
  while(true){
    socket->recv(message);
    cout << message << endl;
  }
}

Multithreaded Client-Server

This example mirrors the client-server examples above, but runs the server and the clients as threads of a single process (requires C++11).

#include <unistd.h>
#include <iostream>
#include <thread>

#include "yampl.h"

// No-op deallocator handed to the socket factory
inline void deallocator(void *, void*){}

using namespace std;
using namespace yampl;

int main(int argc, char *argv[]){
  const int nThreads = 10;
  ISocketFactory *factory = new SocketFactory();

  // Server thread: prints the id sent by each client and replies with 0
  thread server([factory] {
    Channel channel("service", THREAD);
    ISocket *socket = factory->createServerSocket(channel, MOVE_DATA, deallocator);

    while(true){
      cout << "Ping from client " << socket->recv<int>() << endl;
      socket->send(0);
      sleep(1);
    }
  });

  // Client threads: each one sends its index and receives the reply
  for(int i = 0; i < nThreads; i++){
    thread t([factory, i] {
      Channel channel("service", THREAD);
      ISocket *socket = factory->createClientSocket(channel, MOVE_DATA, deallocator);

      while(true){
        socket->send(i);
        socket->recv<int>();
      }
    });

    t.detach();
  }

  // The clients run detached; joining the server keeps the process alive
  server.join();
}
