
Concepts

Jesus Ruiz edited this page Aug 13, 2013 · 27 revisions

durable.js is designed for writing programs that run for long periods of time and react to sporadic external events. This is a very common pattern in enterprise applications that coordinate interactions with external services or require human intervention.

durable.js is based on the json type system through and through: it exposes a REST interface for posting event messages as json objects, it stores the program state in a document database (MongoDB) as json objects, and the program itself is defined as a json object. The result is a simple, intuitive and consistent meta-linguistic abstraction, as well as good system performance, because no resources are spent mapping between type systems.

durable.js stores the program state as a collection of historical records in a document database. Historical records make it possible to aggregate and analyze program data without degrading system performance. Incoming events are stored as durable messages in a collection in the same database. Messages are correlated with and dispatched to the program event handlers following a callback model similar to that of Node.js.

Worth mentioning is the use of Node.js as the program hosting environment. Consistent with Node.js, only one callback is dispatched at a time in a given process. Because long running programs spend a good deal of their time doing I/O, this single-threaded callback model is a good fit. The Node.js 'cluster' module can be used to scale out the system, allowing good resource utilization and hosting program instances at very high density.

durable.js also provides a management web interface based on SVG and driven by d3.js. The user interface visualizes the program structure as well as historical record overlays. This enables the analysis of code execution sequences and provides an intuitive set of actions for testing and repairing program instances.


Session

Consider the following simple program named 'sequence'. Like every durable.js program, it has a collection of messages and a collection of session records:

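var d = require('durable');
d.run({
  // The program 'sequence' waits for two messages, in order.
  sequence: d.receive({ content: 'first' })                   // wait for { content: 'first' }
    .continueWith(function (s) { s.firstContinue = true; })   // update the session state
    .checkpoint('first')                                      // store the session with label 'first'
    .receive({ content: 'second' })                           // then wait for { content: 'second' }
    .continueWith(function (s) { s.secondContinue = true; })
    .checkpoint('second')
});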

See this example in [action](http://www.durablejs.org/examples/sequence/1/admin.html).

A session is the program state. It is a json object whose definition and content are owned by the program. The 'label', 'status' and 'id' fields of any session object are reserved for system use. A session is an addressable object. For example, assuming the program above is hosted at 'example.com', the session named '1' can be retrieved as follows:

GET: http://example.com/sequence/1
{ 
    firstContinue: true, 
    secondContinue: true, 
    label: 'second',
    status: 'complete'
}

All sessions of a program can be queried, filtered and projected; I explain the details in the Web interface section. Messages are sent to a session and are received in the context of that session. When a message is sent to a session id that doesn't exist, a new session is created with that id as its name. The session above was created when the following message was posted:

POST: http://example.com/sequence/1
{ 
    id: 1,
    content: 'first'
}

Every session has a set of historical records. When a session reaches a checkpoint, the old session state is archived and the new state is stored as 'idle'. Thus session records are always inserted, never updated, which favors concurrency between updates and queries. The history of the session named '1' can be retrieved as follows:

GET: http://example.com/sequence/1/history
[{
    status: 'checked'
}, { 
    firstContinue: true, 
    label: 'first',
    status: 'checked'
}, { 
    firstContinue: true, 
    secondContinue: true, 
    label: 'second',
    status: 'complete'
}]
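To make the insert-only model concrete, here is a minimal in-memory sketch in plain JavaScript that reproduces the history above. It is not durable.js code: the 'SessionStore' class and its 'ensure', 'checkpoint', 'current' and 'history' methods are hypothetical names, and durable.js itself keeps these records in MongoDB. In this sketch stored records are never rewritten; the 'checked' status of archived records is derived when the history is read.

```javascript
// Hypothetical in-memory model of session records (illustration only).
class SessionStore {
  constructor() {
    this.records = new Map(); // session id -> array of appended records
  }

  // A message addressed to an unknown id creates the session.
  ensure(id) {
    if (!this.records.has(id)) {
      this.records.set(id, [{ status: 'idle' }]);
    }
  }

  // Checkpoint: append the new state; earlier records are never modified.
  checkpoint(id, state, label, status = 'idle') {
    this.ensure(id);
    this.records.get(id).push({ ...state, label, status });
  }

  // The current session is simply the most recent record.
  current(id) {
    const history = this.records.get(id);
    return history[history.length - 1];
  }

  // History view: every record except the latest is reported as 'checked'.
  history(id) {
    const history = this.records.get(id);
    return history.map((record, i) =>
      i < history.length - 1 ? { ...record, status: 'checked' } : record);
  }
}

const store = new SessionStore();
store.ensure('1'); // POST { id: 1, content: 'first' } to an unknown session
store.checkpoint('1', { firstContinue: true }, 'first');
store.checkpoint('1', { firstContinue: true, secondContinue: true }, 'second', 'complete');
console.log(store.history('1'));
```

Keeping the archive flag out of the stored records keeps the sketch strictly append-only; how durable.js marks archived records internally may differ.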

The session history can also be queried, filtered and projected; I explain the details in the Web interface section. As I mentioned before, the 'status' field of the session state is reserved for system use. As a session is processed, it can have the 'idle' status when waiting for an event to occur, 'active' when dispatched into the program logic, or 'complete' when the program is done; archived records appear as 'checked' in the history. The 'active' session object is passed to all the relevant program event handlers: in the code fragment above, it is passed to the function (s) { ... } handlers.

Finally, a session can be forked to create a child session, and a child session can be joined with its parent session using a merge function. This enables concurrent streams, which I explain in detail further down.

Promise

A promise is the unit of execution, and a program is a set of chained promises. A promise performs work on a session: it typically transforms the session state, but it can also receive and post messages, checkpoint, and fork and join child sessions. A promise is created by passing a callback function, which receives the session object as its parameter:

d.promise(function(s) { s.i = 1; });

Promises are chained using the continueWith function, which supports cascading to improve the readability of the program. The continueWith method also has an overload that takes a callback as a parameter and automatically wraps it in a new chained promise:

d.promise(function(s) { s.i = 1; })
.continueWith(function(s) { s.i = 2; });

Promises are fault tolerant: when an exception occurs inside a promise, the program is resumed from the last stored session record. This model requires the code inside a promise to be idempotent. Side-effecting operations, such as receiving and posting messages or forking and joining sessions, are only executed when the session successfully reaches a checkpoint.
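The resume-from-last-record behavior can be illustrated with a small plain-JavaScript sketch. This is a model of the idea, not the durable.js implementation: 'runFromCheckpoint', the 'outbox' buffer and the shape of the stored record are hypothetical. Steps run on a copy of the last stored record, side effects are buffered, and nothing is committed unless every step succeeds; a failed attempt is simply retried from the stored record, which is why the steps must be idempotent.

```javascript
// Hypothetical model: run a chain of steps on a copy of the stored state and
// commit the new state (plus buffered side effects) only if every step succeeds.
function runFromCheckpoint(stored, steps, maxAttempts = 3) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    const session = JSON.parse(JSON.stringify(stored.state)); // sessions are plain json
    const outbox = []; // side effects (e.g. posts) are buffered until commit
    try {
      for (const step of steps) step(session, outbox);
      stored.state = session;      // "checkpoint": commit the new state...
      stored.sent.push(...outbox); // ...and only now release the side effects
      return attempt;
    } catch {
      // Discard the partial copy; the next attempt restarts from stored.state.
    }
  }
  throw new Error('gave up after ' + maxAttempts + ' attempts');
}

const stored = { state: { count: 0 }, sent: [] };
let failures = 1; // make the first attempt fail once
const attempts = runFromCheckpoint(stored, [
  (s) => { s.count = 1; },                          // idempotent: sets, does not increment
  (s, outbox) => { outbox.push({ content: 'a' }); }, // buffered "post"
  () => { if (failures-- > 0) throw new Error('transient failure'); },
]);
console.log(attempts, stored.state, stored.sent.length); // 2 { count: 1 } 1
```

Note that the buffered post is released exactly once even though the steps ran twice.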

Reusable promises can be created by 'sub-classing' the promise object. This is an advanced scenario for cases where you want to extend existing functionality. Note that durable.js relies on functional inheritance rather than prototypal inheritance, as the former provides better encapsulation of private members.

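var myPromise = function () {
    // Private members live in this closure and are not visible to callers.
    var execute = function (s) {
        // Work performed on the session s goes here.
    };
    // Build on the base promise object (functional inheritance).
    var that = d.promise(execute);
    // Add or override members on 'that' here before returning it.
    return that;
};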

The mechanism above is used inside durable.js to build its meta-linguistic abstraction. To make programs easier to write, all the relevant functions (checkpoint, idle, suspend, delay, receive, post, etc.) have been hoisted from the main promise function, so they can be called both from the module (for example d.receive) and as chained promise methods (for example .receive).
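Both ideas from this section, functional inheritance and the continueWith overload, can be modeled in a few lines of plain JavaScript. This is only a sketch: 'promise', 'continueWith', 'run' and 'counterPromise' below are hypothetical stand-ins written for illustration, not the durable.js source.

```javascript
// Hypothetical, simplified chainable promise built with functional inheritance:
// private members live in closures instead of on a prototype.
function promise(execute) {
  let next = null; // private: the next promise in the chain

  const that = {
    // Accepts another promise or a plain callback (wrapped in a new promise).
    continueWith(nextPromise) {
      next = typeof nextPromise === 'function' ? promise(nextPromise) : nextPromise;
      return next; // return the new tail so calls can cascade
    },
    // Run this promise and then everything chained after it.
    run(session) {
      execute(session);
      if (next) next.run(session);
      return session;
    },
  };
  return that;
}

// 'Sub-classing': build on promise() and add members; 'step' stays private.
function counterPromise(step) {
  const that = promise((s) => { s.i = (s.i || 0) + step; });
  that.describe = () => 'adds ' + step + ' to s.i';
  return that;
}

const head = promise((s) => { s.i = 1; });
head.continueWith((s) => { s.i = 2; })  // callback overload
    .continueWith(counterPromise(10));  // promise overload
console.log(head.run({})); // { i: 12 }
```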

Messages

Message passing is the basic mechanism for coordinating events. Every program has a collection of messages, which are correlated with 'idle' sessions and dispatched to the program. Each message has a unique identifier, which is used to ensure the consistency contract of the post method. Messages are addressed to a session of a program; if the addressed session doesn't exist, a new session is created. A message can have an effectiveDate, which specifies the date after which it should be dispatched; by default, effectiveDate is the time of posting. A message can also have an expireDate, which specifies the date after which it should be deleted from the messages collection; by default, a message is deleted one hour after being posted. Below is an example of posting the message with id '1' to the session named '1' of the program 'ping':

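d.promise(function (s) {
    // Describe the message to post: target program, target session and payload.
    s.setInput({
        program: 'ping',
        session: 1,
        id: 1,
        content: 'a'
    });
})
.post()              // posts the message set as input by the previous promise
.checkpoint('exit'); // the post is only executed once this checkpoint is reached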

Note that the session object can also hold transient state, that is, state that is not stored when the session reaches a checkpoint. This allows passing context from a promise to the promise chained immediately after it: the setInput method sets the context for the next promise, while the getOutput method gets the context from the previous promise. The code above posts a message from within the program. Messages can also be posted from outside using the Web interface, which I explain in detail further down:

POST: http://example.com/ping/1
{ 
    id: 1,
    content: 'a'
}  
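The dispatch window defined by effectiveDate and expireDate can be sketched as a small pure function. The defaults follow the text above (effective at posting time, expiring one hour after posting); the 'postedAt' field and the 'withDefaults' and 'isDispatchable' helpers are assumptions made for this illustration, not durable.js APIs. Treating the expiry instant as exclusive is also an arbitrary choice here.

```javascript
const ONE_HOUR_MS = 60 * 60 * 1000;

// Hypothetical helper: apply the documented defaults to a posted message.
function withDefaults(message, postedAt) {
  return {
    ...message,
    postedAt,
    effectiveDate: message.effectiveDate ?? postedAt,
    expireDate: message.expireDate ?? new Date(postedAt.getTime() + ONE_HOUR_MS),
  };
}

// A message can be dispatched once it is effective and before it expires.
function isDispatchable(message, now) {
  return message.effectiveDate <= now && now < message.expireDate;
}

const posted = new Date('2013-08-13T10:00:00Z');
const msg = withDefaults({ id: 1, content: 'a' }, posted);
console.log(isDispatchable(msg, new Date('2013-08-13T10:30:00Z'))); // true
console.log(isDispatchable(msg, new Date('2013-08-13T11:30:00Z'))); // false (expired)

// A message that only becomes effective 45 minutes after posting.
const later = withDefaults({ id: 2, content: 'b', effectiveDate: new Date('2013-08-13T10:45:00Z') }, posted);
console.log(isDispatchable(later, new Date('2013-08-13T10:30:00Z'))); // false (not yet effective)
console.log(isDispatchable(later, new Date('2013-08-13T10:50:00Z'))); // true
```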

When a session is 'active', receiving a message means executing a query (and, optionally, a projection) over the program's message collection. If no message matches the query, program control is yielded and the session is marked as 'idle'. durable.js leverages the [MongoDB](http://docs.mongodb.org/manual/core/read/) query and projection language, which allows great flexibility in message coordination. The example below receives the first queued message that has a field 'command' with content 'go'. Note how the object { command: 'go' } is a MongoDB query. Remember that a matched message won't be removed from the collection until the session successfully reaches a 'checkpoint'.

d.receive({ command: 'go' })
.continueWith(function(s) {
    s.out = s.getOutput(); 
})
.checkpoint('exit');
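Conceptually, a receive means "find the first queued message that matches a query". The sketch below supports only plain equality fields, a tiny subset of the MongoDB query language (no operators such as $regex or $or), and is not how durable.js evaluates queries; 'receive' and 'matches' are hypothetical names. A null result corresponds to the session yielding and becoming 'idle'.

```javascript
// Hypothetical equality-only matcher: every field in the query must equal the
// corresponding message field. durable.js delegates full query semantics to MongoDB.
function matches(message, query) {
  return Object.keys(query).every((key) => message[key] === query[key]);
}

// Return the first queued message that matches, or null when the session would
// have to yield and wait ('idle'). The message is not removed here: in durable.js
// removal only happens once the session reaches a checkpoint.
function receive(queue, query) {
  return queue.find((message) => matches(message, query)) ?? null;
}

const queue = [
  { id: 1, command: 'stop' },
  { id: 2, command: 'go' },
  { id: 3, command: 'go' },
];
console.log(receive(queue, { command: 'go' }).id); // 2
console.log(receive(queue, { command: 'pause' }));  // null
```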


Event algebra

durable.js implements an event algebra for coordinating complex event sequences. Every event promise has two versions: one with the event name and another with the name prefixed by 'try', for example receive and tryReceive. The former prevents the program from advancing until the query is satisfied; the latter tries and continues immediately, so that a subsequent promise decides how to proceed. The 'try' versions of events can be combined using the waitAllEvents and waitAnyEvent promises. The example below receives two messages: one with a field named 'command' with content 'go' and another with a field named 'argument' with content 'next':

d.waitAllEvents({
    a: d.tryReceive({ command: 'go' }),
    b: d.tryReceive({ argument: 'next' })
})
.continueWith(function(s) {
    s.firstResult = s.getOutput().a.command;
    s.secondResult = s.getOutput().b.argument;
})
.checkpoint('exit');

Note how the wait promises take a json object as their argument. In this object you can express more sophisticated event coordination by appending the $and and $or operators to a field name (for example b$and). This example receives either a message with a field named 'command' with content 'end', or two messages, one with a 'command' field with content 'go' and another with an 'argument' field with content 'next':

d.waitAnyEvent({
    a: d.tryReceive({ command: 'end' }),
    b$and: {
        a: d.tryReceive({ command: 'go' }),
        b: d.tryReceive({ argument: 'next' })
    }
})
.continueWith(function(s) {
    s.command = (s.getOutput().b ? s.getOutput().b.a.command : s.getOutput().a.command);        
})
.checkpoint('exit');

See this example in [action](http://www.durablejs.org/examples/algebra/1/admin.html).

In the two examples above you might have noticed how the event output is packaged: as durable.js evaluates the event expression, it places the results in a json tree that follows the names provided in the wait promise argument. Besides message receive, you can combine other events such as stream completion and timers.
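The way outputs are packaged into a tree can be illustrated with a small evaluator. This plain-JavaScript sketch assumes that each leaf holds the result of a 'try' event (a message, or null if the event has not happened yet) and that a '$and' or '$or' suffix marks a nested group whose result is stored under the name without the suffix, as the s.getOutput().b access in the waitAnyEvent example suggests. The 'evaluate' function is hypothetical and not part of durable.js.

```javascript
// Hypothetical evaluator: mode is 'all' (waitAllEvents) or 'any' (waitAnyEvent).
// Returns the output tree when the expression is satisfied, or null otherwise.
function evaluate(expr, mode) {
  const output = {};
  let satisfied = 0;
  const keys = Object.keys(expr);
  for (const key of keys) {
    const [name, op] = key.split('$'); // 'b$and' -> ['b', 'and']; 'a' -> ['a']
    const value = expr[key];
    const result = op ? evaluate(value, op === 'and' ? 'all' : 'any') : value;
    if (result !== null && result !== undefined) {
      output[name] = result; // results follow the names given in the expression
      satisfied++;
    }
  }
  const done = mode === 'all' ? satisfied === keys.length : satisfied > 0;
  return done ? output : null;
}

// Mirrors the waitAnyEvent example: 'go' and 'next' arrived, 'end' did not.
const received = {
  a: null,
  b$and: {
    a: { id: 1, command: 'go' },
    b: { id: 2, argument: 'next' },
  },
};
const out = evaluate(received, 'any');
console.log(out.b.a.command); // go

// Only 'go' arrived: the $and group is incomplete, so nothing is satisfied yet.
console.log(evaluate({ a: null, b$and: { a: { command: 'go' }, b: null } }, 'any')); // null
```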

Static distribution

Streams are used to distribute the execution of a program. The waitAllStreams and waitAnyStream promises start concurrent streams and wait for them to complete. To start a new stream, the session under which it is created is forked. Forking a session means copying all its contents and giving the child session a name derived from its parent's, in the form parentid.childid. When a stream is complete, its session can be joined with that of its parent. The two sessions are merged by taking the unique fields of each session; in case of conflict, the child session wins. The join merge policy can be customized by passing a callback. In the example that follows, two streams are created, one named sessionid.first and another named sessionid.second:

var d = require('durable');
d.run({
    streams: d.waitAllStreams({
        // Child stream 'first': runs in the session parentid.first
        first: d.receive({ content: 'first' })
            .continueWith(function (s) {
                s.first = s.getOutput().content;
            })
            .checkpoint('firstStream'),
        // Child stream 'second': runs in the session parentid.second
        second: d.receive({ content: 'second' })
            .continueWith(function (s) {
                s.second = s.getOutput().content;
            })
            .checkpoint('secondStream')
        }, 'twoStreams')
    // Runs once both child streams have completed and been joined.
    .continueWith(function (s) {
        s.result = 'two streams completed';
    })
    .checkpoint('exit');
});

See this example in [action](http://www.durablejs.org/examples/streams/parent/admin.html).

To complete the program above, two messages need to be posted, one for each child stream. Messages can be posted in any order, and each stream continues independently and concurrently as its messages are received. Depending on the physical distribution of the program, each stream can execute in a different process or on a different machine.

POST: http://example.com/streams/parent.first
{ 
    id: 1,
    content: 'first'
}

POST: http://example.com/streams/parent.second
{ 
    id: 2,
    content: 'second'
}

When the program above is done, there is one parent and two child sessions in the 'complete' state:

GET: http://example.com/streams/parent
{ 
    result: 'two streams completed', 
    first: 'first',
    second: 'second',
    label: 'exit',
    status: 'complete'
}

GET: http://example.com/streams/parent.first
{ 
    first: 'first', 
    label: 'firstStream',
    status: 'complete'
}

GET: http://example.com/streams/parent.second
{ 
    second: 'second',
    label: 'secondStream',
    status: 'complete'
}
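The fork naming and the default join policy described above (keep the unique fields of both sessions, child wins on conflict, overridable by a callback) can be sketched in plain JavaScript. 'fork', 'join' and 'childWins' are hypothetical helpers, not durable.js APIs, and for simplicity this sketch skips the reserved 'id', 'label' and 'status' fields of the child when merging.

```javascript
// Hypothetical fork/join helpers (illustration only).
const RESERVED = new Set(['id', 'label', 'status']); // system fields, not merged

// Fork: deep-copy the parent's contents and derive the name parentid.childid.
function fork(parentId, parentState, childId) {
  return { id: parentId + '.' + childId, state: JSON.parse(JSON.stringify(parentState)) };
}

// Default policy: keep the fields of both sessions; the child wins on conflicts.
const childWins = (parent, child) => ({ ...parent, ...child });

// Join: merge the child's non-reserved fields into the parent via a merge callback.
function join(parentState, childState, merge = childWins) {
  const childFields = Object.fromEntries(
    Object.entries(childState).filter(([key]) => !RESERVED.has(key)));
  return merge(parentState, childFields);
}

const parent = { status: 'active' };
const first = fork('parent', parent, 'first');
first.state.first = 'first';     // work done in the first child stream
const second = fork('parent', parent, 'second');
second.state.second = 'second';  // work done in the second child stream

let merged = join(parent, first.state);
merged = join(merged, second.state);
console.log(first.id);               // parent.first
console.log(JSON.stringify(merged)); // {"status":"active","first":"first","second":"second"}

// On conflict the child wins by default; a custom callback can let the parent win.
const defaultJoin = join({ total: 1 }, { total: 2 });                            // { total: 2 }
const customJoin = join({ total: 1 }, { total: 2 }, (p, c) => ({ ...c, ...p })); // { total: 1 }
```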


Dynamic distribution

In the Static distribution pattern, the number of parallel branches is known when the program is written. In other cases, the number of parallel branches is determined by program data, and durable.js implements a map-reduce flow for them. The example below maps the first message to as many child sessions as specified by the 'content' field of that message. Each child session waits for a message and adds its content to the child's own content. Finally, the reduce function takes all the child sessions and computes the total:

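// Wait for a message whose 'content' is a number. In MongoDB, $gte: 0 only
// matches numeric values, whereas $regex only matches strings.
d.receive({ content: { $gte: 0 } })
.mapReduce({
    // map: create one child session per unit of the received content.
    map: function(s) {
            var count = s.getOutput().content;
            var mappedSessions = [];
            for (var i = 0; i < count; ++i) {
                mappedSessions[i] = { content: i };
            }
            return mappedSessions;
        },
    // run: each child waits for a numeric message and adds its content.
    run: d.receive({ content: { $gte: 0 } })
        .continueWith(function(s) {
            s.content = s.content + s.getOutput().content;
        })
        .checkpoint('done'),
    // reduce: sum the content of all child sessions into the parent.
    reduce: function(mappedSessions) {
            var total = 0;
            for (var i = 0; i < mappedSessions.length; ++i) {
                total = total + mappedSessions[i].content;
            }
            return { content: total };
        }
    }, 'map')
.checkpoint('exit');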

The program above can be completed with the following three messages. Note how child sessions need to be addressed using the [0] and [1] sub-indexes in the id.

POST: http://example.com/mapreduce/parent
{ 
    id: 1,
    content: 2
}

POST: http://example.com/mapreduce/parent.map[0]
{ 
    id: 2,
    content: 5
}

POST: http://example.com/mapreduce/parent.map[1]
{ 
    id: 3,
    content: 5
}

When the program is done, there is one parent session with the content aggregated by the reduce function and two child sessions with their own computed content.

GET: http://example.com/mapreduce/parent
{ 
    content: 11,
    label: 'exit',
    status: 'complete'
}

GET: http://example.com/mapreduce/parent.map[0]
{ 
    content: 5,
    label: 'done',
    status: 'complete'
}

GET: http://example.com/mapreduce/parent.map[1]
{ 
    content: 6,
    label: 'done',
    status: 'complete'
}
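To double-check the arithmetic above, the same map, run and reduce steps can be simulated synchronously in plain JavaScript, with the posted message contents passed in directly. This is a sketch of the data flow only; 'simulateMapReduce' is a hypothetical helper, and nothing here is asynchronous or durable.

```javascript
// Hypothetical synchronous simulation of the mapReduce example above.
function simulateMapReduce(parentContent, childContents) {
  // map: one child session per unit of the parent's message content
  const children = [];
  for (let i = 0; i < parentContent; ++i) {
    children.push({ id: 'parent.map[' + i + ']', content: i });
  }
  // run: each child adds the content of the message posted to it
  children.forEach((child, i) => { child.content = child.content + childContents[i]; });
  // reduce: sum the child contents into the parent session
  const total = children.reduce((sum, child) => sum + child.content, 0);
  return { parent: { content: total }, children };
}

const result = simulateMapReduce(2, [5, 5]);
console.log(result.children.map((c) => c.content), result.parent.content); // [ 5, 6 ] 11
```

Child 0 starts with content 0 and child 1 with content 1, so posting 5 to each yields 5 and 6, for a total of 11.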


Graphs

To help organize the program structure, durable.js offers two kinds of graphs: flowchart and statechart. In a flowchart, every time a node is entered the session is automatically checkpointed and the node's promise sequence is executed. Edges point to the next node to enter, and a flowchart can have cycles. A switch is used to decide on the next node based on input data. The example below has four nodes ('stage0', 'stage1', 'stage2' and 'final') and one switch. The program starts in 'stage0', waiting for a message with 'content' equal to 'a'. After receiving it, the program proceeds to 'stage1', where it waits for a message with 'content' equal to 'b' or 'c'. A switch in 'stage1' decides the next stage: if the session 'content' field is equal to 'b', the program moves to 'stage2', where it waits for a message with 'content' equal to 'd' and then returns to 'stage1'; otherwise it proceeds to the 'final' stage:

var d = require('durable');
d.run({
    flow: d.flowChart({
        stage0: {
            run: d.receive({ content: 'a' })
                .continueWith(function (s) {
                    s.content = 'a';
                }),
            to: 'stage1'                       // unconditional edge
        },
        stage1: {
            run: d.receive({ $or: [{ content: 'b' }, { content: 'c' }] })
                .continueWith(function (s) {
                    s.content = s.getOutput().content;
                }),
            // switch: the predicate that returns true selects the next node
            to: {
                stage2: function (s) { return s.content === 'b'; },
                final: function (s) { return s.content === 'c'; }
            }
        },
        stage2: {
            run: d.receive({ content: 'd' })
                .continueWith(function (s) {
                    s.content = 'd';
                }),
            to: 'stage1'                       // cycle back to stage1
        },
        final: {
        }
    })
});

See this example in [action](http://www.durablejs.org/examples/flowchart/1/admin.html).
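The edge rules (a 'to' that is either a node name or a switch of predicates) can be sketched as a tiny interpreter. This plain-JavaScript illustration is not durable.js internals: 'flowChart', 'nextNode' and 'walk' are hypothetical, the receive steps are replaced by feeding message contents in directly, and stopping when no switch branch matches is an assumption of the sketch.

```javascript
// Hypothetical model of the flowchart above: 'run' updates the session from a
// message, and 'to' is either a node name or a map of node name -> predicate.
const flowChart = {
  stage0: { run: (s) => { s.content = 'a'; }, to: 'stage1' },
  stage1: {
    run: (s, msg) => { s.content = msg.content; },
    to: { stage2: (s) => s.content === 'b', final: (s) => s.content === 'c' },
  },
  stage2: { run: (s) => { s.content = 'd'; }, to: 'stage1' },
  final: {},
};

// Follow a plain edge, or the switch branch whose predicate holds.
function nextNode(node, session) {
  if (typeof node.to === 'string') return node.to;
  if (!node.to) return null; // no edges: the flow is complete
  const match = Object.entries(node.to).find(([, predicate]) => predicate(session));
  return match ? match[0] : null; // no matching branch: stop (sketch assumption)
}

// Walk the chart, consuming one message per node entered, and record the path.
function walk(messages) {
  const session = {};
  const visited = ['stage0'];
  let current = 'stage0';
  for (const msg of messages) {
    const node = flowChart[current];
    if (!node.run) break; // the final node has nothing to run
    node.run(session, msg);
    current = nextNode(node, session);
    if (current === null) break;
    visited.push(current);
  }
  return visited;
}

console.log(walk([{ content: 'a' }, { content: 'b' }, { content: 'd' }, { content: 'c' }]).join(' -> '));
// stage0 -> stage1 -> stage2 -> stage1 -> final
```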

In a statechart, each node is a state (a continuation point), and the node edges represent transitions. Each transition is triggered by an event and, as a result, executes an action before moving to the next state. The session is automatically checkpointed when reaching a new state. The example below has three states: 'first', 'second' and 'end'. The transition from the 'first' to the 'second' state is triggered when a message with a field 'content' equal to 'a' is posted to the session. The transition from the 'first' state directly to the last state (named 'end') happens when a message with a field 'content' equal to 'b' is posted to the session. From the 'second' state, a message with 'content' equal to 'c' triggers the transition to 'end'.

var d = require('durable');   
d.run({
    statechart: d.stateChart({
        first: {
            firstTransition: {
                when: d.tryReceive({ content: 'a' }),
                run: d.promise(function (s) {
                    s.content = 'a'; 
                }),
                to: 'second'
            },
            endTransition: {
                when: d.tryReceive({ content: 'b' }),
                to: 'end'
            }
        },
        second: {
            endTransition: {
                when: d.tryReceive({ content: 'c' }),
                run: d.promise(function (s) { 
                    s.content = 'c';
                }),
                to: 'end'
            }
        },
        end: {
        }
    })
});

See this example in [action](http://www.durablejs.org/examples/statechart/1/admin.html).
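The statechart above can be read as a transition table. The following plain-JavaScript sketch models it with message contents as events; 'transitions' and 'step' are hypothetical names, and unlike durable.js, where an unmatched message would stay in the message collection, this sketch simply ignores events that have no transition in the current state.

```javascript
// Hypothetical transition table mirroring the statechart above: for each state,
// the triggering message content, an optional action and the target state.
const transitions = {
  first: [
    { when: 'a', run: (s) => { s.content = 'a'; }, to: 'second' },
    { when: 'b', to: 'end' },
  ],
  second: [
    { when: 'c', run: (s) => { s.content = 'c'; }, to: 'end' },
  ],
  end: [],
};

// Apply one event: find a matching transition, run its action, move to the target.
function step(machine, content) {
  const transition = transitions[machine.state].find((t) => t.when === content);
  if (!transition) return machine; // no transition for this event in this state
  if (transition.run) transition.run(machine.session);
  machine.state = transition.to;
  return machine;
}

const machine = { state: 'first', session: {} };
['a', 'b', 'c'].forEach((content) => step(machine, content));
console.log(machine.state, machine.session.content); // end c
```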

Statechart transitions can also leverage the event algebra by using the whenAll and whenAny directives. For instance, the 'endTransition' of the 'second' state above could be written so that it is triggered either when a message with 'content' 'c' is posted, or when two messages are posted, one with 'content' 'e' and one with 'content' 'f':

endTransition: {
    whenAny: {
        c: d.tryReceive({ content: 'c' }),
        d$and: {
            e: d.tryReceive({ content: 'e' }),
            f: d.tryReceive({ content: 'f' })
        }
    }
...

A statechart can have nested states. durable.js implements the UML nested state semantics: if the program is in a nested substate, it is also in the surrounding superstate. The state chart attempts to handle any event in the context of the substate; if the substate does not handle the event, the event is automatically handled in the higher-level context of the superstate. The example below starts in the 'on' state and its 'ping' substate. If a message with a 'content' field equal to 'pong' is posted, it transitions to the 'pong' substate, and a message with 'content' equal to 'ping' takes it back to the 'ping' substate. In addition, if a message with a 'content' field equal to 'off' is posted while in either substate, the 'on' superstate handles it and transitions to the 'off' state. When in the 'pong' substate, if a message with a 'content' field equal to 'reset' is posted, the state chart transitions to the 'on' state and its 'ping' substate.

var d = require('durable');   
d.run({
    statechart: d.stateChart({
        on: {
            $chart: {
                ping: {
                    toPong: {
                        when: d.tryReceive({ content: 'pong' }), 
                        run: d.promise(function (s) {
                            s.content = 'pong'; 
                        }),
                        to: 'pong'
                    }
                },
                pong: {
                    toPing: {
                        when: d.tryReceive({ content: 'ping' }), 
                        run: d.promise(function (s) {
                            s.content = 'ping'; 
                        }),
                        to: 'ping'
                    }
                },
            },     
            reset: {
                when: d.tryReceive({ content: 'reset' }),
                to: 'on'
            },
            turnOff: {
                when: d.tryReceive({ content: 'off' }),
                to: 'off'
            }
        },
        off: {
        }
    })
});

See this example in [action](http://www.durablejs.org/examples/nestedchart/1/admin.html).
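The UML lookup order (innermost substate first, then the enclosing superstate) can be sketched as follows. This plain-JavaScript model of the 'on'/'off' chart is hypothetical: 'nestedChart', 'enter' and 'handle' are illustration names, events are message contents, and entering 'on' is assumed to start in its 'ping' substate, as the reset behavior above describes.

```javascript
// Hypothetical model of the nested 'on'/'off' chart above.
const nestedChart = {
  on: {
    initial: 'ping',
    transitions: { reset: 'on', off: 'off' }, // handled by the superstate
    substates: {
      ping: { transitions: { pong: 'pong' } },
      pong: { transitions: { ping: 'ping' } },
    },
  },
  off: { transitions: {} },
};

// Enter a top-level state, descending into its initial substate if it has one.
function enter(name) {
  const state = nestedChart[name];
  return state.initial ? [name, state.initial] : [name];
}

// Try the substate first; if it has no transition for the event,
// fall back to the enclosing superstate. Unhandled events change nothing.
function handle(path, event) {
  const [top, sub] = path;
  if (sub) {
    const target = nestedChart[top].substates[sub].transitions[event];
    if (target) return [top, target];
  }
  const target = nestedChart[top].transitions[event];
  return target ? enter(target) : path;
}

let path = enter('on');
for (const event of ['pong', 'reset', 'off']) {
  path = handle(path, event);
  console.log(event, '->', path.join('.'));
}
// pong -> on.pong
// reset -> on.ping
// off -> off
```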


Web interface

durable.js provides a simple web interface for every program it runs. The web interface consists of two parts: a REST API and a visualization page. To explore the available functionality, let's consider the following program and assume it is hosted at 'example.com':

var d = require('durable');
d.run({
  sequence: d.receive({ content: 'first' })
    .continueWith(function (s) { s.firstContinue = true; })
    .checkpoint('first')
});

You can get the program definition by issuing an http GET on the program name (in the case above, 'sequence'). Note that the returned definition is not exactly what was written, but its representation as a json object:

GET: http://example.com/sequence
[{
    'type':'checkpoint',
    'params': { 'name':'first' }
}, {
    'type':'promise',
    'params': {
        'func' : 'function (s) { s.firstContinue = true; }',
        'name' : null,
        'async' : false,
        'context' : null
    }
}, {
    'type' : 'receive',
    'params' : { 
        'query' : { 'content' : 'first'},
        'projection' : null
     }
}]

The list of sessions associated with a program can be retrieved by issuing a GET request on the /program/sessions url; for the code above, /sequence/sessions. You can filter or page through the results by using $filter=[{query}], $top=[number of records] and $skip=[number of records].

GET: http://example.com/sequence/sessions?$filter={"label":"first"}
[{ 
    id: 1,
    firstContinue: true, 
    label: 'first',
    status: 'complete'
}, {
    id: 2,
    firstContinue: true, 
    label: 'first',
    status: 'complete'
}]
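When calling these endpoints from code, the $filter value is a json document and should be URL-encoded. Below is a minimal sketch using the WHATWG URL API built into Node.js; 'sessionsUrl' is a hypothetical helper, 'example.com' is the placeholder host used throughout this page, and it assumes the server decodes percent-encoded parameter names such as %24filter, as standard query-string parsing does.

```javascript
// Build a sessions query URL with a json $filter plus paging parameters.
function sessionsUrl(base, program, { filter, top, skip } = {}) {
  const url = new URL('/' + program + '/sessions', base);
  if (filter) url.searchParams.set('$filter', JSON.stringify(filter));
  if (top !== undefined) url.searchParams.set('$top', String(top));
  if (skip !== undefined) url.searchParams.set('$skip', String(skip));
  return url.toString();
}

const url = sessionsUrl('http://example.com', 'sequence', { filter: { label: 'first' }, top: 10, skip: 20 });
console.log(url);
```

The same pattern applies to the /program/sessionid/history url.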

If you know the session id, you can retrieve its current state by issuing an http GET on the /program/sessionid url; for the code example above, /sequence/1:

GET: http://example.com/sequence/1
{ 
    firstContinue: true, 
    label: 'first',
    status: 'complete'
}

All the historical records associated with a session can be retrieved by issuing an http GET on the /program/sessionid/history url. You can filter or page through the results by using $filter=[{query}], $top=[number of records] and $skip=[number of records].

GET: http://example.com/sequence/1/history?$top=2
[{
    status: 'checked'
}, { 
    firstContinue: true, 
    label: 'first',
    status: 'complete'
}]

You can raise events from outside the program by issuing an http POST on a session, addressed by the url /program/sessionid, in the case above /sequence/1.

POST: http://example.com/sequence/1
{ 
    id: 1,
    content: 'first'
}

It is also possible to modify the current state of a session by issuing a PATCH request on the /program/sessionid url. Remember that records are not updated but 'checkpointed' and inserted: in the case of PATCH, the current record is 'checkpointed' and a new record with status 'idle' and the new state is added. This way you can move the program forward or backward, or start it from any arbitrary point.

PATCH: http://example.com/sequence/1
{        
}

GET: http://example.com/sequence/1/history
[{
    status: 'checked'
}, { 
    firstContinue: true, 
    label: 'first',
    status: 'checked'
}, { 
    status: 'idle'
}]

Finally, every program has an admin.html user interface, which displays a graphical representation of the program overlaid with the session's historical data. The user interface lets you drill into the parameter definition of each promise. For a given session, you can also analyze how the program progressed over time. In addition, a scratch pad lets you post messages to the session and patch the session as described above.

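Scale

The example below uses the Node.js 'cluster' module to scale out a durable.js host: the master process forks four worker processes and replaces any worker that exits, and each worker runs the program definitions. 'yourDbConnection' is a placeholder for your database connection.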

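var d = require('durable');
var cluster = require('cluster');

if (cluster.isMaster) {
    // Master process: fork four worker processes.
    for (var i = 0; i < 4; i++) {
        cluster.fork();
    }

    // Replace any worker that exits so capacity stays constant.
    cluster.on('exit', function (worker, code, signal) {
        cluster.fork();
    });
}
else {
    // Worker process: host the programs; cluster workers can share the same port.
    d.run({
        // Your program definitions
    }, 'yourDbConnection', process.env.PORT);
}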
