In the init method, ObserveOnSubscriber::requested is updated, but Subscriber::requested is not.
I think it should be something like this:
    localChild.setProducer(new Producer() {
        @Override
        public void request(long n) {
            if (n > 0L) {
                BackpressureUtils.getAndAddRequest(requested, n);
                ObserveOnSubscriber.this.request(requested.get());
                schedule();
            }
        }
    });
Otherwise, subscribers that wrap ObserveOnSubscriber (which itself wraps a subscriber whose requested value is NOT_SET) get bufferSize as their requested value.
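
To make the mismatch concrete, here is a minimal toy model of the two counters. It is only a sketch under stated assumptions, not the RxJava classes: ToyOperator, operatorRequested, baseRequested, getAndAddCapped and the NOT_SET value used here are hypothetical stand-ins, and the forwarding variant passes n on rather than the running total, purely to keep the toy simple.

```java
import java.util.concurrent.atomic.AtomicLong;

// Toy model only: none of these types are RxJava classes; all names are hypothetical.
public class RequestedMismatchSketch {
    // Assumed sentinel meaning "nothing has been requested yet".
    public static final long NOT_SET = Long.MIN_VALUE;

    // Adds n to the counter, capping at Long.MAX_VALUE; returns the previous value.
    public static long getAndAddCapped(AtomicLong requested, long n) {
        while (true) {
            long current = requested.get();
            long next = current + n;
            if (next < 0L) {
                next = Long.MAX_VALUE; // overflow: treat as unbounded demand
            }
            if (requested.compareAndSet(current, next)) {
                return current;
            }
        }
    }

    public static final class ToyOperator {
        // Stands in for the operator's own counter.
        public final AtomicLong operatorRequested = new AtomicLong();
        // Stands in for the counter kept by the base subscriber.
        public long baseRequested = NOT_SET;
        private final boolean forward;

        public ToyOperator(boolean forward) {
            this.forward = forward;
        }

        // Models a request recorded by the base subscriber.
        void requestFromBase(long n) {
            baseRequested = (baseRequested == NOT_SET) ? n : baseRequested + n;
        }

        // Models the producer handed to the downstream child.
        public void downstreamRequest(long n) {
            if (n > 0L) {
                getAndAddCapped(operatorRequested, n);
                if (forward) {
                    requestFromBase(n); // also record the demand on the base counter
                }
            }
        }
    }

    public static void main(String[] args) {
        ToyOperator withoutForwarding = new ToyOperator(false);
        withoutForwarding.downstreamRequest(5);
        System.out.println("operator counter: " + withoutForwarding.operatorRequested.get());
        System.out.println("base counter still NOT_SET: "
                + (withoutForwarding.baseRequested == NOT_SET));

        ToyOperator withForwarding = new ToyOperator(true);
        withForwarding.downstreamRequest(5);
        System.out.println("base counter after forwarding: " + withForwarding.baseRequested);
    }
}
```

In the non-forwarding variant only the operator's own counter moves, so anything that reads the base counter never sees the downstream demand. That is the situation described above.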