[BUG] Connection hangs with no useful logs when response zio stream fails #3540

Open
maximskripnik opened this issue Feb 28, 2024 · 8 comments

@maximskripnik

maximskripnik commented Feb 28, 2024

Tapir version: 1.9.10

Scala version: 3.3.1

Connection hangs with no useful logs when response zio stream fails

When using ZIO streams to serve an HTTP response through ZioHttpInterpreter, the server does not handle stream failures properly. The client that sent the request hangs, so apparently the server does not even close the connection. On top of that, the server logs are full of Netty internal exceptions instead of the root-cause exception that failed the stream. Additionally, the elements that precede the failure in the stream are not sent in the response either.

How to reproduce?

Server code

This example spins up a server with a single endpoint, /test, that returns a plain-text stream which fails after emitting two elements.

// ThisBuild / scalaVersion := "3.3.1"

// libraryDependencies ++= Seq(
//  "com.softwaremill.sttp.tapir" %% "tapir-zio-http-server" % "1.9.10"
// )

package tapirsample

import sttp.capabilities.zio.ZioStreams
import sttp.tapir.CodecFormat
import sttp.tapir.Schema
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import sttp.tapir.ztapir.*
import zio.*
import zio.http.HttpApp
import zio.http.Server
import zio.stream.ZStream

import java.nio.charset.StandardCharsets

object Sample extends ZIOAppDefault {

  val badEndpoint: ZServerEndpoint[Any, ZioStreams] = endpoint
    .in("test")
    .get
    .out(
      streamBody(ZioStreams)(
        summon[Schema[Chunk[String]]],
        CodecFormat.TextPlain(),
        Some(StandardCharsets.UTF_8)
      )
    )
    .zServerLogic { _ =>
      val stream = ZStream("foo", "bar") ++ ZStream.fail(new RuntimeException("boom"))
      val byteStream = stream.mapConcat(_.getBytes)
      ZIO.succeed(byteStream)
    }

  val routes: HttpApp[Any] =
    ZioHttpInterpreter().toHttp(badEndpoint)

  override def run: ZIO[Any, Any, Any] =
    Server.serve(routes).provide(Server.defaultWithPort(9000))

}

Running the server

sbt run
[info] welcome to sbt 1.8.2 (Eclipse Adoptium Java 17.0.6)
[info] loading settings for project tapir-stream-issue-build-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project/project
[info] loading settings for project tapir-stream-issue-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project
[success] Generated .bloop/tapir-stream-issue-build.json
[success] Total time: 1 s, completed 28 Feb 2024, 11:27:59
[info] loading settings for project tapir-stream-issue from build.sbt ...
[info] set current project to tapir-stream-issue (in build file:/Users/mskripnik/projects/tapir-stream-issue/)
[info] running tapirsample.Sample

Sending the request

curl -v 'http://localhost:9000/test'
*   Trying [::1]:9000...
* Connected to localhost (::1) port 9000
> GET /test HTTP/1.1
> Host: localhost:9000
> User-Agent: curl/8.4.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=UTF-8
< transfer-encoding: chunked
<

You can observe two issues here:

  • curl hangs after this (until it times out), waiting for the rest of the response from the server
  • not even the first two strings of the response stream (foo, bar) are sent; the response body is simply empty

Server logs

[info] running tapirsample.Sample
timestamp=2024-02-28T10:29:01.638818Z level=WARN thread=#zio-fiber-39 message="Fatal exception in Netty" cause="Exception in thread "zio-fiber-" io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
	at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:108)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
	at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
	at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
	at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
	Suppressed: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
		at io.netty.handler.codec.http.HttpObjectEncoder.throwUnexpectedMessageTypeEx(HttpObjectEncoder.java:348)
		at io.netty.handler.codec.http.HttpObjectEncoder.encodeFullHttpMessage(HttpObjectEncoder.java:305)
		at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:162)
		at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:97)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
		at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
		at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
		at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
		at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
		at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
		at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
		at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
		at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
		at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
		at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
		at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
		at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
		at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
		at java.base/java.lang.Thread.run(Thread.java:833)"
Feb 28, 2024 11:29:01 AM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
io.netty.handler.codec.EncoderException: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
	at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:108)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:881)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
	at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
	at io.netty.handler.codec.http.HttpServerKeepAliveHandler.write(HttpServerKeepAliveHandler.java:87)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:863)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:968)
	at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:856)
	at io.netty.channel.ChannelDuplexHandler.write(ChannelDuplexHandler.java:115)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:879)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:940)
	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1247)
	at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
	at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:300)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.IllegalStateException: unexpected message type: DefaultFullHttpResponse, state: 2
	at io.netty.handler.codec.http.HttpObjectEncoder.throwUnexpectedMessageTypeEx(HttpObjectEncoder.java:348)
	at io.netty.handler.codec.http.HttpObjectEncoder.encodeFullHttpMessage(HttpObjectEncoder.java:305)
	at io.netty.handler.codec.http.HttpObjectEncoder.encode(HttpObjectEncoder.java:162)
	at io.netty.handler.codec.http.HttpObjectEncoder.write(HttpObjectEncoder.java:97)
	... 22 more

Meanwhile, the server just emits this large pile of Netty internals. The big issue here is the lack of information about the throwable that failed the response stream: the string 'boom' appears nowhere in the logs.

Additional information

Expected behavior

The expected behavior in this case is that the server:

  • sends all elements from the response stream prior to the failure
  • drops the connection right after it encounters the exception in the stream
  • logs the details on the exception that caused the stream to fail
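
Until the server does this on its own, the third point can be approximated on the application side. The following is only a minimal sketch, not the fix for this issue: it uses plain zio-streams (no tapir or zio-http), the scala-cli directives and the object name LogStreamFailure are assumptions made for illustration, and the log message is arbitrary. It attaches tapErrorCause to the same kind of stream as in the report, so the root cause is logged before the failure reaches the HTTP layer, and it shows that the elements preceding the failure are still emitted by the stream itself.

```scala
//> using scala "3.3.1"
//> using dep "dev.zio::zio-streams:2.0.22"
// The directives above are assumptions for running this with scala-cli.

import zio.*
import zio.stream.ZStream

object LogStreamFailure extends ZIOAppDefault {

  // Same shape as the stream in the report: two elements, then a failure.
  val failing: ZStream[Any, Throwable, Byte] =
    (ZStream("foo", "bar") ++ ZStream.fail(new RuntimeException("boom")))
      .mapConcat(s => Chunk.fromArray(s.getBytes))

  // Log the full cause as soon as the stream fails; the failure itself is still propagated.
  val logged: ZStream[Any, Throwable, Byte] =
    failing.tapErrorCause(cause => ZIO.logErrorCause("response stream failed", cause))

  // `either` turns the failure into a final Left element, so the bytes
  // emitted before the failure can be collected and inspected.
  val emitted: UIO[String] =
    logged.either.runCollect.map { chunk =>
      new String(chunk.collect { case Right(b) => b }.toArray)
    }

  override def run =
    emitted.flatMap(sent => Console.printLine(s"emitted before failure: $sent"))
}
```

In a tapir endpoint, the same tapErrorCause call would go on the stream returned from the server logic; whether the bytes then reach the client still depends on how the server handles the failure.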

Working example

As a side note, this example shows that such behavior did exist in tapir v1.2.3

// ThisBuild / scalaVersion := "3.3.1"

// libraryDependencies ++= Seq(
//   "com.softwaremill.sttp.tapir" %% "tapir-zio-http-server" % "1.2.3"
// )

package tapirsample

import sttp.capabilities.zio.ZioStreams
import sttp.tapir.CodecFormat
import sttp.tapir.Schema
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import sttp.tapir.ztapir.*
import zio.*
import zio.http.HttpApp
import zio.http.Server
import zio.http.ServerConfig
import zio.stream.ZStream

import java.nio.charset.StandardCharsets

object Sample extends ZIOAppDefault {

  val badEndpoint: ZServerEndpoint[Any, ZioStreams] = endpoint
    .in("test")
    .get
    .out(
      streamBody(ZioStreams)(
        summon[Schema[Chunk[String]]],
        CodecFormat.TextPlain(),
        Some(StandardCharsets.UTF_8)
      )
    )
    .zServerLogic { _ =>
      val stream = ZStream("foo", "bar") ++ ZStream.fail(new RuntimeException("boom"))
      val byteStream = stream.mapConcat(_.getBytes)
      ZIO.succeed(byteStream)
    }

  val routes: HttpApp[Any, Throwable] =
    ZioHttpInterpreter().toHttp(badEndpoint)

  override def run: ZIO[Any, Any, Any] =
    Server.serve(routes).provide(Server.live, ZLayer.succeed(ServerConfig.default.port(9000)))

}

Running the server

sbt run
[info] welcome to sbt 1.8.2 (Eclipse Adoptium Java 17.0.6)
[info] loading settings for project tapir-stream-issue-build-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project/project
[info] loading settings for project tapir-stream-issue-build from metals.sbt ...
[info] loading project definition from /Users/mskripnik/projects/tapir-stream-issue/project
[success] Generated .bloop/tapir-stream-issue-build.json
[success] Total time: 1 s, completed 28 Feb 2024, 11:40:27
[info] loading settings for project tapir-stream-issue from build.sbt ...
[info] set current project to tapir-stream-issue (in build file:/Users/mskripnik/projects/tapir-stream-issue/)
[info] compiling 1 Scala source to /Users/mskripnik/projects/tapir-stream-issue/target/scala-3.3.1/classes ...
[info] running tapirsample.Sample

Sending the request

curl -v 'http://localhost:9000/test'
*   Trying [::1]:9000...
* Connected to localhost (::1) port 9000
> GET /test HTTP/1.1
> Host: localhost:9000
> User-Agent: curl/8.4.0
> Accept: */*
>
< HTTP/1.1 200 OK
< Content-Type: text/plain; charset=UTF-8
< transfer-encoding: chunked
<
* transfer closed with outstanding read data remaining
* Closing connection
curl: (18) transfer closed with outstanding read data remaining
foobar%

  • foobar is seen in the response
  • the connection is closed, so curl exits with error details

Server logs

[info] running tapirsample.Sample
[ERROR] KQueueEventLoopGroup-2-2 NettyRuntime HttpRuntimeException:Exception in thread "zio-fiber-" java.lang.RuntimeException: java.lang.RuntimeException: boom
Feb 28, 2024 11:40:36 AM io.netty.channel.DefaultChannelPipeline onUnhandledInboundException
WARNING: An exceptionCaught() event was fired, and it reached at the tail of the pipeline. It usually means the last handler in the pipeline did not handle the exception.
java.lang.RuntimeException: boom
	at tapirsample.Sample$.$anonfun$1$$anonfun$1(Sample.scala:29)
	at zio.ZIO$.fail$$anonfun$1(ZIO.scala:3083)
	at zio.ZIO$.failCause$$anonfun$1(ZIO.scala:3089)
	at zio.internal.FiberRuntime.runLoop(FiberRuntime.scala:1126)
	at zio.internal.FiberRuntime.evaluateEffect(FiberRuntime.scala:384)
	at zio.internal.FiberRuntime.start(FiberRuntime.scala:1380)
	at zio.Runtime$UnsafeAPIV1.fork(Runtime.scala:155)
	at zio.Runtime$UnsafeAPIV1.fork(Runtime.scala:138)
	at zio.http.netty.NettyRuntime.run(NettyRuntime.scala:48)
	at zio.http.netty.NettyRuntime.run$(NettyRuntime.scala:11)
	at zio.http.netty.NettyRuntime$$anon$4.run(NettyRuntime.scala:112)
	at zio.http.netty.server.ServerInboundHandler.channelRead0(ServerInboundHandler.scala:200)
	at zio.http.netty.server.ServerInboundHandler.channelRead0(ServerInboundHandler.scala:32)
	at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200)
	at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)
	at io.netty.handler.codec.http.HttpServerKeepAliveHandler.channelRead(HttpServerKeepAliveHandler.java:64)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:336)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:308)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.kqueue.AbstractKQueueStreamChannel$KQueueStreamUnsafe.readReady(AbstractKQueueStreamChannel.java:544)
	at io.netty.channel.kqueue.AbstractKQueueChannel$AbstractKQueueUnsafe.readReady(AbstractKQueueChannel.java:383)
	at io.netty.channel.kqueue.KQueueEventLoop.processReady(KQueueEventLoop.java:213)
	at io.netty.channel.kqueue.KQueueEventLoop.run(KQueueEventLoop.java:291)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:833)

The logs above contain the root-cause exception, although they also include a not very useful warning from Netty.

@maximskripnik
Author

It seems like the issue is with zio-http, caused by this: zio/zio-http#2584

It's already fixed with zio/zio-http#2599, so I guess we just need to wait for the next zio-http release and update it in tapir

kciesielski added the zio label Mar 8, 2024
@TrustNoOne

zio-http was released: https://github.com/zio/zio-http/releases/tag/v3.0.0-RC5

@adamw
Member

adamw commented Mar 19, 2024

@TrustNoOne yes, but it contains snapshot transitive dependencies, so we cannot use it

@adamw
Member

adamw commented Mar 19, 2024

See #3596

@dacr

dacr commented Apr 25, 2024

I've just checked: the hang issues I had previously, which came from the ZIO side (zio/zio-http#2584), are fixed (zio 2.0.22, tapir 1.10.5, zhttp-3.0.0-RC6), but now something goes wrong when tapir tries to process a stream that returns an error.

I get 200 OK with no content:

< HTTP/1.1 200 OK
< Content-Type: application/json-seq
< transfer-encoding: chunked
< 
* transfer closed with outstanding read data remaining
* Closing connection 0

instead of getting (with the same inputs):

< HTTP/1.1 400 Bad Request
< content-length: 305
< Content-Type: application/json
< 
* Connection #0 to host 127.0.0.1 left intact
{"querySyntaxError"...

I have two similar endpoints with the same inputs: the first one returns a stream, and the second one returns the result of consuming that same stream on the backend.

If needed, I can write some snippets to reproduce the issue, but unfortunately not for another 3 weeks.

@adamw
Member

adamw commented Apr 25, 2024

@dacr please do, I'm not aware of any similar issues

@dacr

dacr commented May 15, 2024

Consider the stream logic failingGreetingStream3 bound to helloEndPoint: it summarizes the issue I have, and once written like this it also gives me some hints about what's going on. I build a ZIO stream from a Java stream that can fail, and while reading this I realized the issue comes from my side and the way I'm building the stream :(

  val failingGreetingStream3: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream.fromJavaStreamZIO(
      ZIO.fail(Exception("Can not build the stream"))
    )
  )

  val helloEndPoint =
    endpoint
      .description("Returns greeting")
      .get
      .in("hello")
      .out(streamBody(ZioStreams)(Schema.derived[Greeting], JsonSeqCodecFormat()))
      .out(statusCode(StatusCode.Ok).description("query success"))
      .errorOutVariantPrepend(oneOfVariant(StatusCode.InternalServerError, plainBody[String]))

  val helloRoute = helloEndPoint.zServerLogic[Any](_ => failingGreetingStream3)

The full script:

// ---------------------
//> using scala "3.4.1"
//> using dep "com.softwaremill.sttp.tapir::tapir-zio:1.10.7"
//> using dep "com.softwaremill.sttp.tapir::tapir-zio-http-server:1.10.7"
//> using dep "com.softwaremill.sttp.tapir::tapir-json-zio:1.10.7"
// ---------------------

import sttp.tapir.ztapir.*
import sttp.tapir.server.ziohttp.ZioHttpInterpreter
import zio.*
import zio.stream.*
import zio.json.*
import zio.http.Server
import sttp.capabilities.zio.ZioStreams
import sttp.model.{MediaType, StatusCode}
import sttp.tapir.{CodecFormat, Schema}
import sttp.tapir.generic.auto.*
import sttp.tapir.json.zio.*

case class Greeting(message:String) derives JsonCodec

case class JsonSeqCodecFormat() extends CodecFormat {
  override val mediaType: MediaType = MediaType.unsafeApply("application", "json-seq")
}

object WebApp extends ZIOAppDefault {

  // --------------------------------------------------
  val greetingStream: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream
      .repeat(Greeting("Hello world"))
      .schedule(Schedule.spaced(1.second))
      .flatMap(greeting => ZStream.fromIterable( (greeting.toJson+"\n").getBytes ))
  )

  val failingGreetingStream1: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.fail(
    "Can not build the stream"
  )

  val failingGreetingStream2: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream.fail(Exception("Can not build the stream"))
  )

  val failingGreetingStream3: ZIO[Any, String, ZStream[Any, Throwable, Byte]] = ZIO.succeed(
    ZStream.fromJavaStreamZIO(
      ZIO.fail(Exception("Can not build the stream"))
    )
  )

  val helloEndPoint =
    endpoint
      .description("Returns greeting")
      .get
      .in("hello")
      .out(streamBody(ZioStreams)(Schema.derived[Greeting], JsonSeqCodecFormat()))
      .out(statusCode(StatusCode.Ok).description("query success"))
      .errorOutVariantPrepend(oneOfVariant(StatusCode.InternalServerError, plainBody[String]))


  //val helloRoute = helloEndPoint.zServerLogic[Any](_ => greetingStream)
  val helloRoute = helloEndPoint.zServerLogic[Any](_ => failingGreetingStream3)

  val routes = ZioHttpInterpreter().toHttp(List(helloRoute))
  override def run = Server.serve(routes).provide(Server.default)
}

WebApp.main(Array.empty)
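
The difference between the failing variants above comes down to where the failure lives: in the outer effect (failingGreetingStream1), where the server logic's error channel can still be mapped to an error output, or inside the stream (failingGreetingStream2 and failingGreetingStream3), where it only surfaces once the stream is consumed. A minimal sketch of that distinction, using only zio-streams and no tapir; openBackendStream, lazyVariant, eagerVariant and the scala-cli directives are hypothetical names and assumptions introduced for illustration:

```scala
//> using scala "3.3.1"
//> using dep "dev.zio::zio-streams:2.0.22"
// The directives above are assumptions for running this with scala-cli.

import zio.*
import zio.stream.ZStream

object StreamConstructionSketch extends ZIOAppDefault {

  // Hypothetical stand-in for a backend call that may throw while opening a Java stream.
  def openBackendStream(fail: Boolean): java.util.stream.Stream[String] =
    if (fail) throw new IllegalStateException("Can not build the stream")
    else java.util.stream.Stream.of("a", "b")

  // The failure is hidden inside the stream: the outer effect always succeeds,
  // and the error only shows up when the stream is run.
  def lazyVariant(fail: Boolean): ZIO[Any, String, ZStream[Any, Throwable, String]] =
    ZIO.succeed(ZStream.fromJavaStreamZIO(ZIO.attempt(openBackendStream(fail))))

  // The Java stream is opened in the outer effect: a failure lands in the
  // String error channel before any stream is returned.
  def eagerVariant(fail: Boolean): ZIO[Any, String, ZStream[Any, Throwable, String]] =
    ZIO
      .attempt(openBackendStream(fail))
      .mapError(_.getMessage)
      .map(js => ZStream.fromJavaStream(js))

  override def run =
    for {
      a <- lazyVariant(fail = true).either
      b <- eagerVariant(fail = true).either
      _ <- Console.printLine(s"lazy outer effect succeeded: ${a.isRight}")  // prints true
      _ <- Console.printLine(s"eager outer effect succeeded: ${b.isRight}") // prints false
    } yield ()
}
```

With the eager shape, a server logic built on it fails before any response has been started; with the lazy shape, the outer effect has already succeeded by the time the error appears, which would be consistent with the 200 OK and empty body shown earlier.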

@adamw
Member

adamw commented May 17, 2024

@dacr does this work when using zio-http directly, without tapir?
