Ken Suenobu edited this page Jul 9, 2018 · 17 revisions

Note: This documentation covers the latest version in the develop branch.

Jobs

A Job is a collection of Tasks.

Job Properties

Jobs have five properties:

  • A numerical ID (auto-generated by the JobBuilder)
  • A name (optional; any string that identifies the Job)
  • A list of Tasks to execute
  • A status indicating whether the Job is Queued, Running, Finished, Failed, or Canceled
  • A set of job statistics recording when the Job started, when it ended, and the elapsed time

Note: if no name is given for the Job, the JobBuilder takes the name of the first Task in the list and appends "Job" to it. If that Task has no name either, the Job is simply named "Job".
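The fallback rule above can be sketched as a small standalone function. This is only an illustration: JobNameSketch and defaultJobName are hypothetical names, not part of the library, and the sketch assumes that "Job" is appended directly to the Task name with no separator.

```scala
// Hypothetical helper, not part of the library: models the naming fallback described above.
object JobNameSketch {
  // Assumption: "Job" is appended directly to the Task name, with no separator.
  def defaultJobName(jobName: Option[String], firstTaskName: Option[String]): String =
    jobName
      .orElse(firstTaskName.map(_ + "Job")) // no Job name: derive it from the first Task
      .getOrElse("Job")                     // no Task name either: plain "Job"

  def main(args: Array[String]): Unit = {
    println(defaultJobName(Some("Nightly import"), Some("Extract"))) // explicit name wins
    println(defaultJobName(None, Some("Extract")))                   // derived from the Task
    println(defaultJobName(None, None))                              // neither is named
  }
}
```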

JobBuilder

To create a Job object, either instantiate the Job class manually or use the JobBuilder() factory, which is the preferred method.

Example:

    val myJob = JobBuilder()
        .withTasks(task1)
        .build()

This creates a Job, myJob, whose name is the name of the Task (task1) with "Job" appended.

You can also add multiple Tasks:

    val myJob = JobBuilder()
        .withName("My complicated job")
        .withTasks(task1, task2, task3, task4)
        .build()

Add Tasks to the list in the order in which they were created. This ensures that the DAG is generated in the right order with the correct dependencies; adding Tasks out of order can produce unexpected results.
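To see why ordering matters, here is a minimal sketch of an ordering check. It does not use the library: SketchTask and isInDependencyOrder are hypothetical, and the sketch assumes that each Task simply names the Tasks it depends on. The list is "in order" when every Task appears after all of its dependencies.

```scala
// Hypothetical model, not the library's Task class.
object TaskOrderSketch {
  // A Task here is just a name plus the names of the Tasks it depends on.
  final case class SketchTask(name: String, dependsOn: Set[String] = Set.empty)

  // True when every Task appears after all of the Tasks it depends on.
  def isInDependencyOrder(tasks: Seq[SketchTask]): Boolean =
    tasks.indices.forall { i =>
      val earlier = tasks.take(i).map(_.name).toSet
      tasks(i).dependsOn.subsetOf(earlier)
    }

  def main(args: Array[String]): Unit = {
    val extract = SketchTask("extract")
    val load    = SketchTask("load", Set("extract"))
    println(isInDependencyOrder(Seq(extract, load))) // creation order
    println(isInDependencyOrder(Seq(load, extract))) // dependency listed too late
  }
}
```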

Alternatively, pass the name directly to JobBuilder instead of calling withName:

  val myJob = JobBuilder("My complicated job")
    .withTasks(task1, task2, task3, task4)
    .build()

Executing Jobs

To run a Job after creating it, use the JobExecutor class. A quick example:

  val myTask: Task = Task(name) { ... code ... }
  val myTask2: Task = Task.async(name) { ... async code ... }
  val myTask3: Task = TaskBuilder() ... .build()
  val myJob: Job = JobBuilder() ... .build()
  val jobExec: JobExecutor = JobExecutor(myJob)

  jobExec.queue().run()

  println(s"Job ID: ${myJob.id}")

This queues your Job for execution: it creates the DAG and an execution plan, creates the CompletableFuture object that executes that plan, and waits until execution terminates, either successfully or with an exception.
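The last two steps of that flow, running a plan inside a CompletableFuture and waiting for it, might look roughly like the sketch below. It is not the library's implementation: Step and runPlan are hypothetical, and the "plan" is assumed to be a plain ordered list of steps rather than a real DAG. Only java.util.concurrent.CompletableFuture is a standard JDK class.

```scala
import java.util.concurrent.{CompletableFuture, CompletionException}
import java.util.function.Supplier

// Hypothetical sketch, not the library's JobExecutor.
object ExecutionFlowSketch {
  // Assumption: an execution plan is an ordered list of named steps.
  final case class Step(name: String, body: () => Unit)

  // Runs the steps in order inside a CompletableFuture and blocks until it completes.
  // Returns Right(names of the steps that ran) on success, Left(cause) if a step throws.
  def runPlan(plan: Seq[Step]): Either[Throwable, Seq[String]] = {
    val work = new Supplier[Seq[String]] {
      override def get(): Seq[String] =
        plan.toList.map { step => step.body(); step.name }
    }
    val future = CompletableFuture.supplyAsync(work)
    try Right(future.join()) // join() waits for the plan to terminate
    catch { case e: CompletionException => Left(e.getCause) }
  }

  def main(args: Array[String]): Unit = {
    val plan = Seq(Step("first", () => println("step 1")), Step("second", () => println("step 2")))
    println(runPlan(plan))
  }
}
```

Returning an Either keeps the sketch easy to test; the text above says the real JobExecutor throws an exception when a job fails.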

Registering and Executing Jobs by Name

To register a job under a specific name, use the register function provided by the JobExecutor companion object:

  val myJob: Job = JobBuilder() ... .build()
  val myJob2: Job = JobBuilder() ... .build()

  JobExecutor.register("My Job Name", myJob)
  JobExecutor.register("My Second Job", myJob2)

  JobExecutor("My Job Name").queue().run()
  JobExecutor("My Second Job").queue().run()

Keep in mind that registering a job under a name that is already in use throws a DuplicateJobNameException. Likewise, trying to run a job that has not been registered throws a JobNotFoundException. Register your jobs before calling them.

If you do not want to register a job, run it as shown in the "Executing Jobs" section above.
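The two failure modes can be illustrated with a small standalone registry. This is a sketch of the described behavior only: Registry and lookup are hypothetical, and the two exception classes are local stand-ins that merely share their names with the library's exceptions.

```scala
import scala.collection.mutable

// Hypothetical sketch of a name-based registry, not the library's code.
object JobRegistrySketch {
  // Local stand-ins; the library's own exception classes may be defined differently.
  final class DuplicateJobNameException(name: String)
    extends RuntimeException(s"Job already registered: $name")
  final class JobNotFoundException(name: String)
    extends RuntimeException(s"No job registered as: $name")

  final class Registry[J] {
    private val jobs = mutable.Map.empty[String, J]

    // Rejects a second registration under the same name.
    def register(name: String, job: J): Unit = {
      if (jobs.contains(name)) throw new DuplicateJobNameException(name)
      jobs.update(name, job)
    }

    // Fails for names that were never registered.
    def lookup(name: String): J =
      jobs.getOrElse(name, throw new JobNotFoundException(name))
  }

  def main(args: Array[String]): Unit = {
    val registry = new Registry[String]
    registry.register("My Job Name", "job-1")
    println(registry.lookup("My Job Name"))
    println(scala.util.Try(registry.register("My Job Name", "job-2")).isFailure)
    println(scala.util.Try(registry.lookup("Unknown")).isFailure)
  }
}
```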

Job Statistics

To retrieve runtime information about a Job, get its Statistics instance by calling job.getStatistics(). It exposes the start time, end time, and elapsed time, all in milliseconds.

Example:

  val task ...
  val job ...
  val jExec ...

  println(s"Job elapsed time: ${job.getStatistics().getElapsedTime}ms")

These values are provided for statistical purposes only.

Additionally, you can retrieve the start and end times from the Statistics instance through getStartTime and getEndTime, respectively.
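As a rough illustration of how the three values relate, the sketch below records a start and end time around a block of code and derives the elapsed time as their difference. SketchStatistics and timed are hypothetical names; the library's Statistics class may capture and store these values differently.

```scala
// Hypothetical sketch, not the library's Statistics class.
object StatisticsSketch {
  // All values are in milliseconds.
  final case class SketchStatistics(startTime: Long, endTime: Long) {
    def elapsedTime: Long = endTime - startTime
  }

  // Runs `body` and records wall-clock start and end times around it.
  def timed[A](body: => A): (A, SketchStatistics) = {
    val start  = System.currentTimeMillis()
    val result = body
    val end    = System.currentTimeMillis()
    (result, SketchStatistics(start, end))
  }

  def main(args: Array[String]): Unit = {
    val (_, stats) = timed { Thread.sleep(50) }
    println(s"Elapsed time: ${stats.elapsedTime}ms")
  }
}
```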

Job Status

Job statuses extend the JobStatus class and are similar to those defined in the TaskStatus class.

A JobStatus object indicates the status of the overall Job: whether it is queued, running, finished successfully, failed with an exception, or canceled.

The Job Statuses are:

  • JobQueued: Indicates that a Job has been created and initialized, but is not yet running.
  • JobRunning: Indicates that a Job is running, and Tasks are being executed.
  • JobFinished: Indicates that a Job has terminated normally.
  • JobFailed(reason): Indicates that a Task in the Job has generated an Exception and failed.
  • JobCanceled(reason): Indicates that a Job has been canceled with the reason specified.

When a job fails, the JobExecutor also throws an exception. After catching it, you can find the cause of the failure by matching on the JobFailed object. You can also inspect the Job's Tasks to see which Task caused the failure.

When a job is canceled, the cancellation reason is propagated to the current Task and to all queued Tasks.
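Matching on a status can be sketched with a standalone sealed hierarchy that mirrors the list above. The types here are hypothetical stand-ins, not the library's JobStatus classes; in particular, the sketch assumes that a failure reason is a Throwable and a cancellation reason is a String.

```scala
// Hypothetical sketch mirroring the statuses listed above; not the library's classes.
object JobStatusSketch {
  sealed trait SketchStatus
  case object Queued   extends SketchStatus
  case object Running  extends SketchStatus
  case object Finished extends SketchStatus
  final case class Failed(reason: Throwable) extends SketchStatus // assumed reason type
  final case class Canceled(reason: String)  extends SketchStatus // assumed reason type

  // Pattern matching extracts the reason only for the statuses that carry one.
  def describe(status: SketchStatus): String = status match {
    case Queued           => "queued"
    case Running          => "running"
    case Finished         => "finished"
    case Failed(reason)   => s"failed: ${reason.getMessage}"
    case Canceled(reason) => s"canceled: $reason"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Failed(new RuntimeException("disk full"))))
    println(describe(Canceled("superseded by a newer run")))
  }
}
```

Because the trait is sealed, the compiler can warn when a match omits one of the statuses.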