
Additional Tasks


Additional Task Types

These are additional Task types that are available for you to use with Scattersphere. These tasks are optional, and can either be used or overridden as you see fit. They are simply here for convenience.

ShellTask

Executes a shell command as a Task, waiting for the shell command to terminate. ShellTask provides access to the STDOUT output from the command via the getProcessOutput() method. Example:

  val shellTask: ShellTask = new ShellTask("/bin/ps -ax")
  val sTask1: Task = TaskBuilder()
    .withName("process task")
    .withTask(shellTask)
    .build()
  val job: Job = JobBuilder()
    .withTasks(sTask1)
    .build()
  val jExec: JobExecutor = JobExecutor(job)

  jExec.queue().run()

  val out: List[String] = shellTask.getProcessOutput().toList

  out.foreach(println)

The above example executes /bin/ps -ax, records its output, retrieves it using getProcessOutput(), and prints each line with a foreach. You get the idea.
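
Since the captured output is an ordinary List[String], it can be post-processed with the usual collection operations. For instance, a hypothetical follow-up to the example above that keeps only the lines mentioning java:

  // Hypothetical follow-up: filter the captured process listing down to
  // lines that mention "java".
  val javaProcesses: List[String] = out.filter(_.contains("java"))

  javaProcesses.foreach(println)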

RepeatingTask

Repeats a RunnableTask a specified number of times. Provides a getTimesRepeated() function to retrieve the number of times the RunnableTask repeated execution successfully. Example:

  val rTask: RepeatingTask = new RepeatingTask(25, new RunnableTaskHere())
  val task: Task = TaskBuilder()
    .withName("Repeating task")
    .withTask(rTask)
    .build()
  val job: Job = JobBuilder()
    .withTasks(task)
    .build()
  val jExec: JobExecutor = JobExecutor(job)

  jExec.queue().run()

  println(s"Job ran ${rTask.getTimesRepeated()} times successfully.")

Things to keep in mind:

  • Only the RepeatingTask will contain a valid TaskStatus. The task being run will have no status.
  • onException will be called and propagated to the task when an exception occurs.

Execution cycle for the RepeatingTask is:

         /---- Loop until count -----\
        v                            |
  [task.run()] -> [task.onFinished] -/
        \
         `- (exception) -> [task.onException(t)] -> [terminate job]
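
As a rough sketch, here is what a task suitable for repetition might look like. The hook names (run(), onFinished(), onException(Throwable)) are taken from the diagram above; the class name and body are hypothetical, so adjust them to the actual RunnableTask trait:

  // Sketch only: CountingTask is a hypothetical task whose hook signatures
  // are assumed from the execution-cycle diagram above.
  class CountingTask extends RunnableTask {
    private val completed = new java.util.concurrent.atomic.AtomicInteger(0)

    override def run(): Unit = {
      // Work performed on every repetition.
      completed.incrementAndGet()
    }

    override def onFinished(): Unit =
      println(s"Repetition finished; ${completed.get()} completed so far.")

    override def onException(t: Throwable): Unit =
      println(s"Repetition failed: ${t.getMessage}")
  }

  val counting: RepeatingTask = new RepeatingTask(10, new CountingTask())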

DelayedTask

Runs a RunnableTask after a specified delay, given in milliseconds. Example:

  val dTask: DelayedTask = new DelayedTask(1500,
    RunnableTask(new MyRunnableTask()))
  val task: Task = TaskBuilder()
    .withName("Delayed Task")
    .withTask(dTask)
    .build()
  val job: Job = JobBuilder()
    .withTasks(task)
    .build()
  val jExec: JobExecutor = JobExecutor(job)

  jExec.queue().run()

This will run MyRunnableTask after a delay of 1500 milliseconds.
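
MyRunnableTask above is just a placeholder. Assuming the RunnableTask(...) factory accepts a plain java.lang.Runnable (which is what the call shape suggests), a minimal stand-in might look like this:

  // Hypothetical placeholder: the only assumption is that this can be handed
  // to the RunnableTask(...) factory as a plain java.lang.Runnable.
  class MyRunnableTask extends Runnable {
    override def run(): Unit =
      println("MyRunnableTask ran after the configured delay.")
  }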

DockerTask

A DockerTask is a RunnableTask that allows you to spin up a docker container, run a command within it, and capture the output, much like a ShellTask. It provides getProcessOutput() just as ShellTask does, and it also lets you modify the flags sent to docker when the container is run.

By default, DockerTask runs the container with --net host as its flags, since some containers expect host-based networking. This can be changed: to modify the settings, use setDockerFlags(flag: String). Keep in mind that any flags you set here will replace the defaults, so if you still need --net host, you'll want to add it back manually (see the sketch after the example below).

Example:

  val dockerTask: DockerTask = new DockerTask("centos", "curl http://www.google.com/")
  val sTask1: Task = TaskBuilder("centosTask").withTask(dockerTask).build()
  val job: Job = JobBuilder().withTasks(sTask1).build()
  val jExec: JobExecutor = JobExecutor(job)

  jExec.queue().run()
  job.status shouldBe JobFinished

This will run curl http://www.google.com/ in a centos container, and record its output.
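
If the default --net host flag is not what you want, the flags can be overridden before the task is queued. A minimal sketch, where the flag string itself is purely illustrative; remember that whatever you pass replaces the defaults:

  // Sketch: override the default docker flags. Because setDockerFlags()
  // replaces the defaults entirely, --net host is re-added explicitly.
  val quietTask: DockerTask = new DockerTask("centos", "curl -s http://www.google.com/")
  quietTask.setDockerFlags("--net host --rm")

  val quietJob: Job = JobBuilder()
    .withTasks(TaskBuilder("quietCentosTask").withTask(quietTask).build())
    .build()

  JobExecutor(quietJob).queue().run()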

SparkTask

A SparkTask is similar to a RunnableTask except that it uses a SparkCache object to retrieve the SparkConf and create a SparkSession for you. See this documentation for more details.

Example:

  import scala.math.random  // required for the random calls below

  val piTask: SparkTask = new SparkTask("sparkPiTestCache") {
    override def run(): Unit = {
      val slices = 2
      val n = math.min(100000L * slices, Int.MaxValue).toInt
      val count = getContext().parallelize(1 until n, slices).map { _ =>
        val x = random * 2 - 1
        val y = random * 2 - 1

        if (x * x + y * y <= 1) 1 else 0
      }.reduce(_ + _)

      println(s"Pi is roughly ${4.0 * count / (n - 1)}")
    }
  }

The SparkTask constructor takes the name of the SparkCache key that has been assigned a SparkConf object. Once defined, the SparkContext and SparkSession objects are automatically created for you.

You can then use the SparkTask just like any other task; it simply runs within the Scattersphere environment, so Tasks and Jobs are defined just as before, as shown below.
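
For example, continuing with the piTask defined above, wiring it into a Job uses exactly the same builders as the earlier examples:

  // Wrap the SparkTask from above into a Job, just like the other task types.
  val piJobTask: Task = TaskBuilder()
    .withName("spark pi task")
    .withTask(piTask)
    .build()
  val piJob: Job = JobBuilder()
    .withTasks(piJobTask)
    .build()

  JobExecutor(piJob).queue().run()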

MesosTask

... To be coded ...
... To be documented ...