Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BufferTimeout with fair backpressure rework #3634

Open
wants to merge 12 commits into
base: 3.5.x
Choose a base branch
from

Conversation

chemicL
Copy link
Member

@chemicL chemicL commented Nov 7, 2023

The fair backpressure variant of the bufferTimeout operator has been reworked to use a state machine with a minimum number of volatile variables eliminating potential data races, such as skipping the delivery when onNext and timeout happen concurrently or cancellation happens while onNext is delivered, etc.

Resolves #3531

@chemicL chemicL added the type/bug A general bug label Nov 7, 2023
@chemicL chemicL added this to the 3.5.12 milestone Nov 7, 2023
@chemicL chemicL self-assigned this Nov 7, 2023
@chemicL chemicL requested a review from a team as a code owner November 7, 2023 12:57
Copy link
Contributor

@OlegDokuka OlegDokuka left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The issue sits deeper and as I suggested first requires complete rework.

To reproduce the issue try to modify discard setup as follows:

@Tag("slow")
public class OnDiscardShouldNotLeakTest {

	private static final int NB_ITERATIONS = 100_000;
	// add DiscardScenarios here to test more operators
	private static final DiscardScenario[] SCENARIOS = new DiscardScenario[] {
			DiscardScenario.fluxSource("bufferTimeout", f -> f.bufferTimeout(2, Duration.ofNanos(1), true).flatMapIterable(Function.identity())),
	};
       ...

the problem appears when multiple threads comes into play. If you have race between scheduled timeout task trying to flash the window and cancellation ,then some elements might be undischarged

@OlegDokuka
Copy link
Contributor

Although the above could be false negative, since the async task can be unawaited properly. Can you please doublecheck @chemicL

@OlegDokuka
Copy link
Contributor

I just remembered that it was one of the reasons why all the discard tests are in stress tests, therefore it could make sense to port part of them for bufferTimeout

@violetagg violetagg modified the milestones: 3.5.12, 3.5.13 Nov 14, 2023
@chemicL
Copy link
Member Author

chemicL commented Nov 22, 2023

@OlegDokuka the commit 0334959 adds significant improvements to the test suite that helps catch the racy situations. In the next commit I will add temporary fixes for the identified issues, but afterwards will follow up with a state machine implementation to eliminate the last one with timeout racing with draining.

@chemicL chemicL marked this pull request as draft November 22, 2023 13:07
@chemicL chemicL modified the milestones: 3.5.16, 3.5.17 Apr 9, 2024
@chemicL chemicL changed the title Handling late arriving drain operations in bufferTimeout BufferTimeout with fair backpressure rework Apr 29, 2024
@chemicL
Copy link
Member Author

chemicL commented May 22, 2024

For the latest changes, I added a JMH benchmark. The idea is to simply test a non-contended, single threaded case of pulling 100 items and packaging them in 1, 10, and 100-item buffers.

Devising a contended usage scenario with a concurrent, asymmetric JMH benchmark doesn't seem worthwhile as the dominating factor should never be two actors competing but rather one actor winning a potential race and performing the bulk of the work. We can't approach this performance evaluation the same way as a regular queue benchmark with a randezvous scenario of handing over between two threads. In the Queue benchmark case, this operation is the essence of the work being done. In contrast, with reactive-streams it is not the case as there is the idea of work stealing where one actor winning a race (which can contend in a lock-free manner for a short moment) is performing a drain operation and the Queue is a means to an end and we are not evaluating its behaviour under load, but the operator's algorithm itself.

Having said the above, I compared the new backpressured variant with the current (broken) implementation and also with the simpler, yet not-backpressured variant. Here are my results on an M1 MacBook Pro (10-core, 32GB):

This PR, fairBackpressure = true

Benchmark                             (bufferSize)  Mode  Cnt     Score     Error  Units
FluxBufferTimeoutBenchmark.oneByOne              1  avgt    5  6259.514 ± 214.272  ns/op
FluxBufferTimeoutBenchmark.oneByOne             10  avgt    5  4418.668 ± 595.467  ns/op
FluxBufferTimeoutBenchmark.oneByOne            100  avgt    5  3938.607 ± 518.551  ns/op
FluxBufferTimeoutBenchmark.unlimited             1  avgt    5  5764.457 ± 399.358  ns/op
FluxBufferTimeoutBenchmark.unlimited            10  avgt    5  4072.673 ± 169.380  ns/op
FluxBufferTimeoutBenchmark.unlimited           100  avgt    5  3797.930 ±  94.629  ns/op

Current 3.5.x, fairBackpressure = true

Benchmark                             (bufferSize)  Mode  Cnt      Score      Error  Units
FluxBufferTimeoutBenchmark.oneByOne              1  avgt    5  23836.157 ± 2138.635  ns/op
FluxBufferTimeoutBenchmark.oneByOne             10  avgt    5   8935.440 ±   43.610  ns/op
FluxBufferTimeoutBenchmark.oneByOne            100  avgt    5   7182.099 ±  129.899  ns/op
FluxBufferTimeoutBenchmark.unlimited             1  avgt    5  18836.570 ± 1281.322  ns/op
FluxBufferTimeoutBenchmark.unlimited            10  avgt    5   8231.299 ±   55.848  ns/op
FluxBufferTimeoutBenchmark.unlimited           100  avgt    5   7197.949 ±  160.759  ns/op

Current 3.5.x, fairBackpressure = false

Benchmark                             (bufferSize)  Mode  Cnt      Score      Error  Units
FluxBufferTimeoutBenchmark.oneByOne              1  avgt    5  18376.986 ±  795.211  ns/op
FluxBufferTimeoutBenchmark.oneByOne             10  avgt    5   3458.388 ±  200.223  ns/op
FluxBufferTimeoutBenchmark.oneByOne            100  avgt    5   2533.851 ±  440.186  ns/op
FluxBufferTimeoutBenchmark.unlimited             1  avgt    5  15716.505 ± 1168.297  ns/op
FluxBufferTimeoutBenchmark.unlimited            10  avgt    5   3040.167 ±  144.305  ns/op
FluxBufferTimeoutBenchmark.unlimited           100  avgt    5   2470.696 ±  375.516  ns/op

To summarize the above, the new implementation passes the JCStress tests devised to catch the discovered issues while maintaining a better performance characteristic. The non-prefetching and incapable of respecting backpressure variant is more performant and that's understood as it doesn't need to keep track of much accounting and is a simpler implementation.

@chemicL chemicL marked this pull request as ready for review May 22, 2024 15:13
@@ -49,7 +49,7 @@ void log(String instance,
formatState(committedState, 64)), new RuntimeException());
}
else {
this.logger.trace(String.format("[%s][%s][%s][%s-%s]",
this.logger.trace(String.format("[%s][%s][%s][\n\t%s\n\t%s]",
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With this change it's much easier to compare the state changes as they are stacked on top of each other so the differences are immediately visible as opposed to a horizontal presentation.

@@ -365,7 +367,7 @@ public void requestedFromUpstreamShouldNotExceedDownstreamDemand() {
.assertNext(s -> assertThat(s).containsExactly("a"))
.then(() -> assertThat(requestedOutstanding).hasValue(19))
.thenRequest(1)
.then(() -> assertThat(requestedOutstanding).hasValue(20))
.then(() -> assertThat(requestedOutstanding).hasValue(19))
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a behaviour change, yet it is not dramatic or disruptive. Before, a request would force requesting whatever is remaining from the upstream to fulfill the prefetch capacity. Now it will do that only once the replenishMark is reached.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
type/bug A general bug
Projects
None yet
3 participants