Flowable#groupBy race leads to a back-pressure issue #7100

Open
bsideup opened this issue Oct 20, 2020 · 4 comments

Comments

@bsideup

bsideup commented Oct 20, 2020

Hi!

While debugging reactor/reactor-core#2352, we wanted to check whether RxJava has the same issue, given the history of both projects :)

Apparently, with 3.0.7, the same construction in RxJava fails with a very similar issue (although the failure is different):

final int total = 100;

Long count = Flowable.range(0, total)
                     .groupBy(i -> (i / 2) * 2)
                     .flatMapMaybe(Flowable::firstElement, false, 1)
                     .observeOn(Schedulers.io())
                     .count()
                     .blockingGet();
assertThat(total - count).as("count").isZero();

This fails intermittently (consider running it in "rerun until failure" mode) with:

io.reactivex.rxjava3.exceptions.MissingBackpressureException: Unable to emit a new group (#97) due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.

	at io.reactivex.rxjava3.internal.operators.flowable.FlowableGroupBy$GroupBySubscriber.onNext(FlowableGroupBy.java:197)

A few interesting observations:

  1. Changing observeOn's buffer size to 131 or higher makes it always pass
  2. 130 would sometimes fail with Unable to emit a new group (#99) due to lack of requests
  3. 129 would sometimes fail with Unable to emit a new group (#98) due to lack of requests
  4. 128 would sometimes fail with Unable to emit a new group (#97) due to lack of requests
  5. and so on for smaller buffer sizes

So it looks like there is a race between cancellation of the group and starting a new one, although I haven't investigated RxJava's issue much.
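
A note on the group numbers above: range(0, 100) with the key (i / 2) * 2 has only 50 distinct keys, yet the error messages mention groups #97 to #99. A plausible reading, consistent with the expected count of 100, is that firstElement cancels each group after its first item and the group is re-created when the second item with the same key arrives. The plain-Java sketch below models only that bookkeeping. The class and method names are hypothetical, and it does not use or reproduce RxJava's internals.

```java
import java.util.HashSet;
import java.util.Set;

public class GroupKeySketch {

    // Same key function as in the snippet above: 2k and 2k+1 both map to 2k.
    static int key(int i) {
        return (i / 2) * 2;
    }

    // Number of distinct keys produced by range(0, total).
    static int distinctKeys(int total) {
        Set<Integer> keys = new HashSet<>();
        for (int i = 0; i < total; i++) {
            keys.add(key(i));
        }
        return keys.size();
    }

    // Simplified model (an assumption, not RxJava code): a group is opened when an
    // item arrives for a key with no open group, and it is closed again right after
    // delivering that first item, as firstElement() would cancel it.
    static int groupsOpened(int total) {
        Set<Integer> openGroups = new HashSet<>();
        int opened = 0;
        for (int i = 0; i < total; i++) {
            int k = key(i);
            if (openGroups.add(k)) {
                opened++;              // a new group had to be emitted downstream
            }
            openGroups.remove(k);      // first element taken, group cancelled
        }
        return opened;
    }

    public static void main(String[] args) {
        System.out.println("distinct keys: " + distinctKeys(100));
        System.out.println("groups opened: " + groupsOpened(100));
    }
}
```

Under this model every upstream item needs its own group emission, so the downstream has to request far more groups than there are distinct keys.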

@akarnokd
Member

There is no ordering between group emission and cancellation, and the Javadoc warns about MissingBackpressureExceptions (MBEs) caused by not requesting more groups:
http://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html#groupBy-io.reactivex.rxjava3.functions.Function-

Backpressure:
The consumer of the returned Flowable has to be ready to receive new GroupedFlowables or else this operator will signal MissingBackpressureException. To avoid this exception, make sure a combining operator (such as flatMap) has adequate amount of buffering/prefetch configured.

In the example, the observeOn introduces just enough asynchrony via its requesting that the range ends up creating more groups for which flatMapMaybe is not ready.
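
As a sketch of what the quoted Javadoc advice could look like for the snippet in this issue: the single-argument flatMapMaybe overload uses an unbounded maxConcurrency, so it requests groups from groupBy without limit. This is only one way to give the combining operator more room, not a fix for the underlying behavior. The class and method names are hypothetical, and the trade-off is that many groups may be subscribed at the same time.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;

public class GroupByUnboundedSketch {

    // Same pipeline as in the report, except that flatMapMaybe is not limited
    // to a single concurrent group.
    static long countWithUnboundedFlatMap(int total) {
        return Flowable.range(0, total)
                .groupBy(i -> (i / 2) * 2)
                // flatMapMaybe(mapper) delegates to
                // flatMapMaybe(mapper, false, Integer.MAX_VALUE), so the operator
                // requests an unbounded number of groups from groupBy.
                .flatMapMaybe(Flowable::firstElement)
                .observeOn(Schedulers.io())
                .count()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("count = " + countWithUnboundedFlatMap(100));
    }
}
```

Passing an explicit, sufficiently large maxConcurrency instead of the unbounded default would be the more conservative variant of the same idea.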

@akarnokd
Member

It's more complicated than that.

The request/delivery in the inner groups still feeds back to the main source because the groups have to trigger progress when they become ready. Consequently, since the next upstream value may either create a new group or go to an existing group, the main group output may hold back the inner groups. You could buffer these new groups, but then they can't be consumed and can clog up the main queue.

@OlegDokuka

Yeah, I noticed that. I think I found another fix. Let me quickly try it in Reactor to see if it works. Apologies for the confusion caused by my first message.

@OlegDokuka

Fix -> reactor/reactor-core#2450


3 participants