☮ wfl - A Simple and Pluggable Workflow Language for Go ☮

Don't mix wfl with WFL.

What's new?

  • Remote context for executing job workflows remotely
  • GPT support for experimenting with LLMs in job workflows (blog article, examples)
  • Check out my blog article, where I discuss leveraging the wfl library for ML/AI applications using Python and TensorFlow, among other tools.

Introduction

Creating process, container, pod, task, or job workflows based on the raw interfaces of operating systems, Docker, Google Batch, Kubernetes, and HPC job schedulers can be tedious. A lot of repetitive code is required, and every workload management system has a different API.

wfl abstracts away the underlying details of processes, containers, and workload management systems. It provides a simple, unified interface that lets you quickly define and execute a job workflow and switch between execution backends without changing the workflow itself.

wfl is simple to use and designed to define and run jobs and self-contained job workflows with inter-job dependencies. There is no external controller runtime required. The whole job workflow can be contained in a single binary.

In its simplest form a process can be started and waited for:

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("convert", "image.jpg", "image.png").Wait()

If the output of the command needs to be displayed on the terminal, you can set the output path in the default JobTemplate (see below) configuration:

    template := drmaa2interface.JobTemplate{
        ErrorPath:  "/dev/stderr",
        OutputPath: "/dev/stdout",
    }

    flow := wfl.NewWorkflow(wfl.NewProcessContextByCfg(wfl.ProcessConfig{
        DefaultTemplate: template,
    }))

    flow.Run("echo", "hello").Wait()

Running a job as a Docker container requires a different context (and the image must already have been pulled).

    import (
        "github.com/dgruber/drmaa2interface"
        "github.com/dgruber/wfl"
        "github.com/dgruber/wfl/pkg/context/docker"
    )
    
    ...
    ctx := docker.NewDockerContextByCfg(docker.Config{DefaultDockerImage: "busybox:latest"})
    wfl.NewWorkflow(ctx).Run("sleep", "60").Wait()

Starting a Docker container that exposes ports, without a run command, requires more configuration. It can be provided by using a JobTemplate together with the RunT() method.

    jt := drmaa2interface.JobTemplate{
        JobCategory: "swaggerapi/swagger-editor",
    }
    jt.ExtensionList = map[string]string{"exposedPorts": "80:8080/tcp"}
    
    wfl.NewJob(wfl.NewWorkflow(docker.NewDockerContext())).RunT(jt).Wait()

Starting a Kubernetes batch job and waiting for its end is not much different.

    wfl.NewWorkflow(kubernetes.NewKubernetesContext()).Run("sleep", "60").Wait()

wfl also supports submitting jobs into HPC schedulers like SLURM, Grid Engine and so on.

    wfl.NewWorkflow(libdrmaa.NewLibDRMAAContext()).Run("sleep", "60").Wait()

wfl aims to work for any kind of workload. It works on a Mac and Raspberry Pi the same way as on a high-performance compute cluster.

There is basic support for getting the job output back as a string with the Output() method. It is a convenience wrapper that reads the job output from a file, which must be set beforehand with OutputPath. Note that multiple tasks need different output paths: use RunT() or different flows, try the new "{{.ID}}" replacement in the OutputPath, or use wfl.RandomFileNameInTempDir() as OutputPath. Output() is currently implemented for the OS, Docker, and Kubernetes backends.
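
To illustrate the idea of per-task output paths, the following minimal sketch expands an "{{.ID}}" placeholder with Go's text/template package. It only assumes that the placeholder behaves like a template field; expandOutputPath and taskData are hypothetical helpers for illustration and not part of wfl.

```go
package main

import (
	"fmt"
	"strings"
	"text/template"
)

// taskData is a hypothetical type that carries the value for {{.ID}}.
type taskData struct {
	ID string
}

// expandOutputPath renders pattern with the given task ID so that
// every task ends up with its own output file.
func expandOutputPath(pattern, id string) (string, error) {
	tmpl, err := template.New("outputPath").Parse(pattern)
	if err != nil {
		return "", err
	}
	var sb strings.Builder
	if err := tmpl.Execute(&sb, taskData{ID: id}); err != nil {
		return "", err
	}
	return sb.String(), nil
}

func main() {
	for _, id := range []string{"1", "2"} {
		path, err := expandOutputPath("/tmp/task-{{.ID}}.out", id)
		if err != nil {
			panic(err)
		}
		fmt.Println(path)
	}
}
```

With distinct paths per task, reading the output of one task cannot pick up the output of another.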

Some backend implementations (like the one for Kubernetes) support basic file transfer in the JobTemplate (when using RunT()) through the StageInFiles and StageOutFiles maps. At large scale you will miss checkpoint and restart functionality and high availability of the workflow process itself. The idea is not to require a complicated runtime environment for workflow applications, but rather to keep workflows small and repeatably executable from other workflows.

wfl works with simple primitives: context, workflow, job, and jobtemplate.

Basic support for logging is also available. Log levels can be controlled by environment variables (export WFL_LOGLEVEL=DEBUG or INFO/WARNING/ERROR/NONE). Applications can use the same logging facility by getting the logger from the workflow (workflow.Logger()) or by registering their own logger in a workflow (workflow.SetLogger(Logger interface)). The default level is ERROR.

Getting Started

Dependencies of wfl (like drmaa2) are vendored in. The only external package required to be installed manually is the drmaa2interface.

    go get github.com/dgruber/drmaa2interface

Context

A context defines the execution backend for the workflow. Contexts can be easily created with the New functions which are defined in the context.go file or in the separate packages found in pkg/context.

For creating a context which executes the jobs of a workflow in operating system processes use:

    wfl.NewProcessContext()

If the workflow needs to be executed in containers the DockerContext can be used:

    docker.NewDockerContext()

If the Docker context needs a default Docker image (used by Run(), or by RunT() when the JobTemplate has no JobCategory, which is the Docker image), NewDockerContextByCfg() can be called.

    docker.NewDockerContextByCfg(docker.Config{DefaultDockerImage: "busybox:latest"})

For running jobs either in VMs or in containers in Google Batch the GoogleBatchContext needs to be allocated:

    googlebatch.NewGoogleBatchContextByCfg(
        googlebatch.Config{
            DefaultJobCategory: googlebatch.JobCategoryScript, // default container image Run() is using or script if cmd runs as script
            GoogleProjectID:    "google-project",
            Region:             "europe-north1",
            DefaultTemplate: drmaa2interface.JobTemplate{
                MinSlots: 1, // for MPI set MinSlots = MaxSlots and > 1
                MaxSlots: 1, // for just a bunch of tasks MinSlots = 1 (parallelism) and MaxSlots = <tasks>
            },
        },
    )

When you want to run the workflow as Cloud Foundry tasks the CloudFoundryContext can be used:

    cloudfoundry.NewCloudFoundryContext()

Without a config it uses environment variables to access the Cloud Foundry cloud controller API.

For submitting Kubernetes batch jobs a Kubernetes context exists.

   ctx := kubernetes.NewKubernetesContext()

Note that each job requires a container image, which can be specified through the JobTemplate's JobCategory. When the same container image is used within the whole job workflow, it makes sense to set it in the Kubernetes config. Otherwise you can use RunT() to specify a container image for a specific task.

   ctx := kubernetes.NewKubernetesContextByCfg(kubernetes.Config{DefaultImage: "busybox:latest"})

For working with HPC schedulers the libdrmaa context can be used. This context requires libdrmaa.so to be available in the library path at runtime. Grid Engine ships libdrmaa.so, but LD_LIBRARY_PATH typically needs to be set. For SLURM, libdrmaa.so often needs to be built.

Since cgo is used under the hood (drmaa2os, which uses go drmaa), some compiler flags need to be set at build time. Those flags depend on the workload manager used. It is best to check the go drmaa project for the right flags.

Building for SLURM requires:

    export CGO_LDFLAGS="-L$SLURM_DRMAA_ROOT/lib"
    export CGO_CFLAGS="-DSLURM -I$SLURM_DRMAA_ROOT/include"

Once everything is set, a libdrmaa context can be created with:

   ctx := libdrmaa.NewLibDRMAAContext()

The JobCategory is whatever the workload-manager associates with it. Typically it is a set of submission parameters. A basic example is here.

The Remote Context is used for sending jobs to a drmaa2os compatible remote job server backend. Such a remote server can be easily created with the drmaa2os remote jobtracker package or by using the OpenAPI specification. It makes any existing drmaa2 jobtracker accessible as a server. One example is executing Docker containers on a remote server. Another is sending jobs from a container running in Kubernetes to a sidecar.

A simple server example is here. Another is here.

    import (
        genclient "github.com/dgruber/drmaa2os/pkg/jobtracker/remote/client/generated"
        ...
    )

    params := &client.ClientTrackerParams{
        Server: "https://localhost:8088",
        Path:   "/jobserver/jobmanagement",
        Opts: []genclient.ClientOption{
            genclient.WithHTTPClient(httpsClient),
            genclient.WithRequestEditorFn(basicAuthProvider.Intercept),
        },
    }

    ctx := wfl.NewRemoteContext(wfl.RemoteConfig{}, params)

Workflow

A workflow encapsulates a set of jobs/tasks using the same backend (context). Depending on the execution backend it can be seen as a namespace.

It can be created by using:

    wf := wfl.NewWorkflow(ctx)

Errors during creation can be caught with

    wf := wfl.NewWorkflow(ctx).OnError(func(e error) {panic(e)})

or with

    if wf.HasError() {
        panic(wf.Error())
    }

Job

Jobs are the main objects in wfl. A job defines helper methods for dealing with the workload. Many of those methods return the job object itself to allow chaining calls in an easy way. Errors are stored internally and can be fetched with special methods. A job acts as a container and control unit for tasks. In most cases tasks are mapped to jobs of the underlying workload manager (like in Kubernetes, HPC schedulers, etc.) or to raw processes or containers.

The Run() method submits a new task and returns immediately, i.e. without waiting for the job to be started or finished. When the Run() method errors, the job submission has failed. The Wait() method waits until the task has finished. If multiple Run() methods are called in a chain, multiple tasks might be executed in parallel (depending on the backend). When the same task should be executed multiple times, the RunArray() method might be convenient. When using an HPC workload manager through the LibDRMAA implementation, it gets translated to an array job, which is used for submitting and running tens of thousands of tasks in HPC clusters (like for bioinformatics or electronic design automation workloads). Each task gets a unique task number set as an environment variable, which can be used for accessing specific data sets.

The RunMatrixT() method submits and runs multiple tasks based on a job template with placeholders. Those placeholders get replaced with defined values before the jobs are submitted. That makes it convenient to submit many tasks using different job templates (like for executing a range of commands in a set of different container images for testing).

In some systems job related resources must be deleted after the job has finished and no more information about its execution needs to be queried. This functionality is implemented in the DRMAA2 Reap() method, which ReapAll() executes for each task in the job object. Afterwards the job object should no longer be used, as some information might not be available anymore. In a Kubernetes environment it removes the job objects and potentially related objects like configmaps.

Methods can be classified as blocking, non-blocking, job template based, function based, and error handlers.

Job Submission

| Function Name | Purpose | Blocking | Examples |
| --- | --- | --- | --- |
| Run() | Starts a process, container, or submits a task and comes back immediately | no | |
| RunT() | Like above but with a JobTemplate as parameter | no | |
| RunArray() | Submits a bulk job which runs many iterations of the same command | no | |
| Resubmit() | Submits a job n-times (Run().Run().Run()...) | no | |
| RunEvery() | Submits a task every d time.Duration | yes | |
| RunEveryT() | Like RunEvery() but with JobTemplate as param | yes | |
| RunMatrixT() | Replaces placeholders in the job template and submits combinations | no | |
Job Control

| Function Name | Purpose | Blocking | Examples |
| --- | --- | --- | --- |
| Suspend() | Stops a task from execution (e.g. sending SIGTSTP to the process group)... | | |
| Resume() | Continues a task (e.g. sending SIGCONT)... | | |
| Kill() | Stops process (SIGKILL), container, task, job immediately. | | |

Function Execution

| Function Name | Purpose | Blocking | Examples |
| --- | --- | --- | --- |
| Do() | Executes a Go function | yes | |
| Then() | Waits for end of process and executes a Go function | yes | |
| OnSuccess() | Executes a function if the task ran successfully (exit code 0) | yes | |
| OnFailure() | Executes a function if the task failed (exit code != 0) | yes | |
| OnError() | Executes a function if the task could not be created | yes | |
| ForEach(f, interface{}) | Executes a user-defined function by iterating over all tasks | does not wait for jobs | |
| ForAll(f, interface{}) | Executes a user-defined function concurrently in goroutines on all tasks | no | |

Blocker

| Function Name | Purpose | Blocking | Examples |
| --- | --- | --- | --- |
| After() | Blocks a specific amount of time and continues | yes | |
| Wait() | Waits until the most recently submitted task has finished | yes | |
| Synchronize() | Waits until all submitted tasks have finished | yes | |
| Output() | Waits until the last submitted task is finished and returns the output as string | yes | Only for process, Docker, and K8s currently. |

Job Flow Control

| Function Name | Purpose | Blocking | Examples |
| --- | --- | --- | --- |
| ThenRun() | Wait() (last task finished) followed by an async Run() | partially | |
| ThenRunT() | ThenRun() with template | partially | |
| OnSuccessRun() | Wait() if Success() then Run() | partially | |
| OnSuccessRunT() | OnSuccessRun() but with template as param | partially | |
| OnFailureRun() | Wait() if Failed() then Run() | partially | |
| OnFailureRunT() | OnFailureRun() but with template as param | partially | |
| Retry() | wait() + !success() + resubmit() + wait() + !success() | yes | |
| AnyFailed() | Checks if one of the tasks in the job failed | yes | |
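
The Retry() row describes a loop of waiting, checking for failure, and resubmitting. A minimal sketch of that control flow in plain Go is shown below. The retry helper and the convention that a task is a function returning an exit code are assumptions for illustration. Whether wfl counts the initial run as one of the attempts is not stated here, so the sketch runs the task once and then at most `retries` more times.

```go
package main

import "fmt"

// retry runs task once and then up to retries more times, stopping as
// soon as task returns exit code 0. It reports the number of runs and
// whether the last run succeeded.
func retry(retries int, task func() int) (runs int, ok bool) {
	for runs = 1; ; runs++ {
		if task() == 0 {
			return runs, true
		}
		if runs > retries {
			return runs, false
		}
	}
}

func main() {
	attempt := 0
	// flaky fails twice and succeeds on the third run.
	flaky := func() int {
		attempt++
		if attempt < 3 {
			return 1
		}
		return 0
	}
	runs, ok := retry(5, flaky)
	fmt.Println(runs, ok) // prints "3 true"
}
```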

Job Status and General Checks

| Function Name | Purpose | Blocking | Examples |
| --- | --- | --- | --- |
| JobID() | Returns the ID of the submitted job | no | |
| JobInfo() | Returns the DRMAA2 JobInfo of the job | no | |
| Template() | | no | |
| State() | | no | |
| LastError() | | no | |
| Failed() | | no | |
| Success() | | no | |
| ExitStatus() | | no | |
| ReapAll() | Cleans up all job related resources from the workload manager. Do not use the job object afterwards. Calls DRMAA2 Reap() on all tasks. | no | |
| ListAllFailed() | Waits for all tasks and returns the failed tasks as DRMAA2 jobs | yes | |
| ListAll() | Returns all tasks as a slice of DRMAA2 jobs | no | |

LLM (GPT) Enhancements

For using the LLM methods the workflow needs to be initialized with an LLM config. For this the WithLLMOpenAI() method is used.

	flow := wfl.NewWorkflow(wfl.NewProcessContext()).WithLLMOpenAI(
		wfl.OpenAIConfig{
			Token: os.Getenv("OPENAI_KEY"),
		}).OnErrorPanic()

The flow then offers the TemplateP("what should the script do?") method, which can create job templates (flow.TemplateP()), and the following Job methods can be used:

| Function Name | Purpose | Blocking | Examples |
| --- | --- | --- | --- |
| OutputP() | Returns the output of the job on which the given prompt is applied | yes | OutputP("Summarize in 2-3 sentences.") |
| ErrorP() | Takes the submission error message and applies a prompt | no | ErrorP("Explain the error and provide a solution") |

JobTemplate

JobTemplates specify the details of a job. In the simplest case the job is specified by the application name and its arguments, as is typically done in the OS shell. In that case the Run() methods (ThenRun(), OnSuccessRun(), OnFailureRun()) can be used. Job template based methods (like RunT()) can be completely avoided by providing a default template when creating the context (...ByCfg()). Then each Run() inherits the settings (like JobCategory for the container image name and OutputPath for redirecting output to stdout). If more details are required for specifying the jobs, the RunT() methods need to be used. I'm currently using the DRMAA2 Go JobTemplate. In most cases only RemoteCommand, Args, WorkingDirectory, JobCategory, JobEnvironment, and StageInFiles are evaluated. Functionality and semantics are up to the underlying drmaa2os job tracker.
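
The inheritance of default settings can be pictured with a small sketch. It uses a reduced, hypothetical jobTemplate struct rather than the real DRMAA2 type, and fromDefault is an illustrative helper, not wfl's implementation: the default template is copied and only the command and its arguments are filled in.

```go
package main

import "fmt"

// jobTemplate is a hypothetical, reduced stand-in for a DRMAA2 job template.
type jobTemplate struct {
	RemoteCommand string
	Args          []string
	JobCategory   string
	OutputPath    string
}

// fromDefault copies the default template and fills in the command and
// its arguments, leaving all other settings as configured in def.
func fromDefault(def jobTemplate, cmd string, args ...string) jobTemplate {
	jt := def
	jt.RemoteCommand = cmd
	jt.Args = args
	return jt
}

func main() {
	def := jobTemplate{JobCategory: "busybox:latest", OutputPath: "/dev/stdout"}
	jt := fromDefault(def, "echo", "hello")
	fmt.Printf("%s %v in %s -> %s\n", jt.RemoteCommand, jt.Args, jt.JobCategory, jt.OutputPath)
}
```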

The Template object provides helper functions for job templates. For an example see here.

Examples

For examples please have a look into the examples directory. template is a canonical example of a pre-processing job, followed by parallel execution, followed by a post-processing job.

test is a use case for testing. It compiles all examples with the local go compiler and then within a Docker container using the golang:latest image and reports errors.

cloudfoundry demonstrates how Cloud Foundry tasks can be created.

Creating a Workflow which is Executed as OS Processes

The allocated context defines which workload management system / job execution backend is used.

    ctx := wfl.NewProcessContext()

Different contexts can be used within a single program. That way multi-clustering, potentially across different cloud solutions, is supported.

Using a context a workflow can be established.

    wfl.NewWorkflow(wfl.NewProcessContext())

Handling an error during workflow generation can be done by specifying a function which is only called in the case of an error.

    wfl.NewWorkflow(wfl.NewProcessContext()).OnError(func(e error) {
        panic(e)
    })

The workflow is used in order to instantiate the first job using the Run() method.

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "123")

But you can also create an initial job like this:

    job := wfl.NewJob(wfl.NewWorkflow(wfl.NewProcessContext()))

For more detailed settings (like resource limits) the DRMAA2 job template can be used as parameter for RunT().

Jobs allow the execution of workload as well as expressing dependencies.

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "2").ThenRun("sleep", "1").Wait()

The line above executes two OS processes sequentially and waits until the last job in chain is finished.

In the following example the two sleep processes are executed in parallel. Wait() only waits for the sleep 1 job. Hence sleep 2 still runs after the wait call comes back!

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "2").Run("sleep", "1").Wait()

Running two jobs in parallel and waiting until all jobs finished can be done with Synchronize().

    wfl.NewWorkflow(wfl.NewProcessContext()).Run("sleep", "2").Run("sleep", "1").Synchronize()
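
The difference between waiting for the last task and waiting for all tasks can be sketched without any backend. The job type below is a hypothetical stand-in that keeps one channel per task; its wait and synchronize methods only mimic the behavior described above and are not wfl's implementation.

```go
package main

import "fmt"

// job is a hypothetical stand-in that tracks one done-channel per task.
type job struct {
	tasks []chan struct{}
}

// run registers a task and returns the channel that is closed when it ends.
func (j *job) run() chan struct{} {
	done := make(chan struct{})
	j.tasks = append(j.tasks, done)
	return done
}

// wait blocks only until the most recently submitted task has finished.
func (j *job) wait() {
	if len(j.tasks) > 0 {
		<-j.tasks[len(j.tasks)-1]
	}
}

// synchronize blocks until every submitted task has finished.
func (j *job) synchronize() {
	for _, done := range j.tasks {
		<-done
	}
}

// finished reports without blocking whether a task has ended.
func finished(done chan struct{}) bool {
	select {
	case <-done:
		return true
	default:
		return false
	}
}

func main() {
	j := &job{}
	slow := j.run() // stands for Run("sleep", "2")
	fast := j.run() // stands for Run("sleep", "1")

	close(fast) // the short task ends first
	j.wait()
	fmt.Println("slow finished after wait:", finished(slow)) // false

	close(slow)
	j.synchronize()
	fmt.Println("slow finished after synchronize:", finished(slow)) // true
}
```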

Jobs can also be suspended (stopped) and resumed (continued) - if supported by the execution backend (like OS, Docker).

    wf.Run("sleep", "1").After(time.Millisecond * 100).Suspend().After(time.Millisecond * 100).Resume().Wait()

The exit status is available as well. ExitStatus() blocks until the previously submitted job is finished.

    wfl.NewWorkflow(ctx).Run("echo", "hello").ExitStatus()

In order to run jobs depending on the exit status the OnFailure and OnSuccess methods can be used:

    wf.Run("false").OnFailureRun("true").OnSuccessRun("false")

For executing a function on a submission error OnError() can be used.

For running multiple jobs based on a similar job template (like for test workflows), RunMatrixT() can be used. It expects a JobTemplate with self-defined placeholders (which can be any string). Those placeholders are replaced by the lists specified in the Replacement structs. Every combination of the replacement lists is then evaluated, and new job templates are generated and submitted.

The following example submits and waits for 4 tasks:

  • sleep 0.1
  • echo 0.1
  • sleep 0.2
  • echo 0.2

If only one list of replacements is required, the second replacement can just be left empty (wfl.Replacement{}). For JobTemplate fields with numbers, the replacement strings are automatically converted to numbers.

    job := flow.NewJob().RunMatrixT(
        drmaa2interface.JobTemplate{
            RemoteCommand: "{{cmd}}",
            Args:          []string{"{{arg}}"},
        },
        wfl.Replacement{
            Fields:       []wfl.JobTemplateField{wfl.RemoteCommand},
            Pattern:      "{{cmd}}",
            Replacements: []string{"sleep", "echo"},
        },
        wfl.Replacement{
            Fields:       []wfl.JobTemplateField{wfl.Args},
            Pattern:      "{{arg}}",
            Replacements: []string{"0.1", "0.2"},
        },
    )
    job.Synchronize()
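
How two replacement lists expand into four tasks can be sketched as a plain cross product. The command type and the combinations helper are hypothetical and only illustrate the expansion. The order in which wfl actually submits the generated templates is an assumption here.

```go
package main

import "fmt"

// command is a hypothetical, reduced job template with one argument.
type command struct {
	Cmd string
	Arg string
}

// combinations returns one command per pair of replacement values.
func combinations(cmds, args []string) []command {
	out := make([]command, 0, len(cmds)*len(args))
	for _, a := range args {
		for _, c := range cmds {
			out = append(out, command{Cmd: c, Arg: a})
		}
	}
	return out
}

func main() {
	cmds := []string{"sleep", "echo"}
	args := []string{"0.1", "0.2"}
	for _, c := range combinations(cmds, args) {
		fmt.Println(c.Cmd, c.Arg)
	}
}
```

Two commands and two arguments yield 2 x 2 = 4 entries, matching the four tasks listed above.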

More methods can be found in the sources.

Basic Workflow Patterns

Sequence

The successor task runs after the completion of the predecessor task.

    flow := wfl.NewWorkflow(ctx)
    flow.Run("echo", "first task").ThenRun("echo", "second task")
    ...

or

    flow := wfl.NewWorkflow(ctx)
    job := flow.Run("echo", "first task")
    job.Wait()
    job.Run("echo", "second task")
    ...

Parallel Split

After completion of a task run multiple branches of tasks.

    flow := wfl.NewWorkflow(ctx)
    flow.Run("echo", "first task").Wait()

    notifier := wfl.NewNotifier()

    go func() {
        wfl.NewJob(wfl.NewWorkflow(ctx)).
            TagWith("BranchA").
            Run("sleep", "1").
            ThenRun("sleep", "3").
            Synchronize().
            Notify(notifier)
    }()

    go func() {
        wfl.NewJob(wfl.NewWorkflow(ctx)).
            TagWith("BranchB").
            Run("sleep", "1").
            ThenRun("sleep", "3").
            Synchronize().
            Notify(notifier)
    }()

    notifier.ReceiveJob()
    notifier.ReceiveJob()

    ...
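
The notifier pattern resembles a channel that every branch reports to when it is done. The sketch below shows that idea with the standard library only. runBranches and the use of a string channel are assumptions for illustration and do not describe how wfl's Notifier is implemented.

```go
package main

import (
	"fmt"
	"sort"
)

// runBranches starts one goroutine per branch name, lets each branch
// report its tag on a shared channel when done, and collects all reports.
func runBranches(names []string, work func(name string)) []string {
	notifier := make(chan string, len(names))
	for _, name := range names {
		go func(n string) {
			work(n)
			notifier <- n
		}(name)
	}
	finished := make([]string, 0, len(names))
	// One receive per branch, like calling ReceiveJob() once per branch.
	for range names {
		finished = append(finished, <-notifier)
	}
	// Arrival order is not deterministic, so sort for stable output.
	sort.Strings(finished)
	return finished
}

func main() {
	done := runBranches([]string{"BranchA", "BranchB"}, func(string) {})
	fmt.Println(done) // prints "[BranchA BranchB]"
}
```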

Synchronization of Tasks

Wait until all tasks of a job which are running in parallel are finished.

    flow := wfl.NewWorkflow(ctx)
    flow.Run("echo", "first task").
        Run("echo", "second task").
        Run("echo", "third task").
        Synchronize()

Synchronization of Branches

Wait until all branches of a workflow are finished.

    notifier := wfl.NewNotifier()

    go func() {
        wfl.NewJob(wfl.NewWorkflow(ctx)).
            TagWith("BranchA").
            Run("sleep", "1").
            Wait().
            Notify(notifier)
    }()

    go func() {
        wfl.NewJob(wfl.NewWorkflow(ctx)).
            TagWith("BranchB").
            Run("sleep", "1").
            Wait().
            Notify(notifier)
    }()

    notifier.ReceiveJob()
    notifier.ReceiveJob()

    ...

Exclusive Choice

    flow := wfl.NewWorkflow(ctx)
    job := flow.Run("echo", "first task")
    job.Wait()

    if job.Success() {
        // do something
    } else {
        // do something else
    }
    ...

Fork Pattern

When a task is finished, n tasks need to be started in parallel.

    job := wfl.NewWorkflow(ctx).Run("echo", "first task").
        ThenRun("echo", "parallel task 1").
        Run("echo", "parallel task 2").
        Run("echo", "parallel task 3")
    ...

or

    flow := wfl.NewWorkflow(ctx)
    
    job := flow.Run("echo", "first task")
    job.Wait()
    for i := 1; i <= 3; i++ {
        job.Run("echo", fmt.Sprintf("parallel task %d", i))
    }
    ...

For missing functionality or bugs please open an issue on GitHub. Contributions welcome!