Additional Tasks
These are additional Task types that are available for you to use with Scattersphere. They are optional and provided for convenience: use them as they are, or override them as you see fit.
ShellTask executes a shell command as a Task and waits for the command to terminate. It provides access to the command's STDOUT output through the getProcessOutput() method. Example:
val shellTask: ShellTask = new ShellTask("/bin/ps -ax")
val sTask1: Task = TaskBuilder()
  .withName("process task")
  .withTask(shellTask)
  .build()
val job: Job = JobBuilder()
  .withTasks(sTask1)
  .build()
val jExec: JobExecutor = JobExecutor(job)
// Queue the job and run it, as in the other examples in this section.
jExec.queue().run()
// Collect the captured STDOUT lines and print them.
val out: List[String] = shellTask.getProcessOutput().toList
out.foreach(println)
The above example executes /bin/ps -ax, records its output, retrieves it using getProcessOutput(), and displays it using a foreach. You get the idea.
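For readers curious about what capturing a command's STDOUT can look like, here is a minimal sketch using only the Scala standard library (scala.sys.process). It is not the ShellTask implementation; the object name ShellCaptureSketch and the method captureOutput are hypothetical, and the usage example assumes a Unix-like system where echo is available.

```scala
import scala.io.Source
import scala.sys.process._

// Hypothetical helper; NOT part of Scattersphere.
object ShellCaptureSketch {
  // Runs the command, blocks until it terminates, and returns its
  // STDOUT as a list of lines. `!!` throws if the exit code is non-zero.
  def captureOutput(command: Seq[String]): List[String] = {
    val stdout: String = Process(command).!!
    Source.fromString(stdout).getLines().toList
  }
}
```

For example, ShellCaptureSketch.captureOutput(Seq("echo", "hello")) returns the single captured line, which can then be printed with foreach just as in the ShellTask example.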
RepeatingTask repeats a RunnableTask a given number of times. It provides a getTimesRepeated() function that returns the number of times the RunnableTask successfully repeated execution. Example:
val rTask: RepeatingTask = new RepeatingTask(25, new RunnableTaskHere())
val task: Task = TaskBuilder()
  .withName("Repeating task")
  .withTask(rTask)
  .build()
val job: Job = JobBuilder()
  .withTasks(task)
  .build()
val jExec: JobExecutor = JobExecutor(job)
jExec.queue().run()
println(s"Job ran ${rTask.getTimesRepeated()} times successfully.")
Things to keep in mind:
- Only the RepeatingTask will contain a valid TaskStatus. The task being run will have no status.
- onException will be called and propagated to the task when an exception occurs.
Execution cycle for the RepeatingTask is:
     /---- Loop until count -----\
     v                           |
[task.run()] -> [task.onFinished] -/
      \
       `- (exception) -> [task.onException(t)] -> [terminate job]
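The cycle in the diagram above can be sketched in a few lines of plain Scala. This is only an illustration of the control flow, not the library's code: the SimpleTask trait and the RepeatSketch class are hypothetical stand-ins, and "terminate job" is modeled simply as stopping the loop and returning false.

```scala
import scala.util.control.NonFatal

// Hypothetical stand-in for a runnable task; NOT the Scattersphere trait.
trait SimpleTask {
  def run(): Unit
  def onFinished(): Unit = ()
  def onException(t: Throwable): Unit = ()
}

// Hypothetical sketch of the repeat loop shown in the diagram.
class RepeatSketch(times: Int, task: SimpleTask) {
  private var repeated = 0

  // Number of iterations that completed without an exception.
  def getTimesRepeated: Int = repeated

  // Returns true if every iteration completed, false if an exception
  // ended the loop early.
  def execute(): Boolean = {
    var ok = true
    while (ok && repeated < times) {
      try {
        task.run()
        task.onFinished()
        repeated += 1
      } catch {
        case NonFatal(t) =>
          task.onException(t) // propagate to the wrapped task
          ok = false          // stop looping ("terminate job")
      }
    }
    ok
  }
}
```

Only successful iterations are counted in this sketch, which mirrors the description of getTimesRepeated() as the number of successful repetitions.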
DelayedTask runs a RunnableTask after a specified delay, given in milliseconds. Example:
val dTask: DelayedTask = new DelayedTask(1500,
  RunnableTask(new MyRunnableTask()))
val task: Task = TaskBuilder()
  .withName("Delayed Task")
  .withTask(dTask)
  .build()
val job: Job = JobBuilder()
  .withTasks(task)
  .build()
val jExec: JobExecutor = JobExecutor(job)
jExec.queue().run()
This will run a MyRunnableTask task after 1500 milliseconds.
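As a rough illustration of the "run after a delay" idea, the sketch below uses the JDK's ScheduledExecutorService. How DelayedTask actually implements its delay is not stated here, so treat this as one possible approach; DelaySketch and runAfter are hypothetical names.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical helper; NOT part of Scattersphere.
object DelaySketch {
  // Runs `body` once after `delayMs` milliseconds, blocks until it has
  // finished, and returns the elapsed wall-clock time in milliseconds.
  def runAfter(delayMs: Long)(body: => Unit): Long = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val start = System.nanoTime()
    try {
      val future = scheduler.schedule(
        new Runnable { def run(): Unit = body },
        delayMs,
        TimeUnit.MILLISECONDS
      )
      future.get() // wait for the delayed body to complete
      TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start)
    } finally {
      scheduler.shutdown()
    }
  }
}
```

Calling DelaySketch.runAfter(1500) { println("done") } would print after roughly 1500 milliseconds, matching the delay used in the example above.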
A DockerTask is a RunnableTask that allows you to spin up a docker container, run a command within the container, and capture the output, much like a ShellTask. It still provides getProcessOutput, but it also allows you to modify the flags sent to docker when running a container.
By default, DockerTask runs a container with --net host as its flags, since some containers expect to run with host-based networking. To change this, use setDockerFlags(flag: String). Keep in mind that any flags you set here override the defaults, so if you still need --net host, you'll have to add it manually.
Example:
val dockerTask: DockerTask = new DockerTask("centos", "curl http://www.google.com/")
val sTask1: Task = TaskBuilder("centosTask").withTask(dockerTask).build()
val job: Job = JobBuilder().withTasks(sTask1).build()
val jExec: JobExecutor = JobExecutor(job)
jExec.queue().run()
job.status shouldBe JobFinished
This will run curl http://www.google.com/ in a centos container and record its output.
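The override behavior of setDockerFlags described earlier is easy to get wrong, so here is a small model of it. The class DockerCommandSketch is hypothetical, and the assumed command layout (docker run, then flags, then image, then command) follows the general docker run syntax rather than anything confirmed about DockerTask internals. Nothing is executed; the sketch only builds the command string.

```scala
// Hypothetical model of the flag handling; NOT the DockerTask implementation.
class DockerCommandSketch(image: String, command: String) {
  // Default flags, as described in the text above.
  private var flags: String = "--net host"

  // Replaces the current flags entirely; nothing is appended to the default.
  def setDockerFlags(flag: String): Unit = flags = flag

  // Assumed layout: docker run [flags] image command
  def commandLine: String =
    Seq("docker", "run", flags, image, command).filter(_.nonEmpty).mkString(" ")
}
```

With this model, calling setDockerFlags("--rm") drops host networking, while setDockerFlags("--net host --rm") keeps it, which is the manual re-adding the text refers to.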
A SparkTask is similar to a RunnableTask, except that it uses a SparkCache object to retrieve the SparkConf in order to create a SparkSession for you. See this documentation for more details.
Example:
import scala.math.random

val piTask: SparkTask = new SparkTask("sparkPiTestCache") {
  override def run(): Unit = {
    val slices = 2
    val n = math.min(100000L * slices, Int.MaxValue).toInt
    // Sample random points in the square [-1, 1] x [-1, 1] and count
    // how many fall inside the unit circle.
    val count = getContext().parallelize(1 until n, slices).map { _ =>
      val x = random * 2 - 1
      val y = random * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)
    println(s"Pi is roughly ${4.0 * count / (n - 1)}")
  }
}
The SparkTask constructor takes the name of the SparkCache key that has been assigned a SparkConf object. Once defined, the SparkContext and SparkSession objects are automatically created for you.
You can then use the SparkTask just like any other task, except that it now runs in a Scattersphere environment, so Tasks and Jobs can be defined just as before.
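The arithmetic inside the SparkTask example can be tried without Spark. The sketch below performs the same Monte Carlo estimate on a single thread: points are sampled uniformly in the square [-1, 1] x [-1, 1], and the fraction landing inside the unit circle approximates pi / 4. The names PiSketch and estimatePi, and the use of a seeded scala.util.Random, are illustrative choices and not part of Scattersphere or Spark.

```scala
import scala.util.Random

// Hypothetical local version of the Pi estimate; no Spark involved.
object PiSketch {
  // Estimates pi from `samples` random points, using a fixed seed so that
  // repeated runs give the same result.
  def estimatePi(samples: Int, seed: Long): Double = {
    val rng = new Random(seed)
    val inside = (1 to samples).count { _ =>
      val x = rng.nextDouble() * 2 - 1
      val y = rng.nextDouble() * 2 - 1
      x * x + y * y <= 1 // true when the point lies inside the unit circle
    }
    4.0 * inside / samples
  }
}
```

In the SparkTask version, the same per-point test is distributed across slices with parallelize, and the partial counts are combined with reduce.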
... To be coded ... ... To be documented ...
Licensed under the Apache License 2.0 - Lots of help from Reddit - Happy Hacking!