James Edmondson edited this page Sep 25, 2018 · 6 revisions

C++ Guide Series
Architecture | Knowledge Base | Networking | Containers | Threads | Optimizations | KaRL | Encryption | Checkpointing | Knowledge Performance | Logging


MADARA Threads

MADARA supports fast, portable user threads that interact with each other over data and control planes. With the Threader class and the extensible BaseThread, users can build intuitive applications that not only perform distributed collaboration and communication but also take advantage of multiple cores on a local node to speed up computation.




1. A Hello World Threading Example

To use the Threader class, you need to do the following:

  1. Extend a BaseThread and implement the run method
  2. Create a Knowledge Base
  3. Create a Threader
  4. Run threads
  5. Wait for threads to finish

Sounds simple enough, right? It really is. Let's make a simple Hello World application that launches a thread that prints "Hello World!".

Hello World Application

#include <iostream>
#include "madara/knowledge/KnowledgeBase.h"
#include "madara/threads/Threader.h"

// some shortcuts for common MADARA namespaces
namespace engine = madara::knowledge;
namespace threads = madara::threads;

// 1: Extend a BaseThread
class HelloWorld : public threads::BaseThread
{
  public:
    void run (void)
    {
      std::cerr << "Hello World!\n";
    }
};

int main (int argc, char ** argv)
{
  // 2: Create a Knowledge Base
  engine::KnowledgeBase knowledge;

  // 3: Create a Threader 
  threads::Threader threader (knowledge);

  // 4: Run threads
  threader.run ("hello_world", new HelloWorld ());

  // 5: Wait for threads to finish
  threader.wait ();

  return 0;
}

Believe it or not, you have just created a multi-threaded application with MADARA!


2. Understanding BaseThread

madara::threads::BaseThread is a simple class with some really powerful features. We only showed part of what's possible with the Hello World example. Next, we'll show how you can make your threads work together to accomplish a simple task, but first, we need to go over the features of the BaseThread and exactly what you're inheriting.

BaseThread has three virtual functions, and two of them have default (empty) implementations, so you only need to override those two if you want to. The run method is the only one you must implement, and it's what we used in Hello World to print the message to the screen.

The other two methods are init and cleanup, and their signatures are shown in the following code segment (taken from the BaseThread header file):

/**
 * Initializes thread with MADARA context
 * @param   context   context for querying current program state
 **/
virtual void init (madara::knowledge::KnowledgeBase & context)
{
}

/**
 * Cleans up any thread residue (usually instances created in init).
 * It's important to note that the only things that would need to
 * be cleaned up are generally ports opened, memory initialized with
 * new/malloc, etc. Anything MADARA related is generally cleaned up
 * for you.
 **/
virtual void cleanup (void)
{
}

The init method is called before run. Use it to save the context, query it for information, or create any MADARA containers that your run method will need.

The cleanup method is called after run and before the thread's internals are destroyed. If you allocate unmanaged heap memory (e.g., with new or malloc) or open ports, you may want to override it. If you only use MADARA containers, you probably don't need this method at all.
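To make the call order concrete, here is a minimal sketch that uses only std::thread. SketchBaseThread, run_lifecycle, and RecordingThread are hypothetical names, not MADARA classes. The real BaseThread::init also receives a KnowledgeBase, and this sketch calls all three hooks on the new thread, which may differ from what Threader actually does. The sketch only illustrates the contract described above: init runs before run, cleanup runs after it, and only run must be overridden.

```cpp
#include <string>
#include <thread>
#include <vector>

// Hypothetical stand-in for BaseThread (not the MADARA class)
class SketchBaseThread
{
public:
  virtual ~SketchBaseThread () = default;

  // optional hook with an empty default, like BaseThread::init
  virtual void init () {}

  // the only required hook, like BaseThread::run
  virtual void run () = 0;

  // optional hook with an empty default, like BaseThread::cleanup
  virtual void cleanup () {}
};

// Hypothetical driver: runs the hooks in order (init, run, cleanup)
// on a new std::thread, then joins it
inline void run_lifecycle (SketchBaseThread & worker)
{
  std::thread t ([&worker] {
    worker.init ();
    worker.run ();
    worker.cleanup ();
  });
  t.join ();
}

// Example subclass that records the order in which its hooks were called
class RecordingThread : public SketchBaseThread
{
public:
  void init () override { calls.push_back ("init"); }
  void run () override { calls.push_back ("run"); }
  void cleanup () override { calls.push_back ("cleanup"); }

  std::vector<std::string> calls;
};
```

After run_lifecycle (r) returns for a RecordingThread r, r.calls holds "init", "run", and "cleanup" in that order. Because join () synchronizes with the worker thread, it is safe to read calls afterwards.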

In addition to the init, run, and cleanup methods, BaseThread provides three member variables. The name variable holds the thread's name. The paused and terminated variables are MADARA Integer containers that give the Threader class a direct messaging channel to each thread. The Threader sets them to pass along a user's request to pause or terminate that thread.

To check these control-plane flags from inside your thread, simply read them as you would any other variable. Let's look at an example featuring a nagging thread that prints "Hello World" until the user asks it to terminate.

Nagging Hello World Application

#include <iostream>
#include "madara/knowledge/KnowledgeBase.h"
#include "madara/threads/Threader.h"
#include "madara/utility/Utility.h"

// some shortcuts for common MADARA namespaces
namespace engine = madara::knowledge;
namespace threads = madara::threads;
namespace utility = madara::utility;

// A nagging thread that just keeps saying hello until terminated
class NaggingHelloWorld : public threads::BaseThread
{
  public:
    void run (void)
    {
      while (terminated == 0)
      {
        std::cerr << "Hello World from " << name << " thread!\n";
      }
    }
};

// the main function
int main (int argc, char ** argv)
{
  engine::KnowledgeBase knowledge;

  threads::Threader threader(knowledge);
  threader.run ("hello_world", new NaggingHelloWorld ());

  // sleep for 10 seconds so the thread can nag at us
  utility::sleep (10.0);

  // tell threads to terminate
  threader.terminate ();

  threader.wait ();

  return 0;
}
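Conceptually, the terminated flag is a shared integer that the controller writes and the worker polls. The sketch below models that with std::atomic<int> and std::thread. nag_until_terminated is a hypothetical helper, not part of MADARA, and it counts loop iterations instead of printing. The atomic matters: a plain int written by one thread and read by another would be a data race. The MADARA example above reads terminated through an Integer container instead.

```cpp
#include <atomic>
#include <chrono>
#include <thread>

// Hypothetical helper: runs a "nagging" loop on a worker thread until the
// controller sets the terminate flag, then returns how many iterations ran
inline long nag_until_terminated (std::chrono::milliseconds duration)
{
  std::atomic<int> terminated (0);   // plays the role of BaseThread::terminated
  std::atomic<long> greetings (0);   // stands in for the printed messages

  std::thread nagger ([&] {
    // same loop shape as NaggingHelloWorld::run
    while (terminated.load () == 0)
    {
      greetings.fetch_add (1);
      std::this_thread::yield ();
    }
  });

  // let the worker nag for a while (like utility::sleep in main)
  std::this_thread::sleep_for (duration);

  // request termination (like threader.terminate ())
  terminated.store (1);

  // block until the worker exits (like threader.wait ())
  nagger.join ();
  return greetings.load ();
}
```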

3. Understanding Threader

So, from the previous examples, you've seen that we can terminate all threads and wait for them to finish. But what if we only wanted to terminate one thread out of many? In the Nagging Hello World application, what if we had three threads, two of which were doing work we wanted done and only one of which was actually nagging us? Let's see what it looks like to tell only the annoying one to finish.

Selective Termination Application

#include <sstream>
#include "madara/knowledge/KnowledgeBase.h"
#include "madara/threads/Threader.h"
#include "madara/utility/Utility.h"

// some shortcuts for common MADARA namespaces
namespace engine = madara::knowledge;
namespace threads = madara::threads;
namespace utility = madara::utility;
namespace containers = engine::containers;

// A nagging thread that says hello until terminated
class NaggingHelloWorld : public threads::BaseThread
{
  public:
    void init (engine::KnowledgeBase & data)
    {
      knowledge = data;
    }

    void run (void)
    {
      // print until the user asks us to terminate
      while (terminated == 0)
      {
        // create a message to print
        std::stringstream buffer;
        buffer << "Hello World from " << name << " thread!\n";
        
        // use knowledge base's print to avoid corrupting std::cerr
        knowledge.print (buffer.str ());
      }
    }

    void cleanup (void)
    {
      knowledge.print ("Nagging thread is shutting down.\n");
    }

    engine::KnowledgeBase knowledge;
};

// a counter thread that works with other counters
class CounterThread : public threads::BaseThread
{
public:

  void init (engine::KnowledgeBase & context)
  {
    // reference a counter variable in the context
    counter.set_name ("counter", context);

    // save the context
    data = context;
  }

  void run (void)
  {
    // count until we reach 100K or until the user terminates us
    while (*counter < 100000 && terminated == 0)
    {
      ++counter;
      
      // create a message to print
      std::stringstream buffer;
      buffer << "Thread " << name << " updated counter to " << *counter << "\n";
        
      // use knowledge base's print to avoid corrupting std::cerr
      data.print (buffer.str ());
    }
  }

  void cleanup (void)
  {
    data.print ("Counter thread is shutting down.\n");
  }

private:
  containers::Integer counter;
  engine::KnowledgeBase data;
};

// the main function
int main (int argc, char ** argv)
{
  engine::KnowledgeBase knowledge;

  threads::Threader threader (knowledge);

  // run a nagger thread
  threader.run ("nagger", new NaggingHelloWorld ());

  // run the two counter threads
  threader.run ("counter1", new CounterThread ());
  threader.run ("counter2", new CounterThread ());

  // sleep for 3 seconds so the thread can nag us for a bit
  utility::sleep (3.0);

  // tell only the nagger thread to terminate
  threader.terminate ("nagger");

  // the two counter threads keep running until the shared counter
  // reaches 100,000
  threader.wait ();

  return 0;
}

To compile any of these examples with g++, you'll need to specify the following options (replace "my_program" with the name of your application's source file).

g++ my_program.cpp -I$ACE_ROOT -I$MADARA_ROOT/include -L$MADARA_ROOT/lib -lMADARA -lACE -o my_program
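The difference between threader.terminate () and threader.terminate ("nagger") comes down to which threads' terminate flags get set. The following standard C++ sketch keeps one flag per thread name. NamedFlags and its methods are hypothetical and only mimic the shape of the Threader calls used above. Thread names are assumed to be unique.

```cpp
#include <atomic>
#include <map>
#include <memory>
#include <string>
#include <thread>

// Hypothetical mini-threader: one terminate flag per named thread
class NamedFlags
{
public:
  // start a thread under a (unique) name; the body receives its own flag
  template <typename Body>
  void run (const std::string & name, Body body)
  {
    auto flag = std::make_shared<std::atomic<int>> (0);
    flags_[name] = flag;
    threads_[name] = std::thread ([flag, body] () mutable { body (*flag); });
  }

  // request termination of a single named thread
  void terminate (const std::string & name) { flags_.at (name)->store (1); }

  // request termination of every thread
  void terminate ()
  {
    for (auto & entry : flags_)
      entry.second->store (1);
  }

  // block until all threads have finished
  void wait ()
  {
    for (auto & entry : threads_)
      if (entry.second.joinable ())
        entry.second.join ();
  }

  // never leave a joinable std::thread behind
  ~NamedFlags ()
  {
    terminate ();
    wait ();
  }

private:
  std::map<std::string, std::shared_ptr<std::atomic<int>>> flags_;
  std::map<std::string, std::thread> threads_;
};
```

With this structure, terminating "nagger" leaves every other thread's flag untouched. The counter threads therefore keep running until their own exit condition is met, as in the selective termination example above.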

4. Controlling Threader executions

The examples up to this point have focused on unregulated threads doing whatever executions they can. In threading terms, we call this kind of activity bursting. The thread will devour whatever CPU time it can until the thread meets its conditions for terminating.

But that's not the only mode of threading available in MADARA.

An alternative method for controlling threads is to allow periodic execution of each thread. We cover periodic thread execution in the following example.

Instead of two uncontrolled counters, we will create two controlled counters: one running at 20 Hz (20 executions per second) and one at 40 Hz. Extending BaseThread for this controlled threading paradigm usually simplifies threading significantly, because the Threader class and its associated worker threads check the paused and terminated flags for you between executions. However, as the example shows, you can still set the terminated flag yourself when appropriate.

There are two main development changes required for using periodic threads rather than bursty threads. First, we need to implement a run method that does not continuously loop, but instead only does some useful bit of work one time. Second, we need to call the Threader::run method that takes in a hertz parameter for controlling the thread execution.

Here's how to accomplish this:

Periodic Counter Thread Application

#include <sstream>
#include "madara/knowledge/KnowledgeBase.h"
#include "madara/threads/Threader.h"
#include "madara/utility/Utility.h"

// some shortcuts for common MADARA namespaces
namespace engine = madara::knowledge;
namespace threads = madara::threads;
namespace utility = madara::utility;
namespace containers = engine::containers;

// a counter thread that works with other counters
class CounterThread : public threads::BaseThread
{
public:

  void init (engine::KnowledgeBase & context)
  {
    // reference a counter variable in the context
    counter.set_name ("counter", context);

    // save the context
    data = context;

    // construct the message buffer for printing the thread name and counter info
    std::stringstream buffer;
    buffer << "Thread " << name << " updated counter to {counter}\n";
    message = buffer.str ();
  }

  void run (void)
  {
    if (counter > 1000)
    {
      terminated = 1;
    }
    else
    {
      ++counter;
      
      // use knowledge base's print to avoid interleaving in std::cerr
      data.print (message);
    }
  }

  void cleanup (void)
  {
    data.print ("Counter thread is shutting down.\n");
  }

private:
  /// a reference to the counter inside of the knowledge base
  containers::Integer counter;

  /// the knowledge base that contains the counter and allows printing
  engine::KnowledgeBase data;

  /// the message we want to print
  std::string message;
};

// the main function
int main (int, char **)
{
  engine::KnowledgeBase knowledge;

  threads::Threader threader (knowledge);

  // run the two counter threads, one at 20hz and one at 40hz
  threader.run (20.0, "counter1", new CounterThread ());
  threader.run (40.0, "counter2", new CounterThread ());

  // both counter threads terminate themselves once the shared counter passes 1000
  threader.wait ();

  return 0;
}
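A periodic thread boils down to a fixed-rate loop: do one unit of work, check the flags, and sleep until the next period starts. The sketch below shows that loop using std::chrono and std::this_thread::sleep_until. run_periodic is a hypothetical helper that illustrates the idea; it does not describe how Threader schedules its worker threads internally. As in CounterThread::run above, the work function can set the terminate flag itself.

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <thread>

// Hypothetical helper (not a MADARA API): call `work` once per period,
// where period = 1 / hertz seconds, and check the terminate flag between
// executions. Returns the number of times `work` ran.
inline int run_periodic (double hertz, std::atomic<int> & terminated,
                         const std::function<void ()> & work)
{
  using steady = std::chrono::steady_clock;
  const auto period = std::chrono::duration_cast<steady::duration> (
    std::chrono::duration<double> (1.0 / hertz));

  int executions = 0;
  auto next = steady::now ();

  // the flag is only checked between executions, never during `work`
  while (terminated.load () == 0)
  {
    work ();
    ++executions;

    // fixed-rate scheduling: sleep until the start of the next period,
    // so time spent inside `work` does not push later executions back
    next += period;
    std::this_thread::sleep_until (next);
  }
  return executions;
}
```

At 20 Hz the period is 1/20 s = 50 ms, and at 40 Hz it is 25 ms. Together, the two counter threads in the example perform about 60 increments per second, so the shared counter needs roughly 1000 / 60 ≈ 17 seconds to pass 1000, ignoring scheduling jitter.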

5. Understanding the Threader wait function

Threader::wait waits for the termination of one thread or all threads. Because it also accepts a timeout, it can serve as a multipurpose sleep statement: it returns when the threads terminate or when the timeout expires, whichever happens first.

There are two versions of Threader::wait. The first waits until all threads complete or until the timeout specified in WaitSettings expires. The second is a targeted wait that waits for one specific thread to terminate, again bounded by a timeout. Both versions return whether the threads being waited on have terminated. A return value of false means the timeout occurred first.
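The "termination or timeout, whichever comes first" behavior maps naturally onto std::condition_variable::wait_for with a predicate. In the sketch below, DoneSignal is a hypothetical class, not part of MADARA. Its wait_for returns true if the worker marked itself done and false if the timeout expired first, which mirrors the return-value convention described above.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical "done" signal: a worker marks completion, a waiter blocks
// until completion or timeout, whichever comes first
class DoneSignal
{
public:
  // called by the worker as its last action
  void mark_done ()
  {
    {
      std::lock_guard<std::mutex> lock (mutex_);
      done_ = true;
    }
    cv_.notify_all ();
  }

  // returns true if the worker finished, false if the timeout hit first
  bool wait_for (std::chrono::milliseconds timeout)
  {
    std::unique_lock<std::mutex> lock (mutex_);
    // the predicate guards against spurious wakeups and reports the result
    return cv_.wait_for (lock, timeout, [this] { return done_; });
  }

private:
  std::mutex mutex_;
  std::condition_variable cv_;
  bool done_ = false;
};
```

In practice the worker thread calls mark_done () when it exits, and the waiting thread calls wait_for (timeout). A false result tells the waiter that the worker is still running.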


6. Using Thread-Safe Queues

A common paradigm in parallel programming is to place a queue between threads, most often as a solution to the producer/consumer problem. The central idea of producer/consumer is that producers create information that consumers then take and use. A queue captures this interaction naturally. In this section, we'll cover how to set up your own producer/consumer application quickly and easily with MADARA.
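At its core, a thread-safe queue is a container whose enqueue and dequeue operations are protected by a lock. The sketch below is a hypothetical JobQueue built on std::mutex and std::deque; it is not MADARA's containers::Queue. Its try_dequeue returns false immediately when the queue is empty. That appears to be the role jobs.dequeue (false) plays in the MADARA example, which checks job.is_valid () before using the result.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical bounded, thread-safe job queue (not containers::Queue)
template <typename T>
class JobQueue
{
public:
  explicit JobQueue (std::size_t capacity) : capacity_ (capacity) {}

  // adds a job; returns false without adding anything if the queue is full
  bool enqueue (const T & job)
  {
    std::lock_guard<std::mutex> lock (mutex_);
    if (items_.size () >= capacity_)
      return false;
    items_.push_back (job);
    return true;
  }

  // non-blocking: returns false immediately if the queue is empty,
  // otherwise removes the oldest job and stores it in `job`
  bool try_dequeue (T & job)
  {
    std::lock_guard<std::mutex> lock (mutex_);
    if (items_.empty ())
      return false;
    job = items_.front ();
    items_.pop_front ();
    return true;
  }

private:
  std::mutex mutex_;
  std::deque<T> items_;
  std::size_t capacity_;
};
```

Because try_dequeue never blocks, a periodic consumer can simply return from run when there is nothing to do and try again on its next execution.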

We'll continue to build on the knowledge you've gained over the past few examples. In the following producer/consumer example, a producer thread creates new actions for consumers to perform, using a random number generator (utility::rand_int) included in the madara::utility namespace.

Producer/Consumer Application Example

#include <iostream>

#include "madara/knowledge/KnowledgeBase.h"
#include "madara/threads/Threader.h"
#include "madara/utility/Utility.h"
#include "madara/knowledge/containers/Queue.h"

// some shortcuts for common MADARA namespaces
namespace engine = madara::knowledge;
namespace threads = madara::threads;
namespace utility = madara::utility;
namespace containers = engine::containers;

/**
 * A consumer of action types
 **/
class Consumer: public threads::BaseThread
{
public:
  /**
    * Initializes thread with MADARA context
    * @param   context   context for querying current program state
    **/
  virtual void init (engine::KnowledgeBase & context)
  {
    // initialize references to variables in the knowledge base
    jobs.set_name (".jobs", context);
    jobs_completed.set_name (".jobs_completed", context);

    data = context;
  }

  /**
    * Checks the job queue until terminated for new tasks to perform
    **/
  virtual void run (void)
  {
    // try to dequeue one job per execution; false asks dequeue not to wait
    engine::KnowledgeRecord job = jobs.dequeue (false);

    if (job.is_valid ())
    {
      /**
        * use std::cerr instead of data.print to minimize time spent in
        * critical sections
        **/
      if (job.to_integer () == 0)
      {
        std::cerr << "Checking News\n";
      }
      else if (job.to_integer () == 1)
      {
        std::cerr << "Checking Stocks\n";
      }
      else if (job.to_integer () == 2)
      {
        std::cerr << "Checking Email\n";
      }
      else if (job.to_integer () == 3)
      {
        std::cerr << "Checking Schedule\n";
      }
      else
      {
        std::cerr << "Request to terminate from main thread.\n";
      }
        
      // Update the global counter of jobs done.
      ++jobs_completed;
    }
  }

private:
  containers::Queue jobs;
  containers::Integer jobs_completed;
  engine::KnowledgeBase data;
};

/**
 * A producer of action types for consumers to perform
 **/
class Producer: public threads::BaseThread
{
public:
  /**
    * Initializes thread with MADARA context
    * @param   context   context for querying current program state
    **/
  virtual void init (engine::KnowledgeBase & context)
  {
    // create a reference to the jobs queue
    jobs.set_name (".jobs", context);

    data = context;
  }

  /**
    * Generate a job. Jobs can be one of four types of events:
    * 0: Check News
    * 1: Check Stocks
    * 2: Check Email
    * 3: Check Schedule
    **/
  virtual void run (void)
  {
    /**
      * generate job consisting of 4 possible events: (int: 0-3)
      **/
    jobs.enqueue (madara::utility::rand_int (0, 3, false));
  }

private:
  containers::Queue jobs;
  engine::KnowledgeBase data;
};


// the main function
int main (int, char **)
{
  engine::KnowledgeBase knowledge;

  threads::Threader threader (knowledge);

  // create the initial jobs queue
  containers::Queue jobs (".jobs", knowledge);
  jobs.resize (100);

  /**
   * explicitly set random seed to right now for randomizer engine.
   * Subsequent calls to MADARA random funcs should use false for 3rd arg
   **/
  utility::rand_int (0, 1, true);

  // run the producer and consumer threads at 20hz
  threader.run (20.0, "producer", new Producer ());
  threader.run (20.0, "consumer", new Consumer ());

  // sleep for 10s to allow producer and consumer to perform work.
  utility::sleep (10.0);

  // request all threads to terminate
  threader.terminate ();

  // wait for the threads to cleanup
  threader.wait ();

  return 0;
}

You may have noticed that the jobs queue in the above example uses a local variable prefix (the leading dot in ".jobs") to keep the queue local. This is an important concept to understand about the Queue container.

Why is this important? Queue is thread-safe but not necessarily process-safe. If you have multiple consumers/producers across nodes in your network, the timing of producing and consuming is likely to produce race conditions due to the way consistency is enforced within the MADARA engine.

For instance, suppose node1 and node2 each have a producer thread. If both enqueue a job at the same time, each writes its task into the same slot of the queue. Whichever write lands last at that index is the one that stays in the queue, and because of network latency, the nodes can easily disagree on the order of production and consumption. In some cases this may be acceptable, but for most applications it is likely to be a debugging nightmare.
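The following toy model shows how a write can be lost. Each write to a shared queue slot carries a logical timestamp, and every replica keeps the newest write, breaking ties by writer id. Update, merge_write, and replay are hypothetical names. This is a simplified last-writer-wins rule for illustration, not MADARA's actual consistency code.

```cpp
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical write to a single replicated queue slot
struct Update
{
  std::string job;       // the job written into the slot
  std::uint64_t clock;   // logical timestamp of the write
  int writer;            // node id, used to break timestamp ties
};

// keep whichever write is newest (last writer wins)
inline void merge_write (Update & slot, const Update & incoming)
{
  if (incoming.clock > slot.clock ||
      (incoming.clock == slot.clock && incoming.writer > slot.writer))
    slot = incoming;
}

// apply writes to an empty slot in the order a replica receives them
inline std::string replay (const std::vector<Update> & arrivals)
{
  Update slot {"", 0, -1};
  for (const auto & update : arrivals)
    merge_write (slot, update);
  return slot.job;
}
```

For example, replay ({{"check news", 5, 1}, {"check email", 5, 2}}) and the same call with the two writes reversed both return "check email". The replicas converge, but node1's "check news" job is silently overwritten, and any consumer that read the slot before node2's write arrived acted on a job the other node never sees.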

With a local job queue between the threads on each node, every node can run multiple producers and consumers that act independently of those on other nodes. This is the recommended way to use the thread-safe Queue container. For distributed applications (i.e., multiprocess as well as multithreaded), the only recommended approach is to create a separate queue for each producer/consumer pair (a direct channel between them), so that each queue represents a 1:1 relationship between one producer and one consumer.
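One way to organize such 1:1 channels is to derive a distinct queue name from each producer/consumer pair, so that no two producers ever write to the same queue. The "jobs.<producer>.<consumer>" format and the channel_name and outgoing_channels helpers below are hypothetical, not a MADARA convention. The names deliberately have no leading dot, because these queues must be shared between nodes rather than kept local.

```cpp
#include <string>
#include <vector>

// Hypothetical naming scheme: one queue per producer/consumer pair.
// No leading '.', so the variables are shared across nodes.
inline std::string channel_name (const std::string & producer,
                                 const std::string & consumer)
{
  return "jobs." + producer + "." + consumer;
}

// every queue a given producer writes to: one per consumer
inline std::vector<std::string> outgoing_channels (
  const std::string & producer, const std::vector<std::string> & consumers)
{
  std::vector<std::string> names;
  for (const auto & consumer : consumers)
    names.push_back (channel_name (producer, consumer));
  return names;
}
```

For example, outgoing_channels ("agent0", {"agent1", "agent2"}) yields "jobs.agent0.agent1" and "jobs.agent0.agent2". Each consumer then reads only from the queues whose name ends with its own id.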

In conclusion, a couple of points to keep in mind about the Queue container.

  • It is very useful for multithreaded applications
  • It is rarely useful for distributed applications, so be careful how you use Queue across nodes.

More Information

If you are looking for code examples and guides, your best bet would be to start with the tutorials (located in the tutorials directory of the MADARA root directory--see the README.txt file for descriptions). After that, there are many dozens of tests that showcase and profile the many functions, classes, and functionalities available to MADARA users.

Users may also browse the Library Documentation for all MADARA functions, classes, etc. and the Wiki pages on this website.


Video Tutorials

First Steps in MADARA: Covers installation and a Hello World program.
Intro to Networking: Covers creating a multicast transport and coordination and image sharing between multiple agents.

