Skip to content

JobExecutor

Ken Suenobu edited this page May 28, 2018 · 15 revisions

Note: This documentation covers the latest version in the develop branch.

JobExecutor

The JobExecutor takes a Job object, traverses all of the Tasks defined within it, creating an execution DAG. The execution plan creates a DAG based on the list of Task dependencies by way of transitive reduction.

Creation of said execution plan determines which tasks can run asynchronously depending on their definitions by the developer. It then generates CompletableFuture.

Creating a JobExecutor

Creation of a JobExecutor is the last step before you actually run a Job. Please make sure to familiarize yourself with creating Tasks and Jobs before continuing on.

Let's assume you've created a Job object, and you're ready to prepare that job for execution. To do so:

    val jobExec: JobExecutor = JobExecutor(job)

That's all there is to it.

Executing jobs

Control of execution of the Jobs is where it can get interesting.

When you create a JobExecutor and assign the Job, all you've essentially done is called the constructor and created an object. It is not until you call the queue() method that the DAG is created, an execution plan is formed, and the Job is placed into JobQueued state, along with all of its Tasks.

JobExecution can take place one of two ways: either in a blocking state (meaning when you call the run() method, it blocks until complete), or in a non-blocking state (fire-and-forget.)

Setting the JobExecution to run either blocking or non-blocking is simply calling the setBlocking(flag) method, and setting false to run non-blocking. JobExecution runs all Jobs in blocking state unless otherwise set. Note: you can retrieve a JobExecution's blocking status by examining the blocking value.

Once the JobExecutor has queued the Job for execution, it has created an internal CompletableFuture to execute, with controls allowing to pause and resume the job.

To run the Job, all that's required is a call to the run() method.

Example:

    val task1: Task = TaskBuilder() ... .build()
    val job1: Job = JobBuilder() ... .build()
    val jobExec: JobExecutor = JobExecutor(job1)

    jobExec.queue().run()

This will run the Job either in the background, or wait on the run() until the execution completes all of the Tasks associated with it.

NOTE: Calling queue() will queue the Task list in the order in which it was created. Tasks are never created out-of-order.

Exception States

The following is a list of Exceptions that could be thrown from the JobExecutor:

  • InvalidTaskStateException: indicates a Task status was unexpectedly altered
  • InvalidJobExecutionStateException: indicates a Job status was unexpectedly altered
  • DuplicateTaskNameException: indicates a Task being queued already exists by the task.name
  • InvalidTaskDependencyException: indicates a Task specified a dependsOn that was not yet queued

The first two Exception types are rare, and only really occur when running code out of sync, or if these statuses are modified outside of the JobExecutor class. You rarely need to modify the Task status yourself.

DuplicateTaskNameException can occur when a Task is added to a Job twice, or if a Task has a duplicate name in another Task.

InvalidTaskDependencyException can occur when a Task depends on another Task but the dependent Task has not yet been queued.

Flow Control

Flow control is provided via two mechanisms: pause() and resume().

Calling pause() on the JobExecutor will pause further execution of any Tasks that have not yet been executed. Calling pause() twice in succession will have no effect on the executor.

Calling resume() on the JobExecutor will resume execution of any remaining Tasks that have not yet been executed. Calling resume() twice in succession will have no effect on the executor.

Canceling a Job

To cancel a Job that is currently in progress, simply call the cancel(reason: String) method. This will force the CompletableFuture to cancel whatever Task is currently running (interrupting it, essentially), and setting the Job status to JobCanceled(reason).

Any Tasks that were currently in progress will be canceled, and any further Tasks that were scheduled to run will be canceled, their status will be set to TaskCanceled(reason).

Advanced Access

If you want to get at the CompletableFuture[Void] after it has been created, use the getCompletableFuture() method call.

Recap

So, to recap, all that's required to run a Job is three steps:

  1. Create a Task
  2. Define a Job
  3. Create a JobExecutor
  4. Queue the JobExecutor
  5. Set to non-blocking if you want to run the Job in the background, and finally
  6. Call run() on the JobExecutor.

That's it.

Once the JobExecutor's run() method has completed, you can view the Job's status accessor, and view the status of the Job after it has run. If no Exception was thrown during the run, the Job should terminate normally. Otherwise, you can view the result of the problem by matching the Job Status against a JobFailed(reason) and extract the root Exception.