Effects version 2.0.8. The ordering of the observable sources in Observable#combineLatest impacts its resulting emissions. For example;
Observable<Integer> intObs = Observable.just(1, 2, 3, 4, 5);
Observable<String> stringObs = Observable.just("One");
Observable<String> stringObs2 = Observable.just("Two");
Observable.combineLatest(intObs, stringObs, stringObs2,
(i, s, s2) -> "Observable 1: " + i + " " + s + " " + s2)
.blockingIterable()
.forEach(System.out::println);
Observable.combineLatest(stringObs, stringObs2, intObs,
(s, s2, i) -> "Observable 2: " + i + " " + s + " " + s2)
.blockingIterable()
.forEach(System.out::println);
gives the following output;
Observable 1: 5 One Two
Observable 2: 1 One Two
Observable 2: 2 One Two
Observable 2: 3 One Two
Observable 2: 4 One Two
Observable 2: 5 One Two
I am uncertain whether this is the intended functionality or not as some of the unit tests seem to accept that this is the behaviour. Therefore this is either a bug or I believe that the documentation should better indicate that the order dictates from which observable source the latest item is taken from first.
In my opinion, the problem comes from the ObservableCombineLatest.LatestCoordinator#combine method. Here values are passed in from each of the observable sources. When a multiple item just observable (like above) is passed in first, combine() then skips each item until the last before there is a null signal from the completion of that observable source. The next just observable sources are then passed in and an item of each is then collected, then added to the queue and subsequently drained. When the order is switched, and the multiple item just is not first then an emission is added to the queue in combine() for every item as the upper bounding null is unknown.
I wrote the following breaking unit test to further explain; philleonard@acffb3b
I'm happy to implement the fix or documentation changes.
Effects version 2.0.8. The ordering of the observable sources in
Observable#combineLatestimpacts its resulting emissions. For example;gives the following output;
I am uncertain whether this is the intended functionality or not as some of the unit tests seem to accept that this is the behaviour. Therefore this is either a bug or I believe that the documentation should better indicate that the order dictates from which observable source the latest item is taken from first.
In my opinion, the problem comes from the
ObservableCombineLatest.LatestCoordinator#combinemethod. Here values are passed in from each of the observable sources. When a multiple item just observable (like above) is passed in first,combine()then skips each item until the last before there is a null signal from the completion of that observable source. The next just observable sources are then passed in and an item of each is then collected, then added to the queue and subsequently drained. When the order is switched, and the multiple item just is not first then an emission is added to the queue incombine()for every item as the upper bounding null is unknown.I wrote the following breaking unit test to further explain; philleonard@acffb3b
I'm happy to implement the fix or documentation changes.