
Proposal: Retire redundant Stream allocation and its materialization in ResponseLogger #6889

Open · wants to merge 2 commits into base: main

Conversation

@danicheg (Member) commented Jan 1, 2023

Since the whole response body is already materialized, there is no reason (besides the one described below) to allocate another Stream and materialize it again. That should reduce memory consumption considerably. But unfortunately, there is no free lunch: although memory consumption drops noticeably, building a String from a Stream is 30-40% faster than building one from a ByteVector for bodies of 100 KB and up. I don't fully understand why, but that's what the numbers show.

Benchmark                                                                      (size)  Mode  Cnt        Score       Error   Units
StreamBench.BodyLoggingRespectingEntityModel                                     1000  avgt    5        0.119 ±     0.014   ms/op
StreamBench.BodyLoggingRespectingEntityModel:·gc.alloc.rate.norm                 1000  avgt    5   240551.126 ±   835.507    B/op
StreamBench.BodyLoggingRespectingEntityModel                                    10000  avgt    5        0.203 ±     0.046   ms/op
StreamBench.BodyLoggingRespectingEntityModel:·gc.alloc.rate.norm                10000  avgt    5   288581.383 ±  1675.334    B/op
StreamBench.BodyLoggingRespectingEntityModel                                   100000  avgt    5        1.195 ±     0.009   ms/op
StreamBench.BodyLoggingRespectingEntityModel:·gc.alloc.rate.norm               100000  avgt    5   717638.671 ±   331.169    B/op
StreamBench.CurrentBodyLogging                                                   1000  avgt    5        0.122 ±     0.012   ms/op
StreamBench.CurrentBodyLogging:·gc.alloc.rate.norm                               1000  avgt    5   260361.355 ±   190.546    B/op
StreamBench.CurrentBodyLogging                                                  10000  avgt    5        0.144 ±     0.013   ms/op
StreamBench.CurrentBodyLogging:·gc.alloc.rate.norm                              10000  avgt    5   416624.951 ±  1455.973    B/op
StreamBench.CurrentBodyLogging                                                 100000  avgt    5        0.701 ±     0.101   ms/op
StreamBench.CurrentBodyLogging:·gc.alloc.rate.norm                             100000  avgt    5  1411222.382 ±   887.469    B/op

There may be some inaccuracy in the benchmark, so anyone who wants to is welcome to dig into it.
TL;DR: reducing memory consumption comes with some degradation in throughput, so I'd like to get your thoughts.
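For illustration, a minimal sketch of the two paths being compared (the names and shapes here are illustrative, not the actual ResponseLogger internals): the current code re-wraps the captured chunks in a fresh Stream and compiles it again, while the proposal decodes the already-materialized bytes directly.

```scala
import cats.effect.IO
import fs2.{Chunk, Stream}
import scodec.bits.ByteVector

// current: rebuild a Stream over the same Chunk objects and fold it once more
def logViaStream(chunks: Vector[Chunk[Byte]]): IO[String] =
  Stream
    .emits(chunks)
    .flatMap(Stream.chunk)
    .through(fs2.text.utf8.decode)
    .compile
    .string

// proposed: the body is already a single ByteVector, so decode it in place
def logViaByteVector(body: ByteVector): IO[String] =
  IO.fromEither(body.decodeUtf8)
```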

@mergify mergify bot added the module:server label Jan 1, 2023
@diesalbla (Contributor) commented Jan 1, 2023

@danicheg Here is some possible intuition/explanation as to what may be causing the difference:

In both versions of the code, the "observer" pulls the bytes from the source stream one chunk at a time. In the old code, each chunk is not discarded but added to a vector of chunks, so the "observer" does nothing more than insert into that vector. In the finaliser, a new stream is built out of that vector, which emits the very same Chunk objects that were pulled from the source stream and processes them directly.

The new code converts each source chunk to a ByteVector (which it may already be) and concatenates those into a single large byte vector. I am looking into the implementation of the ByteVector class, but if it is backed by a resizable array, the new implementation may incur a growing number of memory allocations for intermediate results, with the time cost being both allocating them and garbage-collecting them. We may need to look at the ByteVector class to see the best way of working with such large byte vectors.
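A compressed sketch of the two accumulation strategies described above, using plain folds in place of the actual observer pipe (illustrative only):

```scala
import fs2.Chunk
import scodec.bits.ByteVector

val pulled: List[Chunk[Byte]] =
  List(Chunk.array("foo".getBytes), Chunk.array("bar".getBytes))

// old code: the observer only appends each pulled Chunk to a vector;
// the Chunk objects themselves are reused when the new stream is built
val collected: Vector[Chunk[Byte]] =
  pulled.foldLeft(Vector.empty[Chunk[Byte]])(_ :+ _)

// new code: every chunk is converted to a ByteVector and concatenated
// into one large value as it arrives
val folded: ByteVector =
  pulled.foldLeft(ByteVector.empty)(_ ++ _.toByteVector)
```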

@diesalbla (Contributor)

@danicheg What would the performance be if the code did the following:

  • Keep the vector of source chunks, and keep the observer as it is, inserting the chunks into the vector.
  • Perform the "compilation" of all chunks into a single ByteVector in the finaliser itself.
  • During that compilation, look at the total size of the entity and, based on that, decide whether to use a strict entity or a streamed one.
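A rough sketch of that suggestion (the threshold and names here are made up for illustration):

```scala
import fs2.Chunk
import scodec.bits.ByteVector

val strictThreshold: Long = 64L * 1024 // hypothetical cut-off

// keep the observer as-is, collecting raw chunks; only the finaliser
// decides how to compile them, based on the total entity size
def compile(chunks: Vector[Chunk[Byte]]): Either[Vector[Chunk[Byte]], ByteVector] = {
  val total = chunks.foldLeft(0L)(_ + _.size)
  if (total <= strictThreshold)
    Right(chunks.foldLeft(ByteVector.empty)(_ ++ _.toByteVector)) // strict entity
  else
    Left(chunks) // stay streamed for large bodies
}
```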

@danicheg (Member, Author) commented Jan 1, 2023

> the new implementation may be having a growing number of memory allocations for intermediate results

Hmm, according to the benchmark, we get the opposite result: the allocation rate is lower with the new implementation (the old one allocates roughly 10-100% more, depending on body size). Or do I misunderstand the results?

@danicheg (Member, Author) commented Jan 1, 2023

@diesalbla Also, appending to an immutable Vector leads to a new Vector allocation, doesn't it?

@diesalbla (Contributor)

> also appending to an immutable vector leads to a vector allocation, doesn't it?

Yeah, but only a partial one. IIRC the internal implementation is a sort of tree, so some nodes are reused.
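For example:

```scala
// Appending to an immutable Vector allocates a new Vector value, but the
// underlying 32-ary tree shares almost all of its nodes with the previous
// version, so each append is effectively constant-time and small.
val v1 = Vector.fill(1000)(0: Byte)
val v2 = v1 :+ (1: Byte) // v2 reuses nearly all of v1's internal nodes
```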

@armanbilge (Member)

Note that ByteVector works in exactly the same way as Chunk: it's a rope-like data structure with efficient concatenation without copying. That's the big reason it exists :)
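A small illustration of that property (sizes chosen arbitrarily):

```scala
import scodec.bits.ByteVector

// ++ builds a rope node over the two operands instead of copying bytes,
// so concatenating large vectors is cheap at append time; the cost is
// paid later, when the structure is traversed or compacted.
val a = ByteVector.fill(100000)(1)
val b = ByteVector.fill(100000)(2)
val c = a ++ b // structural concatenation, no 200 KB array copy here
```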

@diesalbla (Contributor) commented Jan 1, 2023

Looking at the ByteVector implementation, one thing I notice is that the subclass Buffer includes an AtomicLong. Could it be that the updates to that "cell" add extra time during processing? https://github.com/scodec/scodec-bits/blob/main/core/shared/src/main/scala/scodec/bits/ByteVector.scala#L2218

The implementation of ++ in ByteVector ends up calling bufferBy, which creates one such Buffer instance.

@danicheg (Member, Author) commented Jan 2, 2023

I did some testing of Logger#defaultLogBody and got surprising results. The memory consumption when logging a Strict body is far better than with a Default (aka Streamed) one; I'd even say dramatically better. But throughput is about 40% worse for bodies of 100+ KB.

Benchmark                                                     (size)  Mode  Cnt        Score       Error   Units
LoggerBench.LoggingOfStreamedBody                               1000  avgt    5        0.017 ±     0.002   ms/op
LoggerBench.LoggingOfStreamedBody:·gc.alloc.rate.norm           1000  avgt    5    39861.722 ±     4.766    B/op
LoggerBench.LoggingOfStreamedBody                              10000  avgt    5        0.037 ±     0.003   ms/op
LoggerBench.LoggingOfStreamedBody:·gc.alloc.rate.norm          10000  avgt    5   143985.389 ±     0.395    B/op
LoggerBench.LoggingOfStreamedBody                             100000  avgt    5        0.606 ±     0.005   ms/op
LoggerBench.LoggingOfStreamedBody:·gc.alloc.rate.norm         100000  avgt    5  1664486.729 ±     8.356    B/op
LoggerBench.LoggingOfStrictBody                                 1000  avgt    5        0.010 ±     0.001   ms/op
LoggerBench.LoggingOfStrictBody:·gc.alloc.rate.norm             1000  avgt    5     7359.449 ±     0.077    B/op
LoggerBench.LoggingOfStrictBody                                10000  avgt    5        0.090 ±     0.002   ms/op
LoggerBench.LoggingOfStrictBody:·gc.alloc.rate.norm            10000  avgt    5    59927.531 ±    44.047    B/op
LoggerBench.LoggingOfStrictBody                               100000  avgt    5        1.062 ±     0.004   ms/op
LoggerBench.LoggingOfStrictBody:·gc.alloc.rate.norm           100000  avgt    5   585793.991 ±     4.921    B/op

The benchmark's code.
So, here are three rapid-fire thoughts.

  1. The results of the Logger#defaultLogBody benchmark confirm my hypothesis that building a String from a Stream is much faster than from a ByteVector for payloads of 100+ KB.
  2. We should think about tweaking the throughput of Logger#defaultLogBody for Strict bodies separately.
  3. Should we show this to the fs2/scodec teams?

@diesalbla (Contributor) commented Jan 2, 2023

@danicheg Looking at the benchmark code, is that the right use of the rechunkRandomlyWithSeed method of Stream? It seems that, if both values are equal, it will just emit chunks of the same size.

  /** Rechunks the stream such that output chunks are within `[inputChunk.size * minFactor, inputChunk.size * maxFactor]`.
    * The pseudo random generator is deterministic based on the supplied seed.
    */
  def rechunkRandomlyWithSeed[F2[x] >: F[x]](minFactor: Double, maxFactor: Double)(

Edit: a bit more code:

      def factor: Double = Math.abs(random.nextInt()) % (maxFactor - minFactor) + minFactor

      def go(acc: Chunk[O], sizeOpt: Option[Int], s: Stream[F2, Chunk[O]]): Pull[F2, O, Unit] = {
        def nextSize(chunk: Chunk[O]): Int =
          sizeOpt.getOrElse((factor * chunk.size).toInt)

So, in essence, the two factor parameters bound the "coefficient" that determines the size of each output chunk in proportion to the size of a source chunk.
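Concretely, with fractional factors the output chunk sizes are bounded by the source chunk's size scaled by those factors (the exact numbers below are illustrative):

```scala
import fs2.{Chunk, Stream}

// The factors scale the *source* chunk size: for a single 1000-byte input
// chunk, factors of (0.05, 0.1) bound the output chunks to roughly
// 50-100 bytes each, reproducibly for a given seed.
val sizes: List[Int] = Stream
  .chunk(Chunk.array(new Array[Byte](1000)))
  .rechunkRandomlyWithSeed(minFactor = 0.05, maxFactor = 0.1)(seed = 42L)
  .chunks
  .map(_.size)
  .toList
```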

@danicheg (Member, Author) commented Jan 7, 2023

@diesalbla I assumed that it doesn't matter. So I've rerun the benchmark with rechunkRandomlyWithSeed(8, 32) and got the following:

Benchmark                                                           (size)  Mode  Cnt        Score       Error   Units
LoggerBench.LoggingOfStreamedBody                                     1000  avgt    5        0.017 ±     0.001   ms/op
LoggerBench.LoggingOfStreamedBody:·gc.alloc.rate.norm                 1000  avgt    5    45131.353 ±     0.118    B/op
LoggerBench.LoggingOfStreamedBody                                    10000  avgt    5        0.049 ±     0.010   ms/op
LoggerBench.LoggingOfStreamedBody:·gc.alloc.rate.norm                10000  avgt    5   144197.190 ±     0.892    B/op
LoggerBench.LoggingOfStreamedBody                                   100000  avgt    5        0.588 ±     0.004   ms/op
LoggerBench.LoggingOfStreamedBody:·gc.alloc.rate.norm               100000  avgt    5  1665478.286 ±     8.014    B/op
LoggerBench.LoggingOfStrictBody                                       1000  avgt    5        0.010 ±     0.001   ms/op
LoggerBench.LoggingOfStrictBody:·gc.alloc.rate.norm                   1000  avgt    5     7377.916 ±     0.043    B/op
LoggerBench.LoggingOfStrictBody                                      10000  avgt    5        0.094 ±     0.005   ms/op
LoggerBench.LoggingOfStrictBody:·gc.alloc.rate.norm                  10000  avgt    5    59984.104 ±     4.286    B/op
LoggerBench.LoggingOfStrictBody                                     100000  avgt    5        1.108 ±     0.067   ms/op
LoggerBench.LoggingOfStrictBody:·gc.alloc.rate.norm                 100000  avgt    5   586146.120 ±     6.857    B/op

So, discarding the fluctuations, we get the same results. And I'm still unsure whether playing with the arguments of that function would change anything in the benchmark's numbers.

@diesalbla (Contributor)

> so discarding the fluctuation, we have the same results. And I'm still sure that playing with arguments of that function could change something in the benchmark's numbers.

So, in the benchmarks, you have the following code:

      fs2.Stream
        .emits(rawData)
        .covary[IO]

Where rawData is an Array[Byte], the emits function is going to build a source stream in which the entire array is one single chunk. If you want your benchmark to split that source chunk into chunks of, say, between a twentieth and a tenth of its size, you would need to call rechunkRandomlyWithSeed(0.05d, 0.1d). That is what I meant by the coefficients of the size.
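Under that reading, the benchmark setup might look like this (factors and seed are illustrative):

```scala
import fs2.{Chunk, Stream}

val rawData: Array[Byte] = new Array[Byte](100000)

// emitting the array directly produces one chunk holding all 100000 bytes
val source = Stream.chunk(Chunk.array(rawData))

// split that single big chunk into chunks of ~5-10% of its size, so the
// streamed-body path actually sees many chunks, as a real server would
val rechunked =
  source.rechunkRandomlyWithSeed(minFactor = 0.05, maxFactor = 0.1)(seed = 7L)
```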
