ModelServer.scala
package com.lightbend.scala.custom.modelserver

import java.util.{HashMap, Properties}
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Route
import akka.stream.ActorMaterializer
import akka.util.Timeout
import com.lightbend.model.winerecord.WineRecord
import com.lightbend.java.configuration.kafka.ApplicationKafkaParameters
import com.lightbend.scala.custom.queriablestate.QueriesResource
import com.lightbend.scala.custom.store.ModelStateStoreBuilder
import com.lightbend.scala.modelServer.model.{ModelToServe, ModelWithDescriptor, ServingResult}
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, Predicate, ValueMapper}
import org.apache.kafka.streams.{KafkaStreams, StreamsBuilder, StreamsConfig}
import scala.concurrent.duration._
import scala.util.Try
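
/**
 * Kafka Streams application implementing a simple model server.
 *
 * It consumes two topics: `DATA_TOPIC`, carrying records (`WineRecord`s) to be scored, and
 * `MODELS_TOPIC`, carrying model updates. The current model is kept in a custom state store
 * (built by `ModelStateStoreBuilder`), and an embedded Akka HTTP endpoint (`QueriesResource`)
 * exposes that store for remote queries on the port configured below.
 */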
object ModelServer {

  private val port = 8888 // Port for the queryable state REST endpoint

  import ApplicationKafkaParameters._

  def main(args: Array[String]): Unit = {

    println(s"Using Kafka brokers at $KAFKA_BROKER")

    val streamsConfiguration = new Properties
    // Give the Streams application a unique name. The name must be unique in the Kafka cluster
    // against which the application is run.
    streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-model-server")
    streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, DATA_GROUP)
    // Where to find Kafka broker(s).
    streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER)
    // Provide the details of our embedded HTTP service that we'll use to connect to this Streams
    // instance and discover locations of stores.
    streamsConfiguration.put(StreamsConfig.APPLICATION_SERVER_CONFIG, s"127.0.0.1:$port")
    // Default serdes
    streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass)
    streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray.getClass)
    // Add a topic config by prefixing with topic
    // streamsConfiguration.put(StreamsConfig.topicPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest")

    // Create topology
    val streams = createStreams(streamsConfiguration)

    // Set Streams exception handler
    streams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        println(s"Uncaught exception on thread $t: ${e.toString}")
      }
    })

    // Start streams
    streams.start()

    // Start the RESTful proxy for servicing remote access to state stores
    startRestProxy(streams, port)

    // Add shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
    sys.addShutdownHook {
      try {
        streams.close()
      } catch {
        case _: Exception => // ignored
      }
    }
  }
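
  /**
   * Creates the Kafka Streams topology. The `data` stream is deserialized into `WineRecord`s and
   * handed to `DataProcessorKV`, which runs with access to the model state store (`STORE_NAME`)
   * and emits `ServingResult`s. The `models` stream is deserialized into `ModelWithDescriptor`s
   * and handed to `ModelProcessor`, which works against the same store.
   */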
  private def createStreams(streamsConfiguration: Properties): KafkaStreams = {

    // Store definition
    val logConfig = new HashMap[String, String]
    val storeBuilder: ModelStateStoreBuilder =
      new ModelStateStoreBuilder(ApplicationKafkaParameters.STORE_NAME).withLoggingEnabled(logConfig)

    // Create stream builder
    val builder = new StreamsBuilder

    // Data input streams
    val data: KStream[Array[Byte], Array[Byte]] = builder.stream(DATA_TOPIC)
    val models: KStream[Array[Byte], Array[Byte]] = builder.stream(MODELS_TOPIC)

    // Data store
    builder.addStateStore(storeBuilder)

    // Data processor
    data
      .mapValues[Try[WineRecord]](new DataValueMapper().asInstanceOf[ValueMapper[Array[Byte], Try[WineRecord]]])
      .filter(new DataValueFilter().asInstanceOf[Predicate[Array[Byte], Try[WineRecord]]])
      .transform(() => new DataProcessorKV, STORE_NAME)
      .mapValues[ServingResult](new ResultPrinter())

    // Model processor
    models
      .mapValues[Try[ModelToServe]](new ModelValueMapper().asInstanceOf[ValueMapper[Array[Byte], Try[ModelToServe]]])
      .filter(new ModelValueFilter().asInstanceOf[Predicate[Array[Byte], Try[ModelToServe]]])
      .mapValues[Try[ModelWithDescriptor]](new ModelDescriptorMapper().asInstanceOf[ValueMapper[Try[ModelToServe], Try[ModelWithDescriptor]]])
      .filter(new ModelDescriptorFilter().asInstanceOf[Predicate[Array[Byte], Try[ModelWithDescriptor]]])
      .process(new ModelProcessor, STORE_NAME)

    // Create and build topology
    val topology = builder.build
    println(topology.describe)

    new KafkaStreams(topology, streamsConfiguration)
  }
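
  /**
   * Starts the embedded Akka HTTP server exposing the Streams state store via
   * `QueriesResource.storeRoutes`, providing remote (queryable state) access to the served model.
   */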
  private def startRestProxy(streams: KafkaStreams, port: Int) = {

    implicit val system = ActorSystem("ModelServing")
    implicit val materializer = ActorMaterializer()
    implicit val executionContext = system.dispatcher
    implicit val timeout = Timeout(10.seconds)

    val host = "127.0.0.1"
    val routes: Route = QueriesResource.storeRoutes(streams, port)

    Http().bindAndHandle(routes, host, port) map { binding =>
      println(s"Starting models observer at ${binding.localAddress}")
    } recover {
      case ex =>
        println(s"Models observer could not bind to $host:$port - ${ex.getMessage}")
    }
  }
}
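
// The mapper, filter and processor helpers used above (DataValueMapper, DataValueFilter,
// DataProcessorKV, ModelValueMapper, ModelProcessor, etc.) are defined elsewhere in this package.
// As a rough illustration only, a byte-array to WineRecord mapper of the kind used in the `data`
// branch might look like the sketch below. The name `WineRecordMapperSketch` and the assumption
// that `WineRecord` is a ScalaPB-generated class with a `parseFrom` method are illustrative, not
// the project's actual code.
//
//   class WineRecordMapperSketch extends ValueMapper[Array[Byte], Try[WineRecord]] {
//     override def apply(value: Array[Byte]): Try[WineRecord] = Try(WineRecord.parseFrom(value))
//   }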