Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Websockets with zio-http can fail on too early messages on slow hosts #3685

Open
leon-becker-secforge opened this issue Apr 15, 2024 · 0 comments

Comments

@leon-becker-secforge
Copy link

Tapir version: 1.9.9
ZIO-Http version: 3.0.0-RC4

Scala version: 3.3.3

Describe the bug

  def websocketBroken(_u: Unit): ZIO[Any, Unit, ZioStream[Throwable, String] => ZioStream[Throwable, String]] =
    ZIO.succeed(_ => ZStream.succeed("Answer"))

  val socketApp = ZioHttpInterpreter[Any](ZioHttpServerOptions.default).toHttp(
    List(
      endpoint
        .in("websocketBroken")
        .out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](ZioStreams))
        .zServerLogic(websocketBroken)
    )
  )

When creating a websocket endpoint that sends a message to the client immedtiately after the connection is opened and using zio-http as backend, the endpoint sometimes fails with the exception shown below.
The web client does not notice the server failure and tries to re-connect after a timeout.

Log message
backend-server-1  | 2024-03-20T07:29:02.13 level=DEBUG zio-fiber-1100 not-available: "An error was silently ignored because it is not anticipated to be useful" cause=Exception in thread "zio-fiber-1102" java.lang.UnsupportedOperationException: unsupported message type: PingWebSocketFrame (expected: ByteBuf, DefaultFileRegion)
backend-server-1  |     at io.netty.channel.epoll.AbstractEpollStreamChannel.filterOutboundMessage(AbstractEpollStreamChannel.java:529)
backend-server-1  |     at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:868)
backend-server-1  |     at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:877)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
backend-server-1  |     at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:103)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
backend-server-1  |     at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
backend-server-1  |     at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
backend-server-1  |     at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
backend-server-1  |     at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
backend-server-1  |     at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
backend-server-1  |     at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
backend-server-1  |     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
backend-server-1  |     at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
backend-server-1  |     at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
backend-server-1  |     at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
backend-server-1  |     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
backend-server-1  |     at java.base/java.lang.Thread.run(Unknown Source)
backend-server-1  |     at sttp.tapir.server.ziohttp.ZioHttpInterpreter.handleWebSocketResponse.app(ZioHttpInterpreter.scala:81)

The issue is basically caused by tapir sending a ping frame before zio-http and netty being ready for it.
Deactivating automatic ping handling in tapir and delaying the first message for a few milliseconds successfully solves the problem.

I reported this bug already to zio-http in zio/zio-http#2737, where they concluded that it is caused by using the API in a wrong way by sending the first message without waiting for UserEventTriggered(HandshakeComplete) to happen.

How to reproduce?

The given script shows the setup I encountered the problem with. Sadly, I can only witness the bug when running the server on a slow remote VM, where the problem occurs more than half of the time. Using the script locally on my dev machine never fails.

Steps to reproduce:

  1. Run the script with scala-cli
  2. Visit the shown URL
  3. Open the browser dev console and inspect the logs. Connections to two websockets are opended ten times each and a message is printed when a message was received from the websocket. We expect 10 messages for each websocket.
//> using dep "dev.zio::zio-http::3.0.0-RC6"
//> using dep "dev.zio::zio-logging::2.2.2"
//> using dep   "com.softwaremill.sttp.tapir::tapir-zio-http-server::1.10.4"
//> using scala "3.3.3"

import zio.http.{Server, Method, Routes, handler, Response}
import zio.http.template.*
import zio.logging.*

import zio.*
import zio.stream.{Stream => ZioStream, ZStream}

import sttp.tapir.server.ziohttp.*
import sttp.tapir.ztapir.*
import sttp.tapir.CodecFormat

import sttp.capabilities.zio.ZioStreams
import sttp.model.StatusCode

object MainServer extends ZIOAppDefault {

  def app = Routes(
    Method.GET / "" -> handler(
      Response.html(Dom.raw("""<script>
// Create WebSocket connection.
let receivedBroken = 0
let receivedWorking = 0

for(let x = 0; x<10; x++) {
const s1 = new WebSocket("ws://localhost:6896/websocketBroken");
const s2 = new WebSocket("ws://localhost:6896/websocketWorking");

// Listen for messages
s1.addEventListener("message", (event) => {
  receivedBroken += 1;
  console.log("Received broken: "+receivedBroken)
});
s2.addEventListener("message", (event) => {
  receivedWorking += 1;
  console.log("Received working: "+receivedWorking)
});
}
      </script>"""))
    )
  ).toHttpApp

  def websocketBroken(_u: Unit): ZIO[Any, Unit, ZioStream[Throwable, String] => ZioStream[Throwable, String]] =
    ZIO.succeed(_ => ZStream.succeed("Answer"))

  def websocketWorking(_u: Unit): ZIO[Any, Unit, ZioStream[Throwable, String] => ZioStream[Throwable, String]] =
    ZIO.succeed((_: ZioStream[Throwable, String]) => ZStream.succeed("Answer")).delay(Duration.fromMillis(100))

  val socketApp = ZioHttpInterpreter[Any](ZioHttpServerOptions.default).toHttp(
    List(
      endpoint
        .in("websocketBroken")
        .out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](ZioStreams))
        .zServerLogic(websocketBroken),
      endpoint
        .in("websocketWorking")
        .out(webSocketBody[String, CodecFormat.TextPlain, String, CodecFormat.TextPlain](ZioStreams))
        .zServerLogic(websocketWorking)
    )
  )

  def logger = consoleLogger(
    ConsoleLoggerConfig.default.copy(filter = LogFilter.LogLevelByNameConfig(LogLevel.All, Map.empty[String, LogLevel]))
  )

  override val bootstrap = Runtime.removeDefaultLoggers >>> logger

  def run = Console.printLine("http://localhost:6896/") *>
    Server.serve(app ++ socketApp).provide(Server.defaultWith(_.port(6896)))

}

Additional information

See previous issue zio/zio-http#2737
The log message shown in that issue is slightly different as the problem there was caused by a too early text frame instead of a ping frame, yet this shouldn't make any difference.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant