Skip to content

Exception not propagated on parallel execution with blocking zip function #5580

@marcosalis

Description

@marcosalis

When using the zip operator on an Observable or Single along with a blocking call, there apparently isn't a way to propagate a single error consistently since mergeArrayDelayError() or similar all accept a single data type.

In this example, I have two Single, executed in parallel on a different thread, which are then passed into a zip function and then synchronously executed through a blockingGet().
What I'd expect is that the exception is thrown by this latter method. What happens though is that the exception is passed to the default uncaught exception handler from RxJavaPlugins.onError().

Am I using the wrong approach or is there an issue with error propagation here?
RxJava 2.1.3 on Android.

Here's the (failing) JUnit test:

    @Test(expected = InterruptedException.class)
    public void testParallelZip_exceptionNotPropagated() throws Exception {
        Single<Object> allPeopleSource = Single.fromCallable(() -> {
            throw new InterruptedException();
        }).subscribeOn(Schedulers.io());
        Single<String> idsSource = Single.fromCallable(() -> "second_single").subscribeOn(Schedulers.io());

        Single.zip(allPeopleSource, idsSource, (o, s) -> "result").blockingGet();
    }

And here's the failure crash log:

java.lang.Exception: Unexpected exception, expected<java.lang.InterruptedException> but was<java.lang.RuntimeException>

	at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:28)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:117)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:262)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:84)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: java.lang.RuntimeException: java.lang.InterruptedException
	at io.reactivex.internal.util.ExceptionHelper.wrapOrThrow(ExceptionHelper.java:45)
	at io.reactivex.internal.observers.BlockingMultiObserver.blockingGet(BlockingMultiObserver.java:91)
	at io.reactivex.Single.blockingGet(Single.java:2154)
	at [...].RxUtilsTest.testParallelZip_exceptionNotPropagated(RxUtilsTest.java:70)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.internal.runners.statements.ExpectException.evaluate(ExpectException.java:19)
	... 20 more
Caused by: java.lang.InterruptedException
	at com.teamwork.data.util.RxUtilsTest.lambda$testParallelZip_exceptionNotPropagated$1(RxUtilsTest.java:66)
	at io.reactivex.internal.operators.single.SingleFromCallable.subscribeActual(SingleFromCallable.java:35)
	at io.reactivex.Single.subscribe(Single.java:2700)
	at io.reactivex.internal.operators.single.SingleSubscribeOn$SubscribeOnObserver.run(SingleSubscribeOn.java:89)
	at io.reactivex.Scheduler$DisposeTask.run(Scheduler.java:452)
	at io.reactivex.internal.schedulers.ScheduledRunnable.run(ScheduledRunnable.java:61)
	at io.reactivex.internal.schedulers.ScheduledRunnable.call(ScheduledRunnable.java:52)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions