Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chaining rxSingle calls that use Unconfined dispatcher and blockingGet results in deadlock #3458

Open
byencho opened this issue Sep 22, 2022 · 7 comments
Labels

Comments

@byencho
Copy link

byencho commented Sep 22, 2022

Consider the follow setup in which two rxSingle calls are chained together and the second one uses blockingGet():

fun main() {
    val dispatcher1 = Dispatchers.Unconfined
    val dispatcher2 = Dispatchers.Unconfined

    val disposable = rxSingle(dispatcher1) { }
        .map {
            rxSingle(dispatcher2) { }
                .blockingGet() // <----- This code deadlocks here
        }
        .timeout(100, TimeUnit.MILLISECONDS)
        .subscribe(
            { println("Success") },
            { println("Error : $it") }
        )

    while (!disposable.isDisposed) {
        // Wait for request to end
    }
}

(Set aside that blockingGet() is not a good practical choice here...this is just to highlight the problem.)

The above triggers the timeout because the blockingGet() call is deadlocked:

Error : java.util.concurrent.TimeoutException: The source did not signal an event for 100 milliseconds and has been terminated.

This happens even if the suspending code inside rxSingle operates with a different dispatcher:

fun main() {
    val dispatcher1 = Dispatchers.Unconfined
    val dispatcher2 = Dispatchers.Unconfined

    val disposable = rxSingle(dispatcher1) {
        withContext(Dispatchers.IO) {}
    }
        .map {
            rxSingle(dispatcher2) {
                withContext(Dispatchers.IO) {}
            }
                .blockingGet()
        }
        .timeout(100, TimeUnit.MILLISECONDS)
        .subscribe(
            { println("Success") },
            { println("Error : $it") }
        )

    while (!disposable.isDisposed) {
        // Wait for request to end
    }
}

The code completes successfully and prints "Success" if either dispatcher1 or dispatcher2 is set to Dispatchers.Default / Dispatchers.IO etc. so it appears that the use of Unconfined on both is key to the issue.

My main question is: is this expected? If so, why does it require both requests to use Unconfined to see the issue? Is this because of the "event-loop" that gets involved here for nested coroutines:

Nested coroutines launched in this dispatcher form an event-loop to avoid stack overflows.

My understanding is that this was actually introduced in #860 to avoid deadlocks when using nested runBlocking calls. Is it possible the conversion from suspending functions to Single / Completable / etc. unintentionally allows this problem to occur again when using Single.blockingGet()?


For a little more context, the practical consideration here is that we have a networking layer that exposes suspend functions and then an RxJava wrapper around that network layer for callers who prefer to use RxJava. The calls are set up like the following:

fun rxJavaNetworkCall(...) = rxSingle(Dispatchers.Unconfined) {
    suspendingNetworkingCall(...)
}

The reasoning behind Dispatchers.Unconfined here is twofold:

  • Avoid unnecessary thread creation and thread hopping (since each suspendingNetworkCall already manages its own threading).
  • Simulate existing RxJava behavior (for better or worse) in which a change in thread is propagated downstream.

These ideas were similarly discussed in #2925 (comment) . Setting up our RxJava wrapper layer in this manner leaves us susceptible to the deadlocks discussed above, however, if any callers use blockingGet() in a network request chain for any reason. Is this just expected behavior and we should avoid using Dispatchers.Unconfined in this way? I see there has been some discussion about a hypothetical dispatcher that works like Unconfined and could be used in these kinds of cases : #2485 (comment)


Kotlin version : 1.5.21
coroutines-core / coroutines-rx2 version : 1.5.1
RxJava version : 2.2.19

@dkhalanskyjb
Copy link
Contributor

Explanation

The example can be simplified a lot:

rxSingle(Dispatchers.Unconfined) {
}.blockingGet()
println("the top half is OK")
runBlocking(Dispatchers.Unconfined) {
    rxSingle(Dispatchers.Unconfined) {
    }.blockingGet()
}

So, doing blockingGet on an Rx source from our library when emissions happen in Dispatchers.Unconfined will hang if the code is already executing in Dispatchers.Unconfined. You found the correct reason for this: in order to avoid stack overflows, there is special casing in the Dispatchers.Unconfined implementation that forces the code to just be put in a queue instead of being executed immediately if Dispatcher.Unconfined detects that its uses are being nested.

For example, this test will immediately pass, but without this implementation detail of Dispatchers.Unconfined, the code would be painfully slow or even crash with StackOverflowError:

    @Test
    fun nestedLaunches() {
        fun CoroutineScope.launchNested(remainingLevels: Int): Job? {
            return if (remainingLevels > 0) {
                launch {
                    launchNested(remainingLevels - 1)?.join()
                }
            } else {
                null
            }
        }
        runBlocking(Dispatchers.Unconfined) {
            launchNested(100000)?.join()
        }
    }

Now, here's how this causes the issue. The way blockingGet works is by requesting the subsequent elements and then suspending the thread, waiting for the elements to be produced. Typically, request-ing from rxSingle(Dispatchers.Unconfined) { ... } will immediately launch the computation in the block (in the same thread!), complete it, and only then return from the call to request, so blockingGet never suspends: by the time request is done, all the elements are there already.

On the other hand, when Dispatchers.Unconfined is nested, request will detect this and, to avoid deep nesting of calls, will just put the block in the queue to be executed. This queue will never get processed, however, since the thread will be hogged by blockingGet.

Workaround

Replacing Dispatchers.Unconfined with this one may better reflect what RxJava is doing:

/**
 * A dispatcher that executes the computations immediately in the thread that requests it.
 *
 * This dispatcher is similar to [Dispatchers.Unconfined] but does not attempt to avoid stack overflows.
 */
object DirectDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        block.run()
    }
}

I don't know if RxJava actually does handle nesting in its call chains in this manner, but initially, it does seem so.

@byencho
Copy link
Author

byencho commented Sep 26, 2022

@dkhalanskyjb Thanks for your response! I'm a little curious if you think using something like the DirectDispatcher you suggest would introduce the risk of the StackOverflowError that required fixing for Unconfined.

@dkhalanskyjb
Copy link
Contributor

Certainly, the risk is back. Replacing Unconfined with DirectDispatcher does cause a horrible slowdown in the test that I provided.

That said, I think this issue is inherent to RxJava: ReactiveX/RxJava#6958 does look like the kind of an issue that we're solving with the event loop, and https://stackoverflow.com/questions/42363452/rxjava-stackoverflowerror-exception-in-long-chain does, too.

The only question is, does rxSingle(DirectDispatcher) behave the same way RxJava's primitives typically do? I don't know the answer immediately, this needs careful analysis. But if so, yeah, DirectDispatcher may be problematic, but what can you do, the more you try to reproduce the behavior of RxJava, the more of its issues you'll inherit.

@byencho
Copy link
Author

byencho commented Sep 26, 2022

OK that makes sense. I will do some more testing and digging into what I'd expect for RxJava here and decide which of the following risks is the one I'm most willing to tolerate:

  • Possible stackoverflows when using something like the DirectDispatcher.
  • Possible deadlocks when using Dispatchers.Unconfined.
  • Potentially unnecessary thread hopping / creation / hogging when using Dispatchers.IO or Dispatchers.Default.

Thanks again for your help!

@byencho byencho closed this as completed Sep 26, 2022
@dkhalanskyjb
Copy link
Contributor

Please do share the results if you discover anything interesting!

@byencho
Copy link
Author

byencho commented Sep 26, 2022

Will do!

@byencho
Copy link
Author

byencho commented Sep 28, 2022

OK so here's what I found from some additional testing:

Summary

  • Using rxSingle(DirectDispatcher) { ... } does not seem to introduce any more risk of StackOverflowError than RxJava would already has on its own.
  • Even when using rxSingle(DirectDispatcher) { ... }, the risk of StackOverflowError is eliminated (or at least dramatically reduced) if the suspending call being wrapped already applies its own threading with a CoroutineDispatcher that behaves like Dispatchers.IO / Dispatchers.Default (which is similar to the standard RxJava behavior as well).

So overall I haven't really seen any practical downsides to using DirectDispatcher for this purpose and think it should be recommended when using rxSingle / rxCompletable / etc. because:

  • it eliminates the risk of deadlocks when calling blockingGet (when compared to using Dispatchers.Unconfined).
  • does not require any additional threads (when compared to using Dispatchers.IO).
  • produces RxJava types that behave the way you would expect when it comes to applying Schedulers (when compared to using Dispatchers.IO / Dispatchers.Default).

Details

For my tests of potential StackOverflowErrors I first wanted to check the baseline behavior of RxJava when dealing with a very long chain of calls.

RxJava only

Consider the following recursively constructed call chain:

val outerScheduler: Scheduler = ...
fun mockRequest(): Single<Unit> = ...

fun buildChain(
    current: Int,
    max: Int = 10_000
): Single<Unit> = mockRequest()
    .flatMap {
        if (current < max) {
            buildChain(current = current + 1)
        } else {
            mockRequest()
        }
    }

buildChain(current = 0)
    .subscribeOn(outerScheduler)
    .blockingGet()

The details of the behavior here depends in part on the implementation of outerScheduler and mockRequest.

When mockRequest contains no additional Scheduler applications such as

fun mockRequest(): Single<Unit> =
    Single.fromCallable { }

The chain above either results in an explicit StackOverflowError or simply hangs without completing around the same iteration the other tests would fail with the error. Schedulers.computation() / Schedulers.single() both hang, while all the other stand Schedulers result in the error. (I'm guessing the difference is likely related to bounded vs unbounded thread pools.) These results are similar to what was seen in ReactiveX/RxJava#6958 .

These issues go away, though, if mockRequest pre-applies threading using some Scheduler other than Schedulers.trampoline (which is effectively like no Scheduler at all and is therefore just like the original example):

fun mockRequest(): Single<Unit> =
    Single.fromCallable { }.subscribeOn(innerScheduler)

In this case, the call chain succeeds no matter what is used for outerScheduler.

Coroutine wrapper

Now we can compare this against the behavior of rxSingle(DirectDispatcher). When mockRequest is given as

fun mockRequest(): Single<Unit> =
    rxSingle(DirectDispatcher) {}

all tests fail with a StackOverflowError no matter what is used for outerScheduler. This is a slight change in the exact type of failure produced using different Schedulers but is generally consistent with the initial RxJava-only test.

When considering the case where the wrapped call itself uses its own Dispatcher:

fun mockRequest(): Single<Unit> =
    rxSingle(DirectDispatcher) {
        withContext(innerDispatcher) {}
    }

we get a similar result to the second RxJava-only test: when using an innerDispatcher like Dispatchers.IO / Dispatchers.Default, the tests complete without issue.

So in both types of cases here, rxSingle(DirectDispatcher) acts no worse than RxJava would on its own:

  • When the wrapped calls have no internal threading applied, long call chains have a risk of StackOverflowError, just like native RxJava calls would in this case.
  • When the wrapped calls have internal threading applied using something like Dispatchers.IO / Dispatchers.Default, the risk of StackOverflowError is either eliminated or greatly reduced, just like RxJava calls would be in this case.

Plus, when considering that the wrapped calls would have some CoroutineDispatcher associated with them (for example: Retrofit and Room calls) this means that in practice there simply is not a risk of StackOverflowError when using rxSingle(DirectDispatcher).

@dkhalanskyjb dkhalanskyjb reopened this Oct 11, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants