Skip to content

Commit

Permalink
Merge pull request #48 from KenSuenobu/ticket-47
Browse files Browse the repository at this point in the history
Ticket 47
  • Loading branch information
KenSuenobu committed May 20, 2018
2 parents 511c3d8 + cfd57bf commit 0d61ce1
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 32 deletions.
Expand Up @@ -117,23 +117,29 @@ case class Task(name: String, task: RunnableTask, dependencies: Seq[Task], async
*
* Super-convenient way to create a synchronous [[Task]] without having to do a bunch of class definitions.
*
* Asynchronous tasks can be built the same way with the Task.async companion object.
*
* @since 0.0.3
*/
object Task {

/** Generate a [[Task]] using the body of the task in the block rather than using a [[TaskBuilder]] to do the same
* thing.
/** Generate a synchronous [[Task]] using the body of the task as the runnable code.
*
* @param a function code to run
* @return [[Task]] with the closure wrapped in a [[RunnableTask]], with no name and no dependencies.
*/
def apply(block: => Unit): Task = evaluate(block)

/** Generate an asynchronous [[Task]] using the body of the task as the runnable code.
*
* @param a closure to run
* @param a function code to run
* @return [[Task]] with the closure wrapped in a [[RunnableTask]], with no name and no dependencies.
*/
def apply(a: => Unit): Task = evaluate(() => a)
def async(block: => Unit): Task = evaluate(block, true)

private def evaluate(a: () => Unit): Task = Task("", new RunnableTask {
override def run(): Unit = {
a()
}
}, Seq(), false)
private def evaluate(block: => Unit, async: Boolean = false) = Task("", new RunnableTask {
override def run(): Unit = block
}, Seq(), async)

}

Expand Down
Expand Up @@ -197,6 +197,11 @@ class JobExecutor(job: Job) extends LazyLogging {

}

/** Factory object to create a new [[JobExecutor]] object. */
object JobExecutor {
def apply(job: Job): JobExecutor = new JobExecutor(job)
}

/** An exception indicating that a [[Task]] was in a different state than expected.
*
* @param task [[Task]] object.
Expand Down
Expand Up @@ -89,7 +89,7 @@ class ComplicatedJobTest extends FlatSpec with Matchers with LazyLogging {
.withName("Test")
.withTasks(task1, task2, task3)
.build()
val jobExec: JobExecutor = new JobExecutor(job1)
val jobExec: JobExecutor = JobExecutor(job1)

job1.tasks.length shouldBe 3
job1.tasks(0) shouldBe task1
Expand Down Expand Up @@ -182,7 +182,7 @@ class ComplicatedJobTest extends FlatSpec with Matchers with LazyLogging {
.withName("Test")
.withTasks(task1, task2, task3, task4)
.build()
val jobExec: JobExecutor = new JobExecutor(job1)
val jobExec: JobExecutor = JobExecutor(job1)

job1.tasks.length shouldBe 4
job1.tasks(0) shouldBe task1
Expand Down Expand Up @@ -298,7 +298,7 @@ class ComplicatedJobTest extends FlatSpec with Matchers with LazyLogging {
.withName("Test")
.withTasks(task1, task2, task3, task4, task5, task6)
.build()
val jobExec: JobExecutor = new JobExecutor(job1)
val jobExec: JobExecutor = JobExecutor(job1)

job1.tasks.length shouldBe 6
job1.tasks(0) shouldBe task1
Expand Down
Expand Up @@ -29,7 +29,7 @@ class ExceptionJobTest extends FlatSpec with Matchers with LazyLogging {
.withName("Cancelable Job")
.withTasks(task1)
.build()
val jobExec: JobExecutor = new JobExecutor(job1)
val jobExec: JobExecutor = JobExecutor(job1)

job1.tasks.length shouldBe 1
job1.tasks(0) shouldBe task1
Expand Down
Expand Up @@ -40,15 +40,6 @@ class RealWorldTest extends FlatSpec with Matchers with LazyLogging {
"https://www.rust-lang.org/en-US/documentation.html")
var webData: ConcurrentMap[String, String] = new ConcurrentHashMap[String, String]()

class DataFetchRunnable(url: String) extends Runnable {
override def run(): Unit = {
logger.debug(s"Fetching URL $url")
val data: String = Source.fromURL(url).mkString
webData.put(url.toLowerCase(), data.toLowerCase())
logger.debug(s"Fetch of $url complete: ${data.length} bytes")
}
}

class StripFetchedDataRunnable(url: String, count: Int) extends Runnable {
override def run(): Unit = {
val data: String = webData.get(url.toLowerCase())
Expand Down Expand Up @@ -98,12 +89,14 @@ class RealWorldTest extends FlatSpec with Matchers with LazyLogging {
}

for((url, counter) <- urls.zipWithIndex) {
val fetcherRunnableTask: RunnableTask = RunnableTask(new DataFetchRunnable(url))
val fetcherTask: Task = Task {
logger.debug(s"Fetching URL $url")
val data: String = Source.fromURL(url).mkString
webData.put(url.toLowerCase(), data.toLowerCase())
logger.debug(s"Fetch of $url complete: ${data.length} bytes")
}
val stripDataRunnableTask: RunnableTask = RunnableTask(new StripFetchedDataRunnable(url, counter))
val wordsCountRunnableTask: RunnableTask = RunnableTask(new CountWordsRunnable(url, counter))
val fetcherTask: Task = TaskBuilder()
.withTask(fetcherRunnableTask)
.build()
val stripDataTask: Task = TaskBuilder()
.withTask(stripDataRunnableTask)
.dependsOn(fetcherTask)
Expand All @@ -115,10 +108,9 @@ class RealWorldTest extends FlatSpec with Matchers with LazyLogging {
.async()
.build()
val urlTestJob: Job = JobBuilder()
.withName("urlTest")
.withTasks(fetcherTask, stripDataTask, wordsCountTask)
.build()
val jobExec: JobExecutor = new JobExecutor(urlTestJob)
val jobExec: JobExecutor = JobExecutor(urlTestJob)

println(s"Running job for URL: $url")

Expand Down
Expand Up @@ -89,7 +89,7 @@ class SimpleJobTest extends FlatSpec with Matchers with LazyLogging {
.withName("Test")
.withTasks(task1, task2, task3)
.build()
val jobExec: JobExecutor = new JobExecutor(job1)
val jobExec: JobExecutor = JobExecutor(job1)

job1.tasks.length shouldBe 3
job1.tasks(0) shouldBe task1
Expand Down Expand Up @@ -178,7 +178,7 @@ class SimpleJobTest extends FlatSpec with Matchers with LazyLogging {
.withName("Test")
.withTasks(task1)
.build()
val jobExec: JobExecutor = new JobExecutor(job1)
val jobExec: JobExecutor = JobExecutor(job1)

job1.status shouldBe JobQueued
jobExec.blocking shouldBe true
Expand All @@ -192,7 +192,7 @@ class SimpleJobTest extends FlatSpec with Matchers with LazyLogging {
.withName("Test2")
.withTasks(task1)
.build()
val jobExec2: JobExecutor = new JobExecutor(job2)
val jobExec2: JobExecutor = JobExecutor(job2)

// This will be upgraded soon so that the underlying cause can be pulled from the Job, but only if the job
// completes exceptionally.
Expand Down Expand Up @@ -223,9 +223,43 @@ class SimpleJobTest extends FlatSpec with Matchers with LazyLogging {
val job1: Job = JobBuilder()
.withTasks(task1)
.build()
val jobExec: JobExecutor = new JobExecutor(job1)
val jobExec: JobExecutor = JobExecutor(job1)

jobExec.queue().run()
}

it should "be able to run asynchronous tasks with convenience" in {
val task1: Task = Task {
Thread.sleep(500)
logger.info("Sleep 500 ms")
Thread.sleep(500)
logger.info("Sleep another 500 ms")
}
val task2: Task = Task.async {
Thread.sleep(500)
logger.info("Sleep 500 ms")
Thread.sleep(1000)
logger.info("Sleep 1000 ms")
}
val task3: Task = Task.async {
Thread.sleep(500)
logger.info("Sleep 500 ms")
Thread.sleep(1000)
logger.info("Sleep 1000 ms")
}
val job1: Job = JobBuilder()
.withTasks(task1)
.build()
val jobExec: JobExecutor = JobExecutor(job1)

jobExec.queue().run()

val job2: Job = JobBuilder()
.withTasks(task2, task3)
.build()
val jobExec2: JobExecutor = JobExecutor(job2)

jobExec2.queue().run()
}

}

0 comments on commit 0d61ce1

Please sign in to comment.