Skip to content

Commit

Permalink
fixed websocket client to properly start and stop akka stream (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
mliarakos committed May 27, 2020
1 parent 79b3017 commit a44441a
Showing 1 changed file with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ private[lagom] abstract class WebSocketClient()(implicit ec: ExecutionContext) e
exceptionSerializer: ExceptionSerializer
) extends Subscriber[ByteString] {
override def onSubscribe(s: Subscription): Unit = {
socket.addEventListener[CloseEvent]("close", (_: CloseEvent) => s.cancel())
s.request(Long.MaxValue)
}

Expand Down Expand Up @@ -122,14 +123,8 @@ private[lagom] abstract class WebSocketClient()(implicit ec: ExecutionContext) e
if (!hasSubscription) {
hasSubscription = true

// Close the socket if the source subscriber cancels
subscriber.onSubscribe(new Subscription {
override def request(n: Long): Unit = {}
override def cancel(): Unit = socket.close()
})

// Forward messages from the socket to the source subscriber
socket.onmessage = (message: MessageEvent) => {
// Forward messages from the socket to the subscriber
val onMessage = (message: MessageEvent) => {
// The message data should be either an ArrayBuffer or a String
// It should not be a Blob because the socket binaryType was set
val data = message.data match {
Expand All @@ -138,24 +133,40 @@ private[lagom] abstract class WebSocketClient()(implicit ec: ExecutionContext) e
}
subscriber.onNext(data)
}
// Complete the source subscriber with an error if the socket encounters an error
socket.onerror = (event: Event) => {
// Complete the subscriber with an error if the socket encounters an error
val onError = (event: Event) => {
subscriber.onError(new RuntimeException(s"WebSocket error: ${event.`type`}"))
}
// Complete the source subscriber when the socket closes based on the close code
socket.onclose = (event: CloseEvent) => {
// Complete the subscriber when the socket closes based on the close code
val onClose = (event: CloseEvent) => {
if (event.code == NormalClosure) {
// Successfully complete the source subscriber because the socket closed normally
// Successfully complete the subscriber because the socket closed normally
subscriber.onComplete()
} else {
// Complete the source subscriber with an error because the socket closed with an error
// Complete the subscriber with an error because the socket closed with an error
// Parse the error reason as an exception
val bytes = ByteString.fromString(event.reason)
val exception =
exceptionSerializerDeserializeWebSocketException(exceptionSerializer, event.code, requestProtocol, bytes)
subscriber.onError(exception)
}
}

// Configure the subscriber to start the stream
subscriber.onSubscribe(new Subscription {
private var hasStarted = false
// Configure the socket after the initial subscriber request
override def request(n: Long): Unit = {
if (!hasStarted) {
hasStarted = true
socket.onmessage = onMessage
socket.onerror = onError
socket.addEventListener[CloseEvent]("close", onClose)
}
}
// Close the socket if the subscriber cancels
override def cancel(): Unit = socket.close()
})
} else {
subscriber.onError(new IllegalStateException("This publisher only supports one subscriber"))
}
Expand Down

0 comments on commit a44441a

Please sign in to comment.