Skip to content

Commit

Permalink
Moved datapacket logic from Application to TuktuJSGenerator, so optio…
Browse files Browse the repository at this point in the history
…ns can be used can be applied. Fixes #39
  • Loading branch information
mirekvink committed Jul 28, 2016
1 parent e5d942b commit 7e36fe7
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 48 deletions.
7 changes: 7 additions & 0 deletions modules/api/app/tuktu/api/api.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import play.api.libs.json.JsObject
import play.api.libs.json.JsValue
import java.util.concurrent.LinkedBlockingQueue
import scala.concurrent.Future
import play.api.mvc.Request
import play.api.mvc.AnyContent

case class DataPacket(
data: List[Map[String, Any]]
Expand Down Expand Up @@ -58,6 +60,11 @@ case class ResponsePacket(
json: JsValue
)

case class RequestPacket(
request: Request[AnyContent],
isInitial: Boolean
)

case class HealthCheck()
case class HealthReply()

Expand Down
47 changes: 7 additions & 40 deletions modules/web/app/controllers/Application.scala
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import tuktu.api.WebJsNextFlow
import tuktu.web.js.JSGeneration
import tuktu.api.WebJsOrderedObject
import tuktu.api.ErrorPacket
import tuktu.api.RequestPacket

object Application extends Controller {
val source = scala.io.Source.fromFile(Play.application.getFile("public/images/pixel.gif"))(scala.io.Codec.ISO8859)
Expand Down Expand Up @@ -50,8 +51,7 @@ object Application extends Controller {
BadRequest("// No repository for JavaScripts and Tuktu flows set in Tuktu configuration.")
}
case Some(webRepo) => {
// Get the referer
val referrer = request.headers.get("referer")


// Get actual actor
val actorRefMap = Cache.getAs[collection.mutable.Map[String, ActorRef]]("web.hostmap")
Expand All @@ -67,59 +67,26 @@ object Application extends Controller {
}
else {
implicit val timeout = Timeout(Cache.getAs[Int]("timeout").getOrElse(5) seconds)
// Get body data and potentially the name of the next flow
val (bodyData, flowName) = {
val params = request.body.asJson.getOrElse(Json.obj()).asInstanceOf[JsObject]
(
(params \ "d").asOpt[JsObject].getOrElse(Json.obj()),
(params \ "f").asOpt[String])
}

// Set up the data packet
val dataPacket = if (isInitial)
new DataPacket(List(Map(
// By default, add referer, request and headers
"url" -> referrer.getOrElse(""),
"request" -> request,
"request_path" -> request.path,
"request_uri" -> request.uri,
"request_host" -> request.host,
"headers" -> request.headers,
"cookies" -> request.cookies.map(c => c.name -> c.value).toMap,
Cache.getAs[String]("web.jsname").getOrElse(Play.current.configuration.getString("tuktu.jsname").getOrElse("tuktu_js_field")) -> new WebJsOrderedObject(List())
)))
else {
new DataPacket(List(Map(
// By default, add referer, request and headers
"url" -> referrer.getOrElse(""),
"request" -> request,
"request_path" -> request.path,
"request_uri" -> request.uri,
"request_host" -> request.host,
"cookies" -> request.cookies.map(c => c.name -> c.value).toMap,
Cache.getAs[String]("web.jsname").getOrElse(Play.current.configuration.getString("tuktu.jsname").getOrElse("tuktu_js_field")) -> new WebJsOrderedObject(List()),
"headers" -> request.headers) ++ bodyData.keys.map(key => key -> utils.JsValueToAny(bodyData \ key))))
}

// See if we need to start a new flow or if we can send to the running actor
val resultFut = if (isInitial) {
// Send the Actor a DataPacket
val actorRef = actorRefMap(id)

if (image) {
actorRef ! dataPacket
actorRef ! RequestPacket(request, isInitial)
Future {}
}
else
actorRef ? dataPacket
actorRef ? RequestPacket(request, isInitial)
} else {
// Since this is not the default flow, we have to see if this one is running, and start
// if if this is not the case

// Flow name must be set
flowName match {
(request.body.asJson.getOrElse(Json.obj()).asInstanceOf[JsObject] \"f").asOpt[String] match {
case None => {
// Flow name is gone, this cant be
// Flow name is gone, this can't be
Future {}
}
case Some(fn) => {
Expand All @@ -139,7 +106,7 @@ object Application extends Controller {
}

// Send the Actor a DataPacket containing the referrer
actorRefMap(id + "." + fn) ? dataPacket
actorRefMap(id + "." + fn) ? RequestPacket(request, isInitial)
}
}
}
Expand Down
45 changes: 37 additions & 8 deletions modules/web/app/tuktu/web/generators/TuktuJSGenerator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package tuktu.web.generators

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.DurationInt

import akka.actor.ActorRef
import akka.actor.PoisonPill
import akka.util.Timeout
Expand All @@ -17,6 +16,11 @@ import play.api.libs.iteratee.Iteratee
import play.api.libs.json.JsValue
import tuktu.api._
import play.api.Logger
import play.api.mvc.Request
import play.api.mvc.AnyContent
import play.api.Play
import play.api.libs.json.JsObject
import play.api.libs.json.Json

/**
* Gets a webpage's content based on REST request
Expand Down Expand Up @@ -54,7 +58,7 @@ class TuktuJSGenerator(

sourceActor ! d
// Remove this requester from the list.
Cache.getOrElse("JSGenerator.requesters", 30)(collection.mutable.ListBuffer.empty[ActorRef]) -= sourceActor
Cache.getOrElse("JSGenerator.requesters")(collection.mutable.ListBuffer.empty[ActorRef]) -= sourceActor

d
})
Expand All @@ -72,7 +76,7 @@ class TuktuJSGenerator(
case config: JsValue => {}
case error: ErrorPacket => {
// Inform all the requesters that an error occurred.
Cache.getOrElse("JSGenerator.requesters", 30)(collection.mutable.ListBuffer.empty[ActorRef]).foreach { x => x ! error }
Cache.getOrElse("JSGenerator.requesters")(collection.mutable.ListBuffer.empty[ActorRef]).foreach(_ ! error)
}
case sp: StopPacket => {
// Remove ourselves from the cache
Expand All @@ -91,16 +95,41 @@ class TuktuJSGenerator(
channel.eofAndEnd
self ! PoisonPill
}
case dp: DataPacket => {
case r: RequestPacket => {
val request = r.request

// Get body data and potentially the name of the next flow
val bodyData = (request.body.asJson.getOrElse(Json.obj()).asInstanceOf[JsObject] \ "d").asOpt[JsObject].getOrElse(Json.obj())

// Keep track of all senders, in case of errors
Cache.getOrElse("JSGenerator.requesters", 30)(collection.mutable.ListBuffer.empty[ActorRef]) += sender

Cache.getOrElse("JSGenerator.requesters", 30)(collection.mutable.ListBuffer.empty[ActorRef]) += sender

// Set up the data packet
val dp = new DataPacket(List(Map(
// By default, add referer, request and headers
"url" -> request.headers.get("referer").getOrElse(""),
"request" -> request,
"request_path" -> request.path,
"request_uri" -> request.uri,
"request_host" -> request.host,
"headers" -> request.headers,
"cookies" -> request.cookies.map(c => c.name -> c.value).toMap,
Cache.getAs[String]("web.jsname").getOrElse(Play.current.configuration.getString("tuktu.jsname").getOrElse("tuktu_js_field")) -> new WebJsOrderedObject(List()))
++ (
if (r.isInitial) {
Map.empty
} else {
bodyData.keys.map(key => key -> utils.JsValueToAny(bodyData \ key))
}
)
))

// Push to all async processors
channel.push(dp)

// Send through our enumeratee
val p = new senderReturningProcessor(sender, dp)
p.runProcessor()
p.runProcessor
}
}
}

0 comments on commit 7e36fe7

Please sign in to comment.