Skip to content

Commit

Permalink
ZPipeline.fromFunction benchmark and optimization (#8761)
Browse files Browse the repository at this point in the history
* fromFunction_benchmark: introduce a benchmark

* fromFunction_benchmark: ZPipeline.fromFunction2

* fromFunction_benchmark: add benchmarks for fromFunction2

* fromFunction_benchmark: slight optimization

* fromFunction_benchmark: finalize benchmarks

* fromFunction_benchmark: fmt

* fromFunction_benchmark: replace the impl, rename the benchmarks accordingly

* fromFunction_benchmark: fix test compilation issue

* fromFunction_benchmark: fix tests compilation issue

* fromFunction_benchmark: drop the orig fromFunction

* Update streams-tests/shared/src/test/scala/zio/stream/ZPipelineSpec.scala

---------

Co-authored-by: Pierre Ricadat <ghostdogpr@users.noreply.github.com>
  • Loading branch information
eyalfa and ghostdogpr committed Apr 19, 2024
1 parent 16e7249 commit f3ad39d
Show file tree
Hide file tree
Showing 5 changed files with 113 additions and 11 deletions.
37 changes: 37 additions & 0 deletions benchmarks/src/main/scala/zio/StreamBenchmarks.scala
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,43 @@ class StreamBenchmarks {
unsafeRun(result)
}

val chunkToConst: Chunk[Int] => Int = _ => 1
val strmChunkToStrmConsts: ZStream[Any, Nothing, Chunk[Int]] => ZStream[Any, Nothing, Int] = _.map(chunkToConst)

@Benchmark
def zioChunkToConstDirect: Long = {
val result =
strmChunkToStrmConsts(
ZStream
.fromChunks(zioChunks: _*)
.chunks
).runCount

unsafeRun(result)
}

@Benchmark
def zioChunkToConstBaseline: Long = {
val result = ZStream
.fromChunks(zioChunks: _*)
.chunks
.via(ZPipeline.map(chunkToConst))
.runCount

unsafeRun(result)
}

@Benchmark
def zioChunkToConstFromFunction: Long = {
val result = ZStream
.fromChunks(zioChunks: _*)
.chunks
.via(ZPipeline.fromFunction(strmChunkToStrmConsts))
.runCount

unsafeRun(result)
}

}

@State(JScope.Thread)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,18 @@ object ZPipelineSpec extends ZIOBaseSpec {
.exit
} yield assert(result)(fails(equalTo(EncodingException("Not a valid hex digit: 'g'"))))
}
)
),
test("fromFunction") {
ZStream
.range(0, 20, 5)
.via {
ZPipeline.fromFunction { (strm: ZStream[Any, Any, Int]) =>
strm.map(_ + 1)
}
}
.runCollect
.map(assert(_)(equalTo(Chunk.range(1, 21))))
}
)

val weirdStringGenForSplitLines: Gen[Any, Chunk[String]] = Gen
Expand Down
12 changes: 12 additions & 0 deletions streams/shared/src/main/scala/zio/stream/ZChannel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,18 @@ object ZChannel {
channel: ZChannel[Env, Any, Any, Any, OutErr, OutElem, OutDone]
) extends ZChannel[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone]

private[zio] final case class DeferedUpstream[-Env, -InErr, -InElem, -InDone, +OutErr, +OutElem, +OutDone](
mkChannel: ZChannel[Any, Any, Any, Any, InErr, InElem, InDone] => ZChannel[
Env,
Any,
Any,
Any,
OutErr,
OutElem,
OutDone
]
) extends ZChannel[Env, InErr, InElem, InDone, OutErr, OutElem, OutDone]

private[zio] final case class BracketOut[R, E, Z](
acquire: () => ZIO[R, E, Z],
finalizer: (Z, Exit[Any, Any]) => URIO[R, Any]
Expand Down
19 changes: 9 additions & 10 deletions streams/shared/src/main/scala/zio/stream/ZPipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1443,16 +1443,15 @@ object ZPipeline extends ZPipelinePlatformSpecificConstructors {
def fromFunction[Env, Err, In, Out](
f: ZStream[Any, Nothing, In] => ZStream[Env, Err, Out]
)(implicit trace: Trace): ZPipeline[Env, Err, In, Out] = {

val channel: ZChannel[Env, ZNothing, Chunk[In], Any, Err, Chunk[Out], Any] =
ZChannel.unwrap {
for {
input <- SingleProducerAsyncInput.make[ZNothing, Chunk[In], Any]
stream = ZStream.fromChannel(ZChannel.fromInput(input))
} yield f(stream).channel.embedInput(input)
}

new ZPipeline(channel)
def fc(
upstream: ZChannel[Any, Any, Any, Any, ZNothing, Chunk[In], Any]
): ZChannel[Env, Any, Any, Any, Err, Chunk[Out], Any] =
f(upstream.toStream).toChannel

val resCh: ZChannel.DeferedUpstream[Env, ZNothing, Chunk[In], Any, Err, Chunk[Out], Any] =
ZChannel.DeferedUpstream(fc)
val resPl: ZPipeline[Env, Err, In, Out] = resCh.toPipeline
resPl
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,19 @@ private[zio] class ChannelExecutor[Env, InErr, InElem, InDone, OutErr, OutElem,
)
}

case ZChannel.DeferedUpstream(mkChannel) =>
val inpAsChannel: ZChannel[Env, Any, Any, Any, Any, Any, Any] = execToPullingChannel(input)
val nextChannel = mkChannel(inpAsChannel.asInstanceOf[ZChannel[Any, Any, Any, Any, Any, Any, Any]])

val previousInput = input
input = null
addFinalizer { exit =>
val effect = restorePipe(exit, previousInput)

if (effect ne null) effect
else ZIO.unit
}
currentChannel = nextChannel
case ZChannel.PipeTo(left, right) =>
val previousInput = input

Expand Down Expand Up @@ -744,6 +757,36 @@ private[zio] object ChannelExecutor {

read()
}

private[zio] def execToPullingChannel[Env](
exec: ErasedExecutor[Env]
)(implicit trace: Trace): ZChannel[Env, Any, Any, Any, Any, Any, Any] = {
def ch2(st: ChannelState[Env, Any]): ZChannel[Env, Any, Any, Any, Any, Any, Any] =
st match {
case ChannelState.Done =>
exec.getDone match {
case Exit.Success(res) =>
ZChannel.succeedNow(res)
case Exit.Failure(c) =>
ZChannel.refailCause(c)
}
case ChannelState.Emit =>
ZChannel.write(exec.getEmit) *>
ch2(exec.run())
case ChannelState.Effect(zio) =>
ZChannel.fromZIO(zio) *> ch2(exec.run())
case r @ ChannelState.Read(upstream, onEffect, onEmit, onDone) =>
ZChannel.fromZIO {
ChannelExecutor.readUpstream[Env, Any, Any, Any](
r.asInstanceOf[ChannelState.Read[Env, Any]],
() => ZIO.unit,
ZIO.refailCause
)
} *> ch2(exec.run())
}
ZChannel.suspend(ch2(exec.run()))
}

}

/**
Expand Down

0 comments on commit f3ad39d

Please sign in to comment.