When using the zip operator on an Observable or Single together with a blocking call, there apparently isn't a way to propagate a single error consistently, since mergeArrayDelayError() and similar operators all accept only a single data type.
In this example I have two Singles, each subscribed on Schedulers.io() so that they run in parallel off the calling thread, which are passed to zip and then executed synchronously through blockingGet().
I'd expect the exception to be thrown by blockingGet(). What happens instead is that the exception is passed to the default uncaught exception handler from RxJavaPlugins.onError().
Am I using the wrong approach, or is there an issue with error propagation here?
RxJava 2.1.3 on Android.
Here's the (failing) JUnit test:
@Test(expected = InterruptedException.class)
public void testParallelZip_exceptionNotPropagated() throws Exception {
    Single<Object> allPeopleSource = Single.fromCallable(() -> {
        throw new InterruptedException();
    }).subscribeOn(Schedulers.io());
    Single<String> idsSource = Single.fromCallable(() -> "second_single").subscribeOn(Schedulers.io());
    Single.zip(allPeopleSource, idsSource, (o, s) -> "result").blockingGet();
}
And here's the failure crash log:
java.lang.Exception: Unexpected exception, expected<java.lang.InterruptedException> but was<java.lang.RuntimeException>
at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:28)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:91)
at io.reactivex.Single.blockingGet(Single.java:2154)
at [...].RxUtilsTest.testParallelZip_exceptionNotPropagated(RxUtilsTest.java:70)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
... 20 more
Caused by: java.lang.InterruptedException
at com.teamwork.data.util.RxUtilsTest.lambda$testParallelZip_exceptionNotPropagated$1(RxUtilsTest.java:66)
at io.reactivex.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:35)
at io.reactivex.Single.subscribe(Single.java:2700)
at io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
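Reading the trace, the InterruptedException does seem to reach blockingGet(), but it arrives wrapped in a RuntimeException (the ExceptionHelper.wrapOrThrow frame), which is why the expected-exception check fails. Below is a minimal sketch of one way the original cause could be recovered, assuming that wrapping is the whole story. BlockingUnwrap and its call() method are hypothetical names made up for illustration and are not part of RxJava; the helper uses only the JDK and unwraps any RuntimeException with a checked cause, not just those coming from blockingGet().

```java
import java.util.function.Supplier;

// Hypothetical helper (not part of RxJava): runs a blocking call and, if it
// fails with a RuntimeException whose cause is a checked exception, rethrows
// that cause instead of the wrapper.
final class BlockingUnwrap {

    private BlockingUnwrap() {}

    static <T> T call(Supplier<T> blockingCall) throws Exception {
        try {
            return blockingCall.get();
        } catch (RuntimeException e) {
            Throwable cause = e.getCause();
            // Only unwrap checked exceptions; anything else keeps the wrapper.
            if (cause instanceof Exception && !(cause instanceof RuntimeException)) {
                throw (Exception) cause;
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        try {
            // Stand-in for Single.zip(...).blockingGet() failing the way the trace shows.
            BlockingUnwrap.<String>call(() -> {
                throw new RuntimeException(new InterruptedException());
            });
        } catch (Exception e) {
            System.out.println(e.getClass().getName());
        }
    }
}
```

In the test above, the last statement would then become something like BlockingUnwrap.call(() -> Single.zip(allPeopleSource, idsSource, (o, s) -> "result").blockingGet()), which should let the InterruptedException surface as the test expects. Asserting on getCause() of the RuntimeException directly in the test would be an alternative that avoids the helper.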