Skip to content

pfnet-research/pftaskqueue

Repository files navigation

pftaskqueue: Lightweight task queue tool. Feel free to process embarrassingly-parallel tasks at scale.

CI

              +-------------------------+
+----+          +----+ +----+    +----+
|User+-----+--> |Task+-+Task|....|Task| +------+---------------------------------------+
+----+     |    +----+ +----+    +----+        |                                       |
           |  +-------------------------+      |                                       |
           |                                   |                                       |
           |                                   |                                       |
           |     +-----------------------------v-+       +-----------------------------v-+
           |     | pftaskqueue worker            |       | pftaskqueue worker            |
           |     |    +-----------------------+  |       |    +-----------------------+  |
           |     |    |-----+                 |  |       |    |-----+                 |  |
           |     |    ||Task| handler process |  |       |    ||Task| handler process |  |
           |     |    |-----+                 |  |       |    |-----+                 |  |
 Automatic |     |    +-----------------------+  | ....  |    +-----------------------+  |
     Retry |     |      ︙                       |       |       ︙                      |
           |     |    +-----------------------+  |       |    +-----------------------+  |
           |     |    |-----+                 |  |       |    |-----+                 |  |
           |     |    ||Task| handler process |  |       |    ||Task| handler process |  |
           |     |    |-----+                 |  |       |    |-----+                 |  |
           |     |    |-----------------------+  |       |    |-----------------------+  |
           |     +-+-----------------------------+       +--+----------------------------+
           |       |                                        |
           +-------+----------------------------------------+

Feature

  • Handy: No dependency. Single binary.
  • Simple: pftaskqueue fits to process embarrassingly parallel tasks in distributed manner. It can process bunch of task configurations with one single task handler commands for simplicity.
  • Scalable: multiple pftaskqueue workers can consume single queue.
  • Reliable: pftaskqueue never lost tasks even when worker exited abnormally.
  • Automatic Retry: pftaskqueue can retry (re-queue) tasks with respect to its retry limit configuration if tasks failed.
  • Task Chaining: pftaskqueue accepts some extra succeeding tasks (called postHooks) from task handler on their success/failure. pftaskqueue will queue them automatically. This means your task handler can chain succeeding tasks programmatically.
  • Automatic Worker Salvation: pftaskqueue detects some lost workers (with timeout basis) and resurrects the tasks (re-queue) on th workers.
  • Suspend/Resume: pftaskqueue can suspend/resume queue. Once you suspend a queue, worker will stop pulling new tasks (you can configure to keep running too). Once you resume a queue, worker will start pulling.
  • Backend Independent: pftaskqueue supports multiple queue backend (to plan. now only supports redis).

Installation

Download released binary

Just download a tar ball for your platform from releases page, decompress the archive and place a binary to your PATH.

Docker image

pftaskqueue publishes two kind of docker iamges in GitHub Packages

  • ghcr.io/pfnet-research/pftaskqueue/dev:latest: built on the latest master branch
    • ghcr.io/pfnet-research/pftaskqueue/dev:vx.y.z-alpha-{{revision}} is also available for old dev images
  • ghcr.io/pfnet-research/pftaskqueue/release:latest: the latest release image
    • ghcr.io/pfnet-research/pftaskqueue/release:vx.y.z is also available for previous/specific release images

To pull those images, you would need docker login docker.pkg.github.com first. Please see Configuring Docker for use with GitHub Packages if you are not familiar wit it.

$ docker pull docker.pkg.github.com/pfnet-research/pftaskqueue/release:latest

To build docker image by yourself:

docker build . -t YOUR_TAG

Build from the source

# clone this repository
$ make

# copy pftaskqueue binary to anywhere you want
$ cp ./dist/pftaskqueue /anywhere/you/want

Getting Started

  1. Prepare Redis for sample backend (on default listen address localhost:6379)
# docker
$ docker run --name redis -d -p 6379:6379 redis redis-server --appendonly yes

# local on Mac (Homebrew)
$ brew install redis
$ redis-server
  1. Please setup pftaksqueue configfile and set your redis key prefix
$ pftaskqueue print-default-config > ~/.pftaskqueue.yaml

If you shared the same redis DB among others, you must avoid key collisions. As a safeguard, pftaskqueue redis backend must set key prefix. Please see Managing Configurations section how to configure the option. Let us use environment variable here.

export PFTQ_REDIS_KEYPREFIX=${___YOUR___USERNAME___}
  1. Create a task queue and push your tasks
# create a queue
$ pftaskqueue create-queue foo
4:44PM INF Queue created successfully queueName=foo queueState=active queueUID=8aae8e25-998d-44e9-8ce8-7792c70e9c67

# add tasks to the queue ('-f -' means read from stdin)
$ cat << EOT | pftaskqueue add-task foo -f -
payload: payload
retryLimit: 0
timeoutSeconds: 100
EOT
4:45PM INF Task added to queue pipe=stdin queueName=foo queueUID=8aae8e25-998d-44e9-8ce8-7792c70e9c67 task={"spec":{"payload":"payload","timeoutSeconds":100},"status":{"failureCount":0,"phase":"Pending","salvageCount":0},"uid":"12333130-0b42-4724-b6d2-3a1c36ed3561"}
  1. Start pftaskqueue worker
# start worker with task handler commands (you can stop the worker with Ctrl-C) 
# see 'Task Handler Specification' section for details.
$ pftaskqueue start-worker --name=$(hostname) --queue-name foo -- cat
4:45PM INF Start processing a task component=worker processUID=7cca2a80-155f-4527-ae61-02a8c3c03110 queueName=foo queueUID=8aae8e25-998d-44e9-8ce8-7792c70e9c67 taskHandlerCommands=["cat"] taskSpec={"payload":"payload","timeoutSeconds":100} taskUID=12333130-0b42-4724-b6d2-3a1c36ed3561 workerName=everpeace-macbookpro-2018.local workerUID=06c333f8-aeab-4a3d-8624-064a305c53ff
4:45PM INF "/tmp/06c333f8-aeab-4a3d-8624-064a305c53ff/7cca2a80-155f-4527-ae61-02a8c3c03110" pipe=stdout processUID=7cca2a80-155f-4527-ae61-02a8c3c03110 taskUID=12333130-0b42-4724-b6d2-3a1c36ed3561
4:45PM INF Task marked Succeeded component=worker processUID=7cca2a80-155f-4527-ae61-02a8c3c03110 queueName=foo queueUID=8aae8e25-998d-44e9-8ce8-7792c70e9c67 taskResult={"reason":"Succeeded","type":"Success"} taskSpec={"payload":"payload","timeoutSeconds":100} taskUID=12333130-0b42-4724-b6d2-3a1c36ed3561 workerName=everpeace-macbookpro-2018.local workerUID=06c333f8-aeab-4a3d-8624-064a305c53ff
^C4:45PM INF Signal received. Stopping all the on-going tasks signal=interrupt
4:45PM INF Completed processing configured number of tasks component=worker numTasks=100 queueName=foo queueUID=8aae8e25-998d-44e9-8ce8-7792c70e9c67 workerName=everpeace-macbookpro-2018.local workerUID=06c333f8-aeab-4a3d-8624-064a305c53ff
4:45PM INF Waiting for all the ongoing tasks finished component=worker queueName=foo queueUID=8aae8e25-998d-44e9-8ce8-7792c70e9c67 workerName=everpeace-macbookpro-2018.local workerUID=06c333f8-aeab-4a3d-8624-064a305c53ff
4:45PM INF Worker stopped component=worker queueName=foo queueUID=8aae8e25-998d-44e9-8ce8-7792c70e9c67 workerName=everpeace-macbookpro-2018.local workerUID=06c333f8-aeab-4a3d-8624-064a305c53ff

Please use help for detailed tool usage.

pftaskqueue help

Example: pftaskqueue in Kubernetes cluster

pftaskqueue can use in kubernetes. It recommends the below structure:

  • ConfigMap: holds pfatskqueue configuration except for password.
    • You should NOT include redis password in the configmap. Use Secret instead.
  • Job: runs pftaskqueue workers .

Assume you have created some queue and the queue is active.

  1. create Secret which holds redis secret

    $ kubectl create secret generic redis-password --from-literal=password =.......
  2. creates ConfigMap which holds pftaskqueue configuration

    $ pftaskqueue print-default-config > config.yaml
    
    # Edit the file 
    
    $ kubectl create configmap pftaskqueue-config --from-file=pftaskqueue.config=config.yaml
  3. create worker Job

    # job.yaml
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: redis-job
    spec:
      # This will run 4 pods.  It will processes tasks with 4 * worker.concurrency concurrent task handlers. 
      parallelism: 4
      # The Job will recreate until 4 Pods succeeded
      completions: 4
      # The Job will give up retrying to create Pods with the limit 
      backoffLimit: 4
      template:
        spec:
          restartPolicy: Never
          containers:
            - name: ctr
              image: <your_container_which_includes_pftaskqueue>
              args:
                - start-worker
                - --config=/pftaskqueue-config/pftaskqueue.yaml
                - --exit-on-empty=true
                - --exit-on-suspend=true
              env:
                  # You can override configurations by envvars
                - name: PFTQ_WORKER_QUEUENAME
                  value: test
                  # pod name will be suitable for worker name
                - name: PFTQ_WORKER_NAME
                  valueFrom:
                    fieldRef:
                      fieldPath: metadata.name
                  # inject rerdis password 
                - name: PFTQ_REDIS_PASSWORD
                  valueFrom:
                    secretKeyRef:
                      name: redis-password
                      key: password
              resources:
                limits:
                  cpu: "1000m"
                  memory: "256Mi"
              # configmap as volume
              volumeMounts:
                - mountPath: /pftaskqueue-config
                  name: pftaskqueue-config
          volumes:
            - name: pftaskqueue-config
              configMap:
                name: pftaskqueue-config
    $ kubectl apply -f job.yaml

Concepts

Queue

see queue.go for full data structure.

    +--------+       +---------+
    | Active <-------> Suspend |
    +--------+       +---------+

 *Queue can be created in each state

pftaskqueue's queue data model is very simple. Each queue only has its name and state. Queues are distinguished by its names. However, each queue has UID internally two distinguish two queues with different lifecycle but the same name. State is one of active and suspend. In active state, pftaskqueue worker can consume tasks from the queue. Otherwise (suspend state), pftaskqueue worker won't consume tasks.

Every Task belongs to only single queue. Once queue was deleted, all the information will be erased.

Queue operations

Below queue operation are supported.

$ pftaskqueue create-queue [queue] --state=[active|suspend]
$ pftaskqueue get-queue-state [queue] 
$ pftaskqueue suspend-queue [queue]
$ pftaskqueue resume-queue [queue] 
$ pftaskqueue delete-queue [queue]
$ pftaskqueue get-all-queues [queue] --output=[yaml,json,table]

Please remember delete-queue removes all the information from the backend.

Tasks

see task.go for full data structure.

TaskSpec

You can specify any task payload in TaskSpec. TaskSpec can convey any payload as blow. Once you added TaskSpec to queue, pftaskqueue assigns UID for it automatically. pftasqueue distinguishes tasks by the UIDs. See the next section for UID. Thus, if you queued multiple identical task specs, pftaskqueue recognizes them different tasks.

# name is just a display name.  the field is NOT for identifying tasks.
# Tasks are identified by UID described in the next section.
# Max length is 1024.
name: "this is just an display name" 
# payload is the conveyor of task parameters.
# Max byte size varies on backend type to prevent from overloading backend.
#  redis: 1KB
payload: |
  You can define any task information in payload
# retryLimit is the maximum number of retry (negative number means infinite)
# NOTE: only the limited number of recent task records will be recorded in its status.
#       so, if you set large value or infinite here, you will loose old task records.
#       please see the description of status.history field in the next section.
retryLimit: 3
# timeoutSeconds is for task handler timeout.
# If set positive value, task handler timeout for processing this task.
# Otherwise, worker's default timeout will be applied. (See 'Task Queue Worker' section)
timeoutSeconds: 600

Task

Task holds various metadata for tracking task lifecycle. See inline comments below.

# UID generated pftaskqueue for posted taskspec
uid: c7062138-fc38-4988-9f94-6e377d9855c3
# Posted TaskSpec
spec:
  name: myname
  payload: payload
  rertryLimit: 3
  timeoutSeconds: 100
status:
  # Phase of the task.
  # See below section for task lifecycle
  phase: Processing
  createdAt: 2020-02-12T20:20:29.350631+09:00
  # Failure count of the task
  failureCount: 1
  # Count the task was salvaged
  # Please see "Salvaging lost worker" section below for datails
  salvagedCount: 0
  # current processing task handler record
  currentRecord:
    processUID: 7b7b39f5-da66-4380-8002-033dff0e0f26
    # worker name received the task 
    workerName: everpeace-macbookpro-2018.local
    # This value is unique among pftaskqueue worker processes
    workerUID: 15bfe424-889a-49ca-88d7-fb0fc51f68d
    # timestamps
    receivedAt: 2020-02-12T20:20:39.350631+09:00
    startedAt: 2020-02-12T20:20:39.351479+09:00
  # history of recent records of processing the task.
  # the limited number of recent records are recorded in this field.
  # the value varies on backend types to prvent overloading backends:
  # - redis: 10 entries
  # NOTE: so, if you set larger value than this limit in spec.rertryLimit,
  #       you will loose old task records.
  history:
    # TaskRecord:
    #   this represents a record of task handler invocation
    #   in a specific worker.
    # UID of process(generated by pftaskqueue)
  - processUID: 194b8ad9-543b-4d01-a571-f8d1db2e74e6
    # worker name & UID which received the task
    workerName: everpeace-macbookpro-2018.local
    workerUID: 06c333f8-aeab-4a3d-8624-064a305c53ff
    # timestamps
    receivedAt: 2020-02-12T20:18:39.350631+09:00
    startedAt: 2020-02-12T20:18:39.351479+09:00
    finishedAt: 2020-02-12T20:18:39.365945+09:00
    # TaskResult:
    #   this represents a result of single task handler invocation
    result:
      # type is one of Success, Failure
      type: Success
      # reason is one of:
      #  - Success: success case
      #  - Failed: task handler failed
      #  - Timeout: task handler timeout (defined in spec)
      #  - Signaled: pftaskqueue worker signaled and the task handler was interrupted
      #  - InternalError: pftaskqueue worker faced some error processing a task
      reason: Succeeded
      # Returned values from the task handlers.
      # See 'Task Handler Specification' section for how pftaskqueue worker communicates
      # with its task handler processes.
      # payload max size varies on backend type to prevent from overloading backend.
      #  redis: 1KB
      # If size exceeded, the contents will be truncated automatically
      payload: ""
      # message max size varies on backend type to prevent from overloading backend.
      #  redis: 1KB
      # If size exceeded, the contents will be truncated automatically
      message: ""
      # Below two fields will be set if the worker which processes the task was
      # lost and salvaged by the other worker.
      # See "Worker lifecycle" section below for details.
      # salvagedBy: <workerUID>
      # salvagedAt: <timestamp>

Task lifecycle

+---------+    +----------+    +------------+      +-----------+
| Pending +--->+ Received +--->+ Processing +----->+ Succeeded |
+----^----+    +-------+--+    +-----+------+      +-----------+
     |                 |             |
     |                 |             |
     | Retry           |Retry        |Retry
     | Remaining       |Exceeded     |Exceeded     +--------+
     +-----------------+-------------+------------>+ Failed |
                                                   +--------+

If you queued your TaskSpec, pftaskqueue assign UID to it and generate Task with Pending phase for it. Some worker pulled a Pending task from the queue, Task transits to Received phase. When Task actually stared to be processed by task handler process, it transits to Processing phase.

Once task handler process succeeded, Task transits to Succeeded phase. If task handler process failed, pftaskqueue can handle automatic retry feature with respect to TaskSpec.retryLimit. If the task handler process failed and it didn't reach at its retry limit, pftaskqueue re-queue the task with setting Pending phase again. Otherwise pftaskqueue will give up retry and mark it Failed phase. You can see all the process record of the Task status.

If worker was signaled, tasks in Received or Processing phase will be treated as failure and pftaskqueue will handle automatic retry feature.

$ pftaskqueue get-task [queue] --state=failed -o yaml
...
- uid: c5eff8d1-93f0-480b-81ed-89b50e89720c
  spec:
    name: myname
    payload: foo
    retryLimit: 1
    timeout: 10m
  status:
    phase: Failed
    createdAt: 2020-02-12T14:34:13.302071+09:00
    failureCount: 2
    history:
    - processUID: b8e9d028-0413-497a-b7df-27ab40b20a6b
      workerName: some.worker.name.you.set
      receivedAt: 2020-02-12T14:34:23.302071+09:00
      startedAt: 2020-02-12T14:34:23.302741+09:00
      finishedAt: 2020-02-12T14:34:23.303892+09:00
      result:
        type: Failure
        reason: Failed
        message: "..."
        payload: "..."
    - processUID: 417022f3-5dce-4125-afd1-47ed2480bfd8
      workerName: some.worker.name.you.set
      receivedAt: 2020-02-12T14:34:23.291144+09:00
      startedAt: 2020-02-12T14:34:23.291869+09:00
      finishedAt: 2020-02-12T14:34:23.301319+09:00
      result:
        type: Failure
        reason: Failed
        message: "..."
        payload: "..."

Task operations

Below operations are supported:

# this read taskspec in yaml format from stdin
$ pftaskqueue add-task [queue]

# this prints entries in yaml format
$ pftaskqueue get-task [queue] --state=[all,pending,completed,succeeded,failed,workerlost,deadletter] --output=[yaml,json,table]

# To be implemented
# $ pftaskqueue salvage-workerlost-tasks [queue]

See also Dead Letter section.

Workers

pftaskqueue worker is a queue consumer process. This keeps fetching Pending tasks and spawns task handler processes up to configured concurrency and manage task lifecycle and its metadata. You can run multiple workers across multiple servers.

pftaskqueue worker can start with

pftaskqueue start-worker -- [task handler commands]

Worker Configuration

You can configure worker behaviour by these parameters below. Please note that these parameter can be configured by cli flags, environment variables or config files. See Managing Configurations section below.

# .pftaskqueue.yaml
...
worker:
  # Queue name to consume
  queueName: ""
  # Worker name
  #   This is used just to stamp into TaskRecord
  #   worker UID will be generated in each run
  name: ""
  # Concurrency of task handler processes
  concurrency: 1
  # TaskHandler configuration
  taskHandler:
    # Default timeout of the task handler.
    # This value will be used when TaskSpec.timeoutSeconds is not set or 0.
    defaultTimeout: 30m0s
    # Cleanup workspace dir or not when each task handler execution finished.
    cleanupWorkspaceDir: false
    # Task Handler Command
    # A Worker spawns a process with the command for each received tasks
    commands:
    - cat
  # Worker heartbeat configuration to detect worker process existence
  # Please see "Worker lifecycle" section
  heartBeat:
    # A Worker process tries to update its Worker.Status.lastHeartBeatAt field
    # stored in queue backend in this interval
    interval: 2s
    # A Worker.Status.lastHeartBeatAt will be determined "expired"
    # when lastHeartBeatAt timestamp was not updated in this duration
    # Workers with expired heartbeat will be defined as "lost" state. (and exited)
    expirationDuration: 10s
    # The duration after heart beat expiration to make the worker target for salvation
    salvageDuration: 15s
  # If true, worker exits when the queue was suspended
  exitOnSuspend: true
  # If true, worker exits when the queue was empty
  exitOnEmpty: false
  # If exitOnEmpty is true, worker waits for exit in the grace period
  exitOnEmptyGracePeriod: 10s
  # If the value was positive, worker will exit
  # after processing the number of tasks
  numTasks: 1000
  # Base directory to create workspace for task handler processes
  workDir: /tmp
  # Worker normally to perform worker salvation on startup
  # this can limit the number of workers to salvage. -1 means all
  numWorkerSalvageOnStartup: -1

Worker status

Worker stores its information in the queue and you can query worker state. See inline comments below:

# The uid of the worker
# This will be assigned by pftaskqueue on the startup
uid: 06c333f8-aeab-4a3d-8624-064a305c53ff
# The queue UID which the worker can work on
queueUID: 8aae8e25-998d-44e9-8ce8-7792c70e9c67
spec:
...This is parameters described by the above section...
status:
  # Worker phase: one of Runinng, Succeeded or Failed
  # See the next section for Worker lifecycle
  phase: Succeeded
  # one of Success, Failure, Lost or Salvaged
  reason: Success
  # timestamps
  startedAt: 2020-02-17T16:45:43.979554+09:00
  finishedAt: 2020-02-17T16:45:46.534106+09:00
  # Last heartbeated timestamp
  # This is used to deternmine the worker process existence
  lastHeartBeatAt: 2020-02-17T16:45:46.534106+09:00
  # Below two fields will be set when the worker
  # was salvaged by other worker
  # salvagedBy: <workerUID>
  # salvagedAt: <timestamp>

Worker lifecycle

                       +-----------+
          +------------> Succeeded |
          |            +-----------+
     +----+----+       +--------+
+----> Running +-------> Failed |
     +----+----+       +--------+
          |            +--------+
          +------------> Failed |     Other worker salvages
    HeartBeat expired  | (Lost) |     after salvageDurationtion
                       +----+---+     from its expiration
                            |           +------------+
                            +----------->   Failed   |
                                        | (Salvaged) |
                                        +------------+

Once worker started, it starts with Running phase. In the startup, a worker register self to the queue and get its UID. The UID becomes the identifier of workers. If worker exited normally (with exit-code=0), it transits Succeeded phase. If exit-code was not 0, it transits to Failed phase.

However, worker process was go away by various reasons (SIGKILL-ed, OOMKiller, etc.). Then, how to detect those worker's sate? pftaskqueue applies simple timeout based heuristics. A worker process keeps sending heartbeat during it runs, with configured interval, to the queue by updating its Status.lastHeartBeatAt field. If the heartbeat became older then configured expiration duration, the worker was determined as 'Lost' state (phase=Failed, reason=Lost). Moreover when a worker detects their own heartbeat expired, they exited by their selves to wait they will be salvaged by other workers.

On every worker startup, a worker tries to find Lost workers which are safe to be salvaged. pftaskqueue also used simple timeout-based heuristics in salvation, too. If time passed Worker.HeartBeat.SalvagedDuration after its heartbeat expiration, the worker is determined as a salvation target. Once the worker finds some salvation target workers, it will salvage the worker. "Salvation" means

  • marks the target Salvaged phase (phase=Failed, reason=Salvaged)
  • re-queues all the non-Completed tasks to the pending queue which are handled in the target
    • it also stamp salvagedAt, salvagedBy fields in each TaskRecord

Worker operations

Most worker management are performed inside of pfatakqueue. The allowed operation is seeing workers:

pftaskqueue get-worker [queue] --state=[all,running,succeeded,failed,lost,tosalvage] --output=[yaml,json,table]

Task handler specification

pftaskqueue forks a subprocess to execute task handler commands for each received task.

Success/Failure

pftaskqueue recognize success/failure of task handler invocations by its exit code.

  • 0: success
  • otherwise: failure

Input/Output

pftaskqueue communicates with task handler process via files. pftaskqueue passes its workspace directory to stdin of task handler process. It also defines PFTQ_TASK_HANDLER_WORKSPACE_DIR environment variable for each task handler process. The directory structure as below. All the inputs are also exported as environment variables whose names are prefixed with PFTQ_TASK_HANDLER_INPUT_.

┌ {workspace direcoty}             # pftaskqueue passes the dir name to stdin of task handler process
│                                  # also exported as PFTQ_TASK_HANDLER_WORKSPACE_DIR
│                                  # Note: this directory will be deleted after task handler finished
│                                  #       when taskHandler.CleanupWorkspaceDir is true in worker configuration
│
│   # pftaskqueue prepares whole the contents
├── input
│   ├── payload                  # TaskSpec.payload in text format 
│   │                            # also exported as PFTQ_TASK_HANDLER_INPUT_PAYLOAD
│   ├── retryLimit               # TaskSpec.retryLimit in text format
│   │                            # also exported as PFTQ_TASK_HANDLER_INPUT_RETRY_LIMIT
│   ├── timeoutSeconds           # TaskSpec.timeoutSeconds in text format
│   │                            # also exported as PFTQ_TASK_HANDLER_INPUT_TIMEOUT_SECONDS
│   └── meta
│       ├── taskUID            # taskUID of the task in text format
│       │                      # also exported as PFTQ_TASK_HANDLER_INPUT_TASK_UID
│       ├── processUID         # prrocessUID of the task handler process
│       │                      # also exported as PFTQ_TASK_HANDLER_INPUT_PROCESS_UID
│       ├── task.json          # whole task information in JSON format
│       │                      # also exported as PFTQ_TASK_HANDLER_INPUT_TASK_JSON
│       ├── workerName         # workerName of the worker process
│       │                      # also exported as PFTQ_TASK_HANDLER_INPUT_WORKER_NAME
│       ├── workerUID          # workerUID of the worker process
│       │                      # also exported as PFTQ_TASK_HANDLER_INPUT_WORKER_UID
│       └── workerConfig.json  # whole workerConfig information in JSON format
│                              # also exported as PFTQ_TASK_HANDLER_INPUT_WORKER_CONFIG_JSON
│
│   # pftaskqueue just creates the directory
│   # If any error happened in reading files in the directory, the task fails with the TaskResult below.
│   #   type: "Failure"
│   #   reason: "InternalError"
│   #   message: "...error message..."
│   #   payload: null
└── output
    ├── payload                     # If the file exists, the contents will record in TaskResult.payload. Null otherwise.
    │                               # Max size of the payload varies on backend type to avoid from overloading backend
    │                               #   redis: 1KB
    │                               # If size exceeded, truncated contents will be recorded.
    ├── message                     # If the file exists, the contents will record in TaskResult.message. Null otherwise.
    │                               # Max size of the payload varies on backend type to avoid from overloading backend
    │                               #   redis: 1KB
    │                               # If size exceeded, truncated contents will be recorded.
    └── postHooks.json              # postHook TaskSpecs in JSON array format
                                    # If the file exists, the contents will be queued automatically.
                                    # e.g. [{"payload": "foo", "retryLimit": "3", "timeout": "10m"}]

3 directories, 12 files

Dead letters

If pftaskqueue faced at some corrupted data for various reasons (e.g. manually edited backend data or etc.), pftaskqueue delivers the corrupted data to dead letter queue. Entries in dead letter queue is the form of:

body: "...corrupted data...."
error: "...error message..."

Please note that dead letter item can't include task uid or specs generally because it should store any corrupted data. If you are lucky, you can see task information in body section.

You can see dead letter items as below:

$ pftaskqueue get-task [queue] --state=deadletter --output yaml
- body: "..."
  err: "..."
...

Managing configurations

pftaskqueue has a lot of configuration parameters. pftaskqueue provides multiple ways to configure them. pftaskqueue reads configuraton parameter in the following precedence order. Each item takes precedence over the item below it:

  • Command line flags
  • Environment variables
  • Config file

Command line flags

See:

pftaskqueue --help

Eenvironment variables

pftaskqueue reads environment variables prefixed with PFTQ_. Environment variable names are derived from parameter key in the configuration file explained the below section. If you want to set redis.addr, please set PFTQ_REDIS_ADDR. Please capitalize parameter key and replace . with _.

IMPORTANT

Due to the bug of viper used bypftaskqueue, setting configuration via environment variables works only the case when some config file. In this case, it recommends to create a config file in your home directory like this:

$ pftaskqueue print-default-config > ~/.pftaskqueue.yaml

# Then this works
$ PFTQ_REDIS_ADDR=... pftaskqueue ...

Config file

pftaskqueue automatically reads ${HOME}/.pftaskqueue.yaml if exists. Or, you can also set any configuration path with --config=${CONFIG_FILE_PATH} flag or PFTQCONFIG environment variable. --config flag is prioritized over PFTQCONFIG environment variable.

To generate config file with default values, please run print-default-config command.

$ pftaskqueue print-default-config > ${WHEREVER_YOU_WANT}

Backend configuration reference

Redis

see inline comments below,

# backend type is 'redis' 
backend: redis
# all the configuration relates to redis 
redis:
  # key prefix of redis database
  # all the key used pftaskqueue was prefixed by '_pftaskqueue:{keyPrefix}:`
  keyPrefix: omura

  # redis server information(addr, password, db)
  addr: ""
  password: ""
  db: 0

  #
  # timeout/connection pool setting
  # see also: https://github.com/go-redis/redis/blob/a579d58c59af2f8cefbb7f90b8adc4df97f4fd8f/options.go#L59-L95
  #
  dialTimeout: 5s
  readTimeout: 3s
  writeTimeout: 3s
  poolSize: 0
  minIdleConns: 0
  maxConnAge: 0s
  poolTimeout: 4s
  idleTimeout: 5m0s
  idleCheckFrequency: 1m0s

  #
  # pftaskqueue will retry when redis operation failed
  # in exponential backoff manner.
  # you can configure backoff parameters below
  #
  backoff:
    initialInterval: 500ms
    randomizationFactor: 0.5
    multiplier: 1.2
    maxInterval: 1m0s
    maxElapsedTime: 1m0s
    # max retry count. -1 means no limit.
    maxRetry: -1

Bash/Zsh completion

pftaskqueue provides bash/zsh completion.

# for bash
. <(pftaskqueue completion bash)

# for zsh
. <(pftaskqueue completion zsh)

To configure your shell to load completions for each session add to your rc file (~/.bashrc or ~/.zshrc)

Develop

Build

# only for the first time
# $ make setup

$ make build

Unit/Integration Test

You require docker in your environment.

make test

How to make release

The release process is fully automated by tagpr. To release, just merge the latest release PR.

License

Copyright 2020 Preferred Networks, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0