JavaWorkingWithThreads

James Edmondson edited this page Jul 7, 2018 · 4 revisions

Java Guide Series
Architecture | Knowledge Base | Networking | Containers | Threads


MADARA Threads

The Java port of MADARA supports fast, portable user threads that interact with each other over data and control planes. With the Threader class and the extensible BaseThread class, you can write applications that not only perform distributed collaboration and communication but also take advantage of multiple cores on the local node to speed up computation.



1. A Hello World Threading Example

To use the Threader class, you need to do the following things:

  1. Extend BaseThread and implement the run method
  2. Create a Knowledge Base
  3. Create a Threader
  4. Run threads
  5. Wait for threads to finish

Sounds simple enough, right? It really is. Let's make a simple Hello World application that launches a thread that prints "Hello World!".

Hello World Application


import ai.madara.knowledge.KnowledgeBase;
import ai.madara.logger.GlobalLogger;
import ai.madara.logger.LogLevels;
import ai.madara.threads.BaseThread;
import ai.madara.threads.Threader;
import ai.madara.knowledge.containers.Integer;


// 1: Extend BaseThread
public class HelloWorld extends BaseThread
{
  public void run()
  {
    System.out.println("Hello World!");
  }
    
  public static void main(String...args) throws InterruptedException, Exception
  {
    // 2: Create a Knowledge Base
    KnowledgeBase knowledge = new KnowledgeBase();

    // 3: Create a Threader 
    Threader threader = new Threader(knowledge);

    // 4: Run threads
    threader.run("helloWorld", new HelloWorld());

    // 5: Wait for threads to finish
    threader.waitForThreads();
    
    threader.free();
    knowledge.free();
  }
}

Believe it or not, you have just created a multi-threaded application with MADARA!


2. Understanding BaseThread

ai.madara.threads.BaseThread is a simple class with some powerful features, and the Hello World example showed only part of what's possible. Next, we'll show how you can make your threads work together to accomplish a simple task, but first, we need to go over the features of BaseThread and exactly what you're inheriting.

BaseThread has three overridable methods, but two of them have default implementations, so you don't have to implement them unless you want to. The run method is the only one that is required, and that's what we used in Hello World to print the message to the screen.

The other two methods are init and cleanup, and their signatures are outlined in the following code segment (taken from the BaseThread class):

/**
 * Initializes thread with MADARA context
 * @param   context   context for querying current program state
 **/
 public void init(ai.madara.knowledge.KnowledgeBase context)
 {
 }

/**
 * Cleans up any thread residue (usually instances created in init)
 **/
 public void cleanup()
 {
 }

The init method is called before run and allows you to save or query the context for information or create any MADARA containers that you might need in your run method.

The method cleanup is called after run and before the destruction of the thread's internals. If you do some kind of unmanaged heap-based memory allocation, then you may want to override this function. However, if all you're using are MADARA containers, you probably don't need to use this function at all.
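
The call order described above can be sketched without MADARA at all. The following stand-in is only an illustration of the init, run, cleanup sequence on a worker thread. The names LifecycleSketch, Worker, and launch are hypothetical, and the code uses a plain java.lang.Thread rather than the Threader class, so treat it as a sketch of the contract and not as the library's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the init/run/cleanup contract; this is not MADARA code.
class LifecycleSketch
{
  // Minimal analogue of a user thread: run is required, the two hooks are optional.
  static abstract class Worker
  {
    void init(List<String> log) {}
    abstract void run();
    void cleanup() {}
  }

  // Calls the hooks in the documented order on a new thread and waits for it.
  static void launch(final Worker worker, final List<String> log)
  {
    Thread thread = new Thread(() -> {
      worker.init(log);
      worker.run();
      worker.cleanup();
    });
    thread.start();
    try
    {
      thread.join();
    }
    catch (InterruptedException e)
    {
      Thread.currentThread().interrupt();
    }
  }

  // Runs one worker that records each hook and returns the recorded order.
  static List<String> demo()
  {
    final List<String> log = Collections.synchronizedList(new ArrayList<String>());
    launch(new Worker() {
      @Override void init(List<String> shared) { shared.add("init"); }
      @Override void run() { log.add("run"); }
      @Override void cleanup() { log.add("cleanup"); }
    }, log);
    return log;
  }

  public static void main(String... args)
  {
    System.out.println(demo()); // prints [init, run, cleanup]
  }
}
```

The point of the sketch is that anything you create in init is available to run, and cleanup is the last chance to release it.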

In addition to the init, run, and cleanup methods, BaseThread provides an accessor method and two variables. The getName method returns the thread's name. The paused and terminated variables form a control plane between the Threader class and its threads: they are instances of MADARA Integer containers that the Threader class sets to let each thread know that the user has requested a pause or a termination.

To check the status of these control plane flags from your thread, read them as you would any other variable. Let's take a look at an example that features a nagging hello class that prints hello world until the user asks it to terminate.

Nagging Hello World Application


import ai.madara.knowledge.KnowledgeBase;
import ai.madara.logger.GlobalLogger;
import ai.madara.logger.LogLevels;
import ai.madara.threads.BaseThread;
import ai.madara.threads.Threader;
import ai.madara.knowledge.containers.Integer;

// A nagging thread that just keeps saying hello until terminated
public class NaggingHelloWorld extends BaseThread
{
  public void run()
  {
    // keep nagging until the Threader sets the terminated flag
    while (terminated.get() == 0)
    {
      System.out.println ("Hello World from " + getName() + " thread!");
    }
  }
    
  public static void main(String...args) throws InterruptedException, Exception
  {
    KnowledgeBase knowledge = new KnowledgeBase();
    
    Threader threader = new Threader(knowledge);

    threader.run("helloWorld", new NaggingHelloWorld());

    // sleep for 10 seconds
    java.lang.Thread.sleep(10000);
    
    // tell threads to terminate
    threader.terminate ();
  
    threader.waitForThreads();
    
    threader.free();
    knowledge.free();
  }
}
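
The loop on the terminated flag is a cooperative cancellation pattern: the thread is never killed from outside, it simply notices the request and returns. Here is a minimal plain-Java sketch of the same idea. It assumes an AtomicLong as a stand-in for the MADARA Integer container, and TerminationFlagSketch and runUntilTerminated are hypothetical names, so it illustrates the pattern only and is not how the Threader class is implemented.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the terminated flag; this is not MADARA code.
class TerminationFlagSketch
{
  // Starts a worker that loops until the flag becomes nonzero, raises the
  // flag after a short delay, and returns how many iterations the worker ran.
  static long runUntilTerminated(long delayMillis)
  {
    final AtomicLong terminated = new AtomicLong(0);
    final AtomicLong iterations = new AtomicLong(0);

    Thread worker = new Thread(() -> {
      while (terminated.get() == 0)
      {
        iterations.incrementAndGet();
      }
    });

    try
    {
      worker.start();
      Thread.sleep(delayMillis);
      terminated.set(1);   // plays the role of threader.terminate()
      worker.join();       // plays the role of waiting for threads to finish
    }
    catch (InterruptedException e)
    {
      Thread.currentThread().interrupt();
      terminated.set(1);   // still ask the worker to stop
    }
    return iterations.get();
  }

  public static void main(String... args)
  {
    System.out.println("Iterations before termination: " + runUntilTerminated(50));
  }
}
```

Because the worker only stops when it reads the flag, a run method that blocks for a long time without checking it will delay termination by the same amount.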

3. Understanding Threader

So, from the previous examples, you've seen that we can terminate all threads and wait for them to finish. But what if we only wanted to terminate one thread out of many? In the Nagging Hello World application, what if we had three threads, two of which were doing work we wanted done and only one of them that was actually nagging us? Let's see what it looks like to tell only the annoying one to finish.

Counter and Nagging Hello World Application

import ai.madara.knowledge.KnowledgeBase;
import ai.madara.logger.GlobalLogger;
import ai.madara.logger.LogLevels;
import ai.madara.threads.BaseThread;
import ai.madara.threads.Threader;
import ai.madara.knowledge.containers.Integer;

// a counter thread that works with other counters
public class CounterThread extends BaseThread
{
  // CounterThread has a couple of data members it needs initialized
  Integer counter;
  KnowledgeBase data;
  
  // A nagging thread that just keeps saying hello until terminated.
  // It is static so that main can create it without a CounterThread instance.
  private static class NaggingHelloWorld extends BaseThread
  {
    KnowledgeBase knowledge;
  
    // the NaggingHelloWorld's initialize
    public void init (KnowledgeBase data)
    {
      knowledge = data;
    }
    
    // the NaggingHelloWorld's run method
    public void run()
    {
      while (terminated.get() == 0)
      {
        System.out.println ("Hello World from " + getName() + " thread!");
      }
    }
    
    // the NaggingHelloWorld's cleanup method
    public void cleanup()
    {
      knowledge.print ("Nagging thread is shutting down.\n");
    }
  }
  
  // CounterThread's initialization method
  public void init (KnowledgeBase context)
  {
    // create the container and reference a counter variable in the context
    counter = new Integer();
    counter.setName (context, "counter");

    // save the context
    data = context;
  }

  // CounterThread's run method
  public void run()
  {
    long value = 0;
    
    // count until we reach 100K or until the user terminates us
    while (value < 100000 && terminated.get() == 0)
    {
      // increment the counter and keep the new value
      value = counter.inc();
      
      // print the counter's value to the console
      System.out.println ("In thread " + getName() + ". Count increased to " + value);
    }
  }

  // CounterThread's cleanup method
  public void cleanup()
  {
    data.print ("Counter thread is shutting down.\n");
  }
  
  // CounterThread's main method
  public static void main(String...args) throws InterruptedException, Exception
  {
    KnowledgeBase knowledge = new KnowledgeBase();
    
    Threader threader = new Threader(knowledge);

    // run a nagger thread
    threader.run("nagger", new NaggingHelloWorld());

    // run the two counter threads
    threader.run ("counter1", new CounterThread ());
    threader.run ("counter2", new CounterThread ());

    // sleep for 3 seconds to allow thread to nag us for a bit
    java.lang.Thread.sleep(3000);
    
    // tell nagger thread to terminate
    threader.terminate ("nagger");
  
    // wait for all threads to finish
    threader.waitForThreads();
    
    threader.free();
    knowledge.free();
  }
}
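
To see why terminating by name leaves the other threads alone, it helps to picture one flag per named thread. The sketch below is a plain-Java illustration under that assumption. NamedTerminationSketch and its methods are hypothetical names chosen to echo the calls used above, and nothing here is taken from the Threader source.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical per-name termination registry; this is not the MADARA Threader.
class NamedTerminationSketch
{
  private final Map<String, AtomicBoolean> flags = new LinkedHashMap<>();
  private final Map<String, Thread> threads = new LinkedHashMap<>();

  // Starts a named thread that spins until its own flag is raised.
  void run(String name)
  {
    final AtomicBoolean terminated = new AtomicBoolean(false);
    Thread thread = new Thread(() -> {
      while (!terminated.get())
      {
        Thread.yield();
      }
    }, name);
    flags.put(name, terminated);
    threads.put(name, thread);
    thread.start();
  }

  // Raises the flag of one named thread only.
  void terminate(String name)
  {
    flags.get(name).set(true);
  }

  // Raises every flag.
  void terminate()
  {
    for (AtomicBoolean flag : flags.values())
    {
      flag.set(true);
    }
  }

  // Waits for one named thread to exit; returns false if interrupted.
  boolean join(String name)
  {
    try
    {
      threads.get(name).join();
      return true;
    }
    catch (InterruptedException e)
    {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  boolean isAlive(String name)
  {
    return threads.get(name).isAlive();
  }

  public static void main(String... args)
  {
    NamedTerminationSketch threader = new NamedTerminationSketch();
    threader.run("nagger");
    threader.run("counter1");

    // stop only the nagger; counter1 keeps running
    threader.terminate("nagger");
    threader.join("nagger");
    System.out.println("nagger alive: " + threader.isAlive("nagger"));
    System.out.println("counter1 alive: " + threader.isAlive("counter1"));

    // now stop everything that is left
    threader.terminate();
    threader.join("counter1");
  }
}
```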

4. Controlling Threader executions

The examples up to this point have focused on unregulated threads doing whatever executions they can. In threading terms, we call this kind of activity bursting. The thread will devour whatever CPU time it can until the thread meets its conditions for terminating.

But that's not the only mode of threading available in MADARA.

An alternative method for controlling threads is to allow periodic execution of each thread. We cover periodic thread execution in the following example.

Instead of having two uncontrolled counters, we will create two controlled counters, one operating at 20 Hz (20 executions per second) and one at 40 Hz. Extending BaseThread for this kind of controlled threading usually simplifies your code significantly, because the Threader class and its worker threads check the paused and terminated flags for you between executions. However, as you will see in the example, you can set the terminated flag yourself if it's appropriate.

There are two main development changes required for using periodic threads rather than bursty threads. First, we need to implement a run method that does not loop continuously but instead does one useful bit of work per call. Second, we need to call the Threader.run overload that takes a hertz parameter for controlling the thread's execution rate.

Here's how to accomplish this:

Periodic Counter Thread application


import ai.madara.knowledge.KnowledgeBase;
import ai.madara.logger.GlobalLogger;
import ai.madara.logger.LogLevels;
import ai.madara.threads.BaseThread;
import ai.madara.threads.Threader;
import ai.madara.knowledge.containers.Integer;

// a counter thread that works with other counters
public class CounterThread extends BaseThread
{
  // CounterThread has a couple of data members it needs initialized
  Integer counter;
  KnowledgeBase data;
  
  
  // CounterThread's initialization method
  public void init (KnowledgeBase context)
  {
    // create the container and reference a counter variable in the context
    counter = new Integer();
    counter.setName (context, "counter");

    // save the context
    data = context;
  }

  // CounterThread's run method. Because we will run periodically, we do not need a while loop
  public void run()
  {
    // if the counter is greater than 1,000, then set terminated to 1 to cancel periodic execution
    if (counter.get() > 1000)
    {
      terminated.set(1);
    }
    else
    {
      counter.inc();

      System.out.println ("In thread " + getName() + ". Count increased to " + counter.get());
    }
  }

  // CounterThread's cleanup method
  public void cleanup()
  {
    data.print ("Counter thread is shutting down.\n");
  }
  
  public static void main(String...args) throws InterruptedException, Exception
  {
    KnowledgeBase knowledge = new KnowledgeBase();
    
    Threader threader = new Threader(knowledge);

    // run the two counter threads, one at 20hz and one at 40hz
    threader.run (20.0, "counter1", new CounterThread ());
    threader.run (40.0, "counter2", new CounterThread ());

    // wait for all threads to finish
    threader.waitForThreads();
    
    threader.free();
    knowledge.free();
  }
}
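
To make the hertz parameter concrete: a rate of 20 Hz means one execution every 50 ms, and 40 Hz means one every 25 ms. The plain-Java sketch below shows that conversion and a loop-free task that stops itself, using java.util.concurrent.ScheduledExecutorService as a stand-in scheduler. PeriodicSketch, periodNanos, and countTo are hypothetical names, and this guide does not say how Threader schedules its periodic threads, so read it as an analogy only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical periodic-execution analogue; this is not MADARA code.
class PeriodicSketch
{
  // Converts a rate in hertz to the period between executions, in nanoseconds.
  static long periodNanos(double hertz)
  {
    return Math.round(1_000_000_000.0 / hertz);
  }

  // Runs a counter at the given rate until it reaches the limit, then stops.
  // Returns the final count.
  static long countTo(final long limit, double hertz)
  {
    final AtomicLong counter = new AtomicLong(0);
    final CountDownLatch done = new CountDownLatch(1);
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Each execution does one unit of work; there is no loop inside the task.
    scheduler.scheduleAtFixedRate(() -> {
      if (counter.get() >= limit)
      {
        done.countDown();   // plays the role of terminated.set(1)
      }
      else
      {
        counter.incrementAndGet();
      }
    }, 0, periodNanos(hertz), TimeUnit.NANOSECONDS);

    try
    {
      done.await();
    }
    catch (InterruptedException e)
    {
      Thread.currentThread().interrupt();
    }
    scheduler.shutdownNow();
    return counter.get();
  }

  public static void main(String... args)
  {
    System.out.println("Period at 20 Hz: " + periodNanos(20.0) + " ns");
    System.out.println("Final count: " + countTo(5, 200.0));
  }
}
```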

5. Using Thread-Safe Queues

A common paradigm in parallel programming is to place a queue between threads, most often as a solution to the producer/consumer problem. The central idea of the producer/consumer concept is that a producer creates information for the consumers to consume and utilize. A queue encapsulates this interaction intuitively, and in this section, we'll cover how to set up your own producer/consumer application quickly and easily with MADARA.
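
If the producer/consumer pattern is new to you, here is a minimal sketch of it in plain Java, using java.util.concurrent.ArrayBlockingQueue in place of the MADARA Queue container. The class name BlockingQueueSketch, the method produceAndConsume, and the STOP marker are all hypothetical, and the blocking behavior of this queue is a property of the standard library class, not a claim about the MADARA container.

```java
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical producer/consumer analogue; this is not MADARA code.
class BlockingQueueSketch
{
  // Marker job that tells the consumer there is nothing left to do.
  static final long STOP = -1;

  // Produces jobCount random jobs, consumes them on another thread,
  // and returns the number of jobs the consumer completed.
  static int produceAndConsume(final int jobCount)
  {
    final BlockingQueue<Long> jobs = new ArrayBlockingQueue<>(100);
    final int[] completed = {0};

    Thread producer = new Thread(() -> {
      Random generator = new Random();
      try
      {
        for (int i = 0; i < jobCount; ++i)
        {
          jobs.put((long) generator.nextInt(4)); // job types 0 to 3
        }
        jobs.put(STOP);
      }
      catch (InterruptedException e)
      {
        Thread.currentThread().interrupt();
      }
    });

    Thread consumer = new Thread(() -> {
      try
      {
        while (jobs.take() != STOP)
        {
          ++completed[0];
        }
      }
      catch (InterruptedException e)
      {
        Thread.currentThread().interrupt();
      }
    });

    producer.start();
    consumer.start();
    try
    {
      producer.join();
      consumer.join();
    }
    catch (InterruptedException e)
    {
      Thread.currentThread().interrupt();
    }
    return completed[0];
  }

  public static void main(String... args)
  {
    System.out.println("Jobs completed: " + produceAndConsume(250));
  }
}
```

The queue is the only thing the two threads share, which is what makes the pattern easy to reason about.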

We'll continue to build on the knowledge you've gained over the past few examples. In the following producer/consumer example, a producer thread creates new actions for consumers to perform based on a random number generator (java.util.Random).

Producer/Consumer Application Example

import ai.madara.knowledge.KnowledgeBase;
import ai.madara.knowledge.KnowledgeRecord;
import ai.madara.threads.BaseThread;
import ai.madara.threads.Threader;
import ai.madara.knowledge.containers.Queue;
import ai.madara.knowledge.containers.Integer;

import java.util.Random;
import java.lang.System;

// a producer thread and a consumer thread that share a job queue
public class ProducerConsumerQueueTest
{
  static class Producer extends BaseThread
  {
    // Producer's initialization method
    public void init(KnowledgeBase context)
    {
      // create a random generator
      generator = new Random(System.currentTimeMillis());
      
      // create the job queue
      queue = new Queue();
      queue.setName(context, ".jobs");
      queue.resize(-1);
    }

    // Producer's run method.
    public void run()
    {
      // generate one new job per periodic execution, until terminated
      long job = (long)generator.nextInt(4);
      queue.enqueue(job);
    }
    
    // Producer's cleanup method
    public void cleanup()
    {
      queue.free();
    }
    
    Random generator;
    Queue queue;
  }

  static class Consumer extends BaseThread
  {
    // Consumer's initialization method
    public void init(KnowledgeBase context)
    {
      // create a reference to the jobs queue
      queue = new Queue();
      queue.setName(context, ".jobs");
      queue.resize(-1);
      
      // keep track of the number of jobs completed
      jobsCompleted = new Integer();
      jobsCompleted.setName(context, ".jobsCompleted");
    }

    // Consumer's run method.
    public void run()
    {
      KnowledgeRecord job = queue.dequeue(false);
      
      if(job.isValid())
      {
        long value = job.toLong();
      
        if(value == 0)
        {
          System.out.println(jobsCompleted.get() + ": Checking News");
        }
        else if(value == 1)
        {
          System.out.println(jobsCompleted.get() + ": Checking Stocks");
        }
        else if(value == 2)
        {
          System.out.println(jobsCompleted.get() + ": Checking Email");
        }
        else if(value == 3)
        {
          System.out.println(jobsCompleted.get() + ": Checking Schedule");
        }
        else
        {
          System.out.println(jobsCompleted.get() + ": Unknown Job Type");
        }
        
        jobsCompleted.inc();
      }
      
      // clean up the removed job
      job.free();
    }
    
    // Consumer's cleanup method
    public void cleanup()
    {
      // clean up the private copies of our queue and counter
      queue.free();
      jobsCompleted.free();
    }
    
    Queue queue;
    Integer jobsCompleted;
  }

  
  public static void main(String...args) throws InterruptedException, Exception
  {
    KnowledgeBase knowledge = new KnowledgeBase();
    
    Threader threader = new Threader(knowledge);
    Integer jobsCompleted = new Integer();
    jobsCompleted.setName(knowledge, ".jobsCompleted");
    
    Queue jobs = new Queue();
    jobs.setName(knowledge, ".jobs");
    jobs.resize(100);
    
    // run the producer at 20hz and the consumer at 40hz
    threader.run(20.0, "producer", new Producer());
    threader.run(40.0, "consumer", new Consumer());

    while(jobsCompleted.get() < 100)
    {
      java.lang.Thread.sleep(1000);
    }
    
    // terminate all threads
    threader.terminate();
    
    // wait for all threads to finish
    threader.waitForThreads();
    
    // clean up main's references to the shared containers
    jobs.free();
    jobsCompleted.free();
    threader.free();
    knowledge.free();
  }
}

You may have noticed that the jobs queue created in the above example uses a local variable prefix (the leading "." in ".jobs") to keep the queue local. This is important, and it's also a powerful concept to learn about the Queue container.

Why is this important? Queue is thread-safe but not necessarily process-safe. If you have multiple consumers/producers across nodes in your network, the timing of producing and consuming is likely to produce race conditions due to the way consistency is enforced within the MADARA engine.

For instance, node1 may have a producer thread and node2 may have a producer thread. If both of them enqueue a job at the same time, each will place its job at the same index in the queue. Whoever writes last to this index will end up having its item in the queue, and due to network latency, it is very possible that a queue between nodes would have inconsistent production/consumption ordering. In some cases, this may be fine, but for most applications this is likely to be a debugging nightmare.

Because the example uses a local job queue between threads on the node, each node can have multiple producers and consumers acting independently of producers and consumers on other nodes. This is the recommended way to use the thread-safe Queue container. The only recommended way to use a Queue container for distributed applications (i.e., not only multithreaded but multiprocessed) is to create a queue between each producer and consumer (a direct channel between them), so that each queue represents a 1:1 relationship between one producer and one consumer.

In conclusion, keep a couple of points in mind about the Queue container:

  • It is very useful for multithreaded applications
  • It is rarely useful for distributed applications. Be careful with how you use Queue in distributed applications.

More Information

The Java module is fully documented with standard Javadoc. Please use the API at JavaDocs or tell your editor of choice to load source files from the Git repository.
