Skip to content

API Reference

Jesus Ruiz edited this page Aug 5, 2013 · 102 revisions

Table of contents

Durable

durable.run(programs[, dbConnection[, port[, basePath]]])

Starts a set of programs.

var d = require('durable');   
// example starting two programs:
// one with 'mySequence' name and one with 'myFlowchart' name.
d.run({  
  mySequence: d.receive(... ,
  myFlowchart: d.flowchart(... ,
});

programs is a json object that contains the set of programs and their names as key value pairs.
dbConnection is the address of the database that will be use by the durable.js runtime. The default name is '\promisesDb'.
port is the port to be used to host the REST interface. The default is '5000'.
basePath is the prefix for the REST interface address. The default is '/'.
top

Promise

durable.promise(function[, name[, async[, context]]])

Creates a promise for the future execution of the callback passed.

var d = require('durable');   
...
// example creating a synchronous promise
d.promise(function(s) {
    s.newContent = '1';
    ...
    if (error) {
        throw 'error: reason';
    }
})
...
// example creating an asynchronous promise
d.promise(function(s, complete) {
    s.newContent = '1';
    ...
    if (error) {
        complete('error: reason');
    }
}, null, true)
...

function is the callback to be run. When the async flag is not specified, the signature is function(s[, context]), where s is the 'active' session. An error is indicated by returning false or throwing an exception. When the async flag is specified the signature is function(s, complete[, context]), where s is the 'active' session, complete is a function to be called when the function is complete. If there is an error use the error argument in the complete function to indicate the reason. context is the object optionally specified in the promise function. If there is an error in the callback, execution will restart from the previous checkpoint during the next dispatch iteration.
name is the resumption label used when a promise checkpoints inside the callback.
async is specified when the callback completes asynchronously by calling the 'complete' function.
context is an object to be passed when the callback is run.
returns the promise created.
top

durable.promise.continueWith(promise)

Chains the next promise to be run when the current completes.

var d = require('durable');   
...
// example of promise chaining using continueWith
// and cascading using promise shortcuts.
d.promise(function(s) {
    s.firstContent = 1;
})
.continueWith(function(s) {
    s.secondContent = 2;
})
.checkpoint('first');
...

promise is the promise to be chained. When this argument is a function, a new promise will be created automatically.
returns the promise that has been chained.
top

Shortcuts

durable.promise.checkpoint(name)
Shortcut to create and chain the durable.checkpoint promise.
returns the new promise that has been created and chained.

durable.promise.idle()
Shortcut to create and chain the durable.idle promise.
returns the new promise that has been created and chained.

durable.promise.suspend()
Shortcut to create and chain the durable.suspend promise.
returns the new promise that has been created and chained.

durable.promise.delay(interval)
Shortcut to create and chain the durable.delay promise.
returns the new promise that has been created and chained.

durable.promise.mapReduce(flow, name)
Shortcut to create and chain the durable.mapReduce promise.
returns the new promise that has been created and chained.

durable.promise.receive(query[, projection])
Shortcut to create and chain the durable.receive promise.
returns the new promise that has been created and chained.

durable.promise.post()
Shortcut to create and chain the durable.post promise.
returns the new promise that has been created and chained.

durable.promise.startTimer(interval, name)
Shortcut to create and chain the durable.startTimer promise.
returns the new promise that has been created and chained.

durable.promise.waitAllEvents(events)
Shortcut to create and chain the durable.waitAllEvents promise.
returns the new promise that has been created and chained.

durable.promise.waitAnyEvent(events)
Shortcut to create and chain the durable.waitAnyEvent promise.
returns the new promise that has been created and chained.

durable.promise.waitAllStreams(streams, name)
Shortcut to create and chain the durable.waitAllStreams promise.
returns the new promise that has been created and chained.

durable.promise.waitAnyStream(streams, name)
Shortcut to create and chain the durable.waitAnyStream promise.
returns the new promise that has been created and chained.
top

Session

durable.checkpoint(name[, continue])

Checkpoints the current 'active' session. A new session record is inserted in the session records collection. All side effects, such as removing messages from the session messages collection and propagating messages to other session collections will happen at this point. If there is an error in subsequent promises, execution will restart from this point. After the checkpoint, by default the session will be put in idle state, the durable.js runtime will wait for the next message sent to this session before dispatching it again.

var d = require('durable');   
...
// example of checkpoint usage
d.receive()
.checkpoint('first')
.continueWith(function(s) {
    s.content = 1;
})
.checkpoint('second');

// example of session history after 'first' checkpoint
// GET http://example.com/checkpoint/1/history
[{
    label: 'first',
    status: 'idle'
}]

// example of session history after 'second' checkpoint
// GET http://example.com/checkpoint/1/history
[{
    label: 'first',
    status: 'checked'
},
{
    content: 1,
    label: 'second',
    status: 'idle'
}]

name is the label to be used for this checkpoint. continue is an optional argument to override idling after checkpoint.
returns the 'checkpoint' promise created.
top

durable.idle()

Yields control to durable.js runtime. The session status is marked as 'idle'. The durable.js runtime will wait for the next message sent to this session before dispatching it again.
returns the 'idle' promise created.
top

durable.suspend([reason])

Suspends the current 'active' session. After running this promise, the durable.js runtime will not dispatch to this session again.
reason is the detail for the suspension.
returns the 'suspend' promise created.
top

durable.session.getInput()

Gets the transient state set by the preceding promise as input. For example durable.post relies on this mechanism for getting messages to be sent.
returns the state object.
top

durable.session.setInput(value)

Sets the transient input state to be used by the immediate subsequent promise.
value is the object to be used.
top

durable.session.getOutput()

Gets the transient state set by the preceding promise as output. For example durable.receive relies on this mechanism for passing available messages to subsequent promises.
returns the state object.
top

durable.session.setOutput(value)

Sets the transient output state to be used by the immediate subsequent promise.
value is the object to be used.
top

Messages

durable.delay(interval, name)

Delays the program execution for the specified interval. This promise automatically checkpoints, so it requires an name to specify the checkpoint label.

var d = require('durable');   
...
// example of delay usage
d.receive()
.continueWith(function(s) {
    s.date = new Date().toLocaleString();
})
.delay(10, 'delay')
.continueWith(function(s) {
    s.date = new Date().toLocaleString();
})
.checkpoint('end');

interval is the number of seconds to wait before continuing. name is the label used when checkpointing.
returns the new promise created.
top

durable.post()

Posts a message. A message is passed into this promise during execution by setting the session input transient state. A message is a json object. The program field specifies the program to send the message to. The current program is used by default. The session field specifies the program session to correlate the message to. The current session is used by default. All messages have an id, which has to be unique withing the correlated session. The effectiveDate field specifies the date and time after which the message should be dispatched. The default is the time when the message is posted. The expireDate field specifies the date and time after which the message should be removed from the collection. The default is one hour after the message is posted.

var d = require('durable');   
...
d.receive()
.continueWith(function (s) {                        
    s.setInput({ 
        program: 'ping', 
        session: 1, 
        id: 1, 
        content: 'a' 
    });							
})
.post()
.checkpoint();

returns the new promise created.
top

durable.receive([query[, projection]])

Receives a message correlated with the currently active session using the specified filter and transformed by the specified projection. Filters and projections are specified using the mongoDb query and projection syntax. The durable.receive promise will continue execution only when a message that matches the specified query is added to the collection. While query and projection are typically specified in the receive function, they can also be specified dynamically by setting the input transient state of the current session. The message received is set in the output transient state of the current session. The message will be removed from the collection when the next checkpoint is reached.

var d = require('durable');   
...
// example of receiving a message given a filter
d.receive({ content: { $regex: '[0-9]*' } })
.continueWith(function (s) {         
    s.messageContent = s.getOutput().content;               
})
.checkpoint('first');

// example of receiving a message specifying the filter dynamically
d.promise(function(s) {
    s.setInput({ query: { content: "go" } });
})
.receive()
.continueWith(function (s) {         
    s.messageContent = s.getOutput().content;               
})
.checkpoint('first');

query is the filter to be applied. The query has to conform to the mongoDb query syntax.
projection specifies the optional transformation of the message. The projection has to conform to the mongoDb projection syntax.
returns the new promise created.
top

durable.tryReceive([query[, projection]])

As opposed to durable.receive. This promise will continue the execution of the program even if no message matches the query, in which case the object { checked: false } will be set in the session output transient state. This promise is typically used in conjunction with durable.waitAnyEvent, durable.waitAllEvents or as a durable.statechart trigger.
query is the filter to be applied. The query has to conform to the mongoDb query syntax.
projection specifies the optional transformation of the message. The projection has to conform to the mongoDb projection syntax.
returns the new promise created.
top

durable.startTimer(interval, name)

Starts a timer to expire given an interval in seconds. The name specified in this promise can be used to query the timeout status. This promise is typically used for timing out events in durable.waitAnyEvent or timing out triggers in durable.statechart. See durable.tryTimer for a usage example. The timer is activated only when the next checkpoint is reached.
interval is the number of seconds to wait before timeout. name used for subsequent timer status query.
returns the new promise created.
top

durable.tryTimer(name)

Checks if the timer with the specified name has timed out, in which case the object { checked: true } will be set in the session output transient state. This promise is typically used for timing out events in durable.waitAnyEvent or timing out triggers in durable.statechart.

var d = require('durable');   
...
// example of timing out a message receive
d.receive()
.startTimer(30, 'messageTimeout')
.checkpoint('start')
.waitAnyEvent({
    a: durable.tryReceive({ content: 'go' }),
    b: durable.tryTimer('messageTimeout')
})
.continueWith(function (s) {
    if (s.getOutput().b) {
        s.content = 'timeout';
    }
    ...
 })
.checkpoint('exit');

name of the timer to query.
returns the new promise created.
top

Events

durable.tryCondition(function)

Used for trying a session condition in conjunction with other events. The signature function(s){} is required for the callback function. If the condition is true the function should return true otherwise false.
function is the callback that will be run for trying the condition.
returns the new promise created.
top

durable.waitAnyEvent(events)

Waits for any of the conditions specified to complete. The events object is a set of name value pairs. The event output will be set in the session transient state as a json object that follows the names specified in the events object. The events object can use the event algebra explained in the concepts page.

// example of waitForAnyEvent usage
d.waitForAnyEvent({
    a: tryReceive({ command: 'right' }),
    b: tryReceive({ command: 'left' })
})
.continueWith(function(s) {
    if (s.getOutput().a) {
       // go right
    }
    else {
       // go left
    }
})
.checkpoint('exit');

events is the set of named events to be tried.
returns the new promise created.
top

durable.waitAllEvents(events)

Waits for all the conditions specified to complete. The events object is a set of name value pairs. The event output will be set in the session transient state as a json object that follows the names specified in the events object. The events object can use the event algebra explained in the concepts page.

// example of waitForAllEvents usage
d.waitForAllEvents({
    a: tryReceive({ command: 'go' }),
    b: tryReceive({ argument: 'left' })
})
.continueWith(function(s) {
    // go left
})
.checkpoint('exit');

events is the set of named events to be tried.
returns the new promise created.
top

Streams

durable.mapReduce(flow, name)

Used to distribute work based on dynamic input. The flow object contains the map and reduce functions as well as a stream to run. The map function has the signature map(s) {} it takes the current session as input and is required to return an array of json objects representing the sub-sessions to create. Each sub-session is addressed using parentName.name[i], where name is the promise name and i is the index in the array returned by the map function. The run object is the stream to be run for each sub-session created. When all sub-sessions are complete, the reduce function will be called. The reduce function is expected to have the signature reduce(s[]){}, where the set of sub-sessions is passed as input and the function returns a single aggregate session object. Because creating sub-sessions is a side-effecting operation, this promise implicitly checkpoints, the promise name is used as the checkpoint label. See the dynamic distribution section in the concepts page for more information.
flow represents the mapReduce workflow, it is a json object with three fields: map is the map function, run is a stream and reduce is the reduce function.
name is the checkpoint resumption label as well as the prefix for addressing sub-sessions.
top

durable.map(function, stream, name)

Maps the current session based on the function passed. durable.map creates sub-sessions, which is a side-effecting operation, so this promise implicitly checkpoints using the promise name as label. This promise is typically used before coordinating the durable.tryReduce promise with durable.waitAnyEvent, durable.waitAllEvents or durable.statechart triggers.
function is a map function that has the signature map(s){}. It takes the current session as input and is expected to return an array with the mapped sub-sessions. After the map function is executed, a new sub-session will be created for each entry returned. The address for each sub-session has the form sessionName.name[i] where name is the promise name and i is the index corresponding to the entry in the array returned by the map function.
stream is the promise or set of chained promises to be scheduled for each sub-session created.
name is the checkpoint resumption label as well as the prefix for addressing sub-sessions.
top

durable.tryReduce(function, name)

Tries reducing the set of sub-sessions mapped under the name passed. If all the sub-sessions are not complete yet, the object { checked: true } will be set in the session transient output state and the program will continue. This promise is typically used to coordinate with other events in conjunction with durable.waitAnyEvent, durable.waitAllEvents or durable.statechart triggers.
function is a reduce function that has the signature reduce(s[]){}. It takes an array of sub-sessions and is expected to return a single aggregate session.
name is the name of the mapped sub-sessions.
top

durable.waitAnyStream(streams, name[, merge])

Starts all streams specified and waits for any of them to complete. This promise creates one session fork for each stream specified. Each session fork is a copy of the main session. The address for each session fork is determined parentSessionName.streamName. Because forking a session is a side effecting operation this promise will implicitly checkpoint. All streams will execute concurrently with independent sessions. When any stream is complete, it is joined using the merge policy provided or the system default. See the static distribution section in the concepts page for more information.

var d = require('durable');   
...
// example of waiting for any of two streams
// one named 'first' and one named 'second'
d.waitAnyStream({
    first: d.receive({ content: 'first' })
        .continueWith(function (s) {
            s.first = s.getOutput().content;
        })
        .checkpoint('firstExit'),
    second: d.receive({ content: 'second' })
        .continueWith(function (s) {
            s.second = s.getOutput().content;
        })
        .checkpoint('secondExit')            
    }, 'twoStreams')
.continueWith(function (s) {
    if (s.first) {
        // first completed
    }
    else {
        // second completed
    }
})
.checkpoint('mainExit');

streams is the set of named streams.
name is the label to be used for the resumption checkpoint.
merge specifies the merge policy. By default merge will take unique fields from the main and the fork session, fork session will win in case of field conflicts.
top

durable.waitAllStreams(streams, name[, merge])

Starts all streams specified and waits for all of them to complete. This promise creates one session fork for each stream specified. Each session fork is a copy of the main session. The address for each session fork is determined parentSessionName.streamName. Because forking a session is a side effecting operation this promise will implicitly checkpoint. All streams will execute concurrently with independent sessions. When all streams are complete, they are joined using the merge policy provided or the system default. See the static distribution section in the concepts page for more information.
streams is the set of named streams.
name is the label to be used for the resumption checkpoint.
merge specifies the merge policy. By default merge will take unique fields from the main and the fork session, fork session will win in case of field conflicts.
top

Graphs

durable.stateChart(chart)

Creates a state machine.

var d = require('durable');   
d.run({
    // state chart example showing the chart object elements.
    statechart: durable.stateChart({
        first: {
            transition: {
                whenAll: { 
                    a: d.tryReceive({ content: 'a' }),
                    b: d.tryReceive({ content: 'b' })
                },
                run: d.promise(function (s) {
                    s.content = 'a'; 
                }),
                to: 'end'
            }
        },
        end: {
        }
    })
});

chart is a json object, which contains a set of name-state pairs. Each state is a json object, which contains a set of name-transition pairs. Each transition is also a json object. A transition has a trigger defined by the field when for coordinating a single 'try' event or defined by the fields whenAny or whenAll for coordinating multiple 'try' events. In the whenAll or whenAny triggers the event algebra described in the concept page can also be used. A transition optionally has an action defined by the field named run, which should contain a promise or a set of chained promises. A transition optionally has a destination defined by the field named to and indicates the name of the state to enter after the trigger and the action are executed. A state can be reflexive having a transition with the same state as destination. If all the fields in a state are omitted the state is considered final. Every time a state is entered the session is checkpointed using the state name as label. A state can also have a nested states, aside from the set of transitions, a set of states can be defined using the field $chart. See the graphs section in the concepts page for more information.
top

durable.flowChart(chart)

Creates a flow chart.

var d = require('durable');   
d.run({  
    // flowchart example showing the chart object elements
    flow: d.flowChart({
        stage1: {
            run: durable.receive({ content: 'try' })
                .continueWith(function (s) {
                    s.content = s.getOutput().content;
                }),
            to: {
                stage1: function (s) { return s.content === 'stage1' },
                final: function (s) { return s.content === 'final' },
            }
        },
        final: {
        }
    })
});

chart is a json object, which contains a set of name-stage pairs. Each stage is also a json object. A stage has an action defined by the field run, which contains a promise or a set of chained promises. A stage can have a simple destination defined by the field to indicating the name of the stage to enter after running the action. Alternatively a stage can have a selection of destinations defined by a set of name-function pairs, where each name is the destination to enter if the corresponding function returns true when executing. A stage can be reflexive having the same stage as its destination. Every time a stage is entered the session is checkpointed using the stage name as label. See the graphs section in the concepts page for more information.
top

Clone this wiki locally