Skip to content

pancsta/asyncmachine-go

Repository files navigation

asyncmachine-go

TUI Debugger

asyncmachine-go is a minimal implementation of AsyncMachine in Golang using channels and context. It aims at simplicity and speed.

It can be used as a lightweight in-memory Temporal alternative, worker for Asynq, or to create simple consensus engines, stateful firewalls, telemetry, bots, etc.

asyncmachine-go is a general purpose state machine for managing complex asynchronous workflows in a safe and structured way.

See am-dbg's states structure and relations
var States = am.Struct{

    ///// Input events

    ClientMsg:       {Multi: true},
    ConnectEvent:    {Multi: true},
    DisconnectEvent: {Multi: true},

    // user scrolling tx / steps
    UserFwd: {
        Add:    S{Fwd},
        Remove: GroupPlaying,
    },
    UserBack: {
        Add:    S{Back},
        Remove: GroupPlaying,
    },
    UserFwdStep: {
        Add:     S{FwdStep},
        Require: S{ClientSelected},
        Remove:  am.SMerge(GroupPlaying, S{LogUserScrolled}),
    },
    UserBackStep: {
        Add:     S{BackStep},
        Require: S{ClientSelected},
        Remove:  am.SMerge(GroupPlaying, S{LogUserScrolled}),
    },

    ///// External state (eg UI)

    TreeFocused:          {Remove: GroupFocused},
    LogFocused:           {Remove: GroupFocused},
    SidebarFocused:       {Remove: GroupFocused},
    TimelineTxsFocused:   {Remove: GroupFocused},
    TimelineStepsFocused: {Remove: GroupFocused},
    MatrixFocused:        {Remove: GroupFocused},
    DialogFocused:        {Remove: GroupFocused},
    StateNameSelected:    {Require: S{ClientSelected}},
    HelpDialog:           {Remove: GroupDialog},
    ExportDialog: {
        Require: S{ClientSelected},
        Remove:  GroupDialog,
    },
    LogUserScrolled: {},
    Ready:           {Require: S{Start}},

    ///// Actions

    Start: {},
    TreeLogView: {
        Auto:   true,
        Remove: GroupViews,
    },
    MatrixView:     {Remove: GroupViews},
    TreeMatrixView: {Remove: GroupViews},
    TailMode: {
        Require: S{ClientSelected},
        Remove:  GroupPlaying,
    },
    Playing: {
        Require: S{ClientSelected},
        Remove:  am.SMerge(GroupPlaying, S{LogUserScrolled}),
    },
    Paused: {
        Auto:    true,
        Require: S{ClientSelected},
        Remove:  GroupPlaying,
    },

    // tx / steps back / fwd
    Fwd: {
        Require: S{ClientSelected},
        Remove:  S{Playing},
    },
    Back: {
        Require: S{ClientSelected},
        Remove:  S{Playing},
    },
    FwdStep: {
        Require: S{ClientSelected},
        Remove:  S{Playing},
    },
    BackStep: {
        Require: S{ClientSelected},
        Remove:  S{Playing},
    },

    ScrollToTx: {Require: S{ClientSelected}},

    // client
    SelectingClient: {Remove: S{ClientSelected}},
    ClientSelected: {
        Remove: S{SelectingClient, LogUserScrolled},
    },
    RemoveClient: {Require: S{ClientSelected}},
}

Comparison

Common differences with other state machines:

  • many states can be active at the same time
  • transitions between all the states are allowed
    • by default, unless constrained
  • states are connected by relations
  • every mutation can be rejected
  • error is a state

Usage

Basics

package main

import (
    "context"

    am "github.com/pancsta/asyncmachine-go/pkg/machine"
)

func main() {
    ctx := context.Background()
    mach := am.New(ctx, am.Struct{
        "ProcessingFile": {
            Add: am.S{"InProgress"},
            Remove: am.S{"FileProcessed"},
        },
        "FileProcessed": {
            Remove: am.S{"ProcessingFile", "InProgress"},
        },
        "InProgress": {},
    }, nil)
    mach.BindHandlers(&Handlers{
        Filename: "README.md",
    })
    // change the state
    mach.Add1("ProcessingFile", nil)
    // wait for completed
    select {
    case <-time.After(5 * time.Second):
        println("timeout")
    case <-mach.WhenErr(nil):
        println("err:", mach.Err)
    case <-mach.When1("FileProcessed", nil):
        println("done")
    }
}

type Handlers struct {
    Filename string
}

// negotiation handler
func (h *Handlers) ProcessingFileEnter(e *am.Event) bool {
    // read-only ops (decide if moving fwd is ok)
    // lock-free critical zone
    return true
}

// final handler
func (h *Handlers) ProcessingFileState(e *am.Event) {
    // read & write ops (but no blocking)
    // lock-free critical zone
    mach := e.Machine
    // tick-based context
    stateCtx := mach.NewStateCtx("ProcessingFile")
    go func() {
        // block in the background, locks needed
        if stateCtx.Err() != nil {
            return // expired
        }
        // blocking call
        err := processFile(h.Filename, stateCtx)
        if err != nil {
            mach.AddErr(err)
            return
        }
        // re-check the ctx after a blocking call
        if stateCtx.Err() != nil {
            return // expired
        }
        // move to the next state in the flow
        mach.Add1("FileProcessed", nil)
    }()
}

Waiting

// wait until FileDownloaded becomes active
<-mach.When1("FileDownloaded", nil)

// wait until FileDownloaded becomes inactive
<-mach.WhenNot1("DownloadingFile", args, nil)

// wait for EventConnected to be activated with an arg ID=123
<-mach.WhenArgs("EventConnected", am.A{"id": 123}, nil)

// wait for Foo to have a tick >= 6 and Bar tick >= 10
<-mach.WhenTime(am.S{"Foo", "Bar"}, am.T{6, 10}, nil)

// wait for DownloadingFile to have a tick >= 6
<-mach.WhenTickEq("DownloadingFile", 6, nil)

// wait for DownloadingFile to have a tick increased by 2 since now
<-mach.WhenTick("DownloadingFile", 2, nil)

See docs/cookbook.md for more snippets.

Buzzwords

AM gives you: states, events, thread-safety, logging, metrics, traces, debugger, history, flow constraints, scheduler

AM technically is: event emitter, queue, dependency graph, AOP, logical clocks, ~2.5k LoC, no deps

Flow constraints are: state mutations, negotiation, relations, "when" methods, state contexts, external contexts

Examples

States structure
var (
    states = am.Struct{
        // input states
        InputPush: {},
        InputCoin: {},

        // "state" states
        Locked: {
            Auto:   true,
            Remove: groupUnlocked,
        },
        Unlocked: {Remove: groupUnlocked},
    }
)
States structure
var (
    states = am.Struct{
        // input states
        Input: {Multi: true},

        // action states
        Start: {Add: am.S{StepX}},

        // "state" states
        StepX: {Remove: groupSteps},
        Step0: {Remove: groupSteps},
        Step1: {Remove: groupSteps},
        Step2: {Remove: groupSteps},
        Step3: {Remove: groupSteps},
    }
)
States structure
// States map defines relations and properties of states (for files).
var States = am.Struct{
    Init: {Add: S{Watching}},

    Watching: {
        Add:   S{Init},
        After: S{Init},
    },
    ChangeEvent: {
        Multi:   true,
        Require: S{Watching},
    },

    Refreshing: {
        Multi:  true,
        Remove: S{AllRefreshed},
    },
    Refreshed:    {Multi: true},
    AllRefreshed: {},
}

// StatesDir map defines relations and properties of states (for directories).
var StatesDir = am.Struct{
    Refreshing:   {Remove: groupRefreshed},
    Refreshed:    {Remove: groupRefreshed},
    DirDebounced: {Remove: groupRefreshed},
    DirCached:    {},
}
States structure
// States map defines relations and properties of states.
var States = am.Struct{
    CreatingExpense: {Remove: GroupExpense},
    ExpenseCreated:  {Remove: GroupExpense},
    WaitingForApproval: {
        Auto:   true,
        Remove: GroupApproval,
    },
    ApprovalGranted: {Remove: GroupApproval},
    PaymentInProgress: {
        Auto:   true,
        Remove: GroupPayment,
    },
    PaymentCompleted: {Remove: GroupPayment},
}
States structure
// States map defines relations and properties of states.
var States = am.Struct{
    DownloadingFile: {Remove: GroupFileDownloaded},
    FileDownloaded:  {Remove: GroupFileDownloaded},
    ProcessingFile: {
        Auto:    true,
        Require: S{FileDownloaded},
        Remove:  GroupFileProcessed,
    },
    FileProcessed: {Remove: GroupFileProcessed},
    UploadingFile: {
        Auto:    true,
        Require: S{FileProcessed},
        Remove:  GroupFileUploaded,
    },
    FileUploaded: {Remove: GroupFileUploaded},
}

Documentation

Tooling

Debugger

TUI Debugger

am-dbg is a multi-client debugger lightweight enough to be kept open in the background while receiving data from >100 machines simultaneously (and potentially many more). Some features include:

  • states tree
  • log view
  • time travel
  • transition steps
  • import / export
  • matrix view

See tools/cmd/am-dbg for more info.

Generator

am-gen will quickly bootstrap a typesafe states file for you.

$ am-gen states-file Foo,Bar

Example template for Foo and Bar
package states

import am "github.com/pancsta/asyncmachine-go/pkg/machine"

// S is a type alias for a list of state names.
type S = am.S

// States map defines relations and properties of states.
var States = am.Struct{
    Foo: {},
    Bar: {},
}

// Groups of mutually exclusive states.

//var (
//      GroupPlaying = S{Playing, Paused}
//)

//#region boilerplate defs

// Names of all the states (pkg enum).

const (
    Foo = "Foo"
    Bar = "Bar"
)

// Names is an ordered list of all the state names.
var Names = S{
    Foo,
    Bar,
    am.Exception,
}

//#endregion

See tools/cmd/am-gen for more info.

Integrations

Open Telemetry

Test duration chart

pkg/telemetry provides Open Telemetry integration which exposes machine's states and transitions as Otel traces, compatible with Jaeger.

See pkg/telemetry for more info or import a sample asset.

Prometheus

Test duration chart

pkg/telemetry/prometheus binds to machine's transactions and averages the values withing an interval window and exposes various metrics. Combined with Grafana, it can be used to monitor the metrics of you machines.

See pkg/telemetry/prometheus for more info.

Case Studies

Several case studies are available to show how to implement various types of machines, measure performance and produce a lot of inspectable data.

libp2p-pubsub benchmark

Test duration chart
  • pubsub host - eg ps-17 (20 states)
    PubSub machine is a simple event loop with Multi states which get responses via arg channels. Heavy use of Eval.
  • discovery - eg ps-17-disc (10 states)
    Discovery machine is a simple event loop with Multi states and a periodic refresh state.
  • discovery bootstrap - eg ps-17-disc-bf3 (5 states)
    BootstrapFlow is a non-linear flow for topic bootstrapping with some retry logic.

See github.com/pancsta/go-libp2p-pubsub-benchmark or the pdf results for more info.

libp2p-pubsub simulator

Simulator grafana dashboard
  • pubsub host eg ps-17 (20 states)
    PubSub machine is a simple event loop with Multi states which get responses via arg channels. Heavy use of Eval.
  • pubsub discovery - eg ps-17-disc (10 states)
    Discovery machine is a simple event loop with Multi states and a periodic refresh state.
  • simulator sim (14 states)
    Root simulator machine, initializes the network and manipulates it during heartbeats according to frequency definitions. Heavily dependent on state negotiation.
  • simulator's peer - eg sim-p17 (17 states)
    Handles peer's connections, topics and messages. This machine has a decent amount of relations. Each sim peer has its own pubsub host.
  • topics - eg sim-t-se7ev (5 states)
    State-only machine (no handlers, no goroutine). States represent correlations with peer machines.

See github.com/pancsta/go-libp2p-pubsub-benchmark for more info.

am-dbg

am-dbg is a tview TUI app with a single machine consisting of:

  • input events (7 states)
  • external state (11 states)
  • actions (14 states)

This machine features a decent amount of relations within a large number od states and 4 state groups. It's also a good example to see how easily an AM-based program can be controller with a script in tools/cmd/am-dbg-demo.

See tools/debugger/states for more info.

Roadmap

  • negotiation testers (eg CanAdd)
  • helpers for composing networks of machines
  • helpers for queue and history traversal
  • "state-trace" navbar in am-dbg (via AddFromEv)
  • go1.22 traces
  • inference
  • optimizations
  • manual updated to a spec

See also issues.

Changelog

Latest release: v0.3.1

  • feat: add version param #23 (@pancsta)
  • feat: complete TUI debugger iteration 3 #22 (@pancsta)
  • feat: TUI debugger iteration 2 #21 (@pancsta)
  • feat: add TUI debugger #20 (@pancsta)
  • feat: add telemetry via net/rpc #19 (@pancsta)
  • feat: add support for state groups for the Remove relation #17 (@pancsta)
  • fix: add more locks #16 (@pancsta)
  • feat: prevent empty remove mutations #15 (@pancsta)
  • feat: add VerifyStates for early state names assert #14 (@pancsta)
  • docs: add debugger readme img #13 (@pancsta)
  • docs: add ToC, cheatsheet #12 (@pancsta)
  • docs: align with the v0.2.0 release #11 (@pancsta)

See CHANELOG.md for the full list.