Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

3.x: recursive concat causes StackOverflowError #6958

Open
akarnokd opened this issue Apr 11, 2020 · 5 comments
Open

3.x: recursive concat causes StackOverflowError #6958

akarnokd opened this issue Apr 11, 2020 · 5 comments

Comments

@akarnokd
Copy link
Member

Originally posted on StackOverflow.

The following code crashes with StackOverflowError and the stacktrace shows a long chain of request calls.

import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.core.Single
import io.reactivex.rxjava3.schedulers.Schedulers
import java.util.concurrent.TimeUnit.SECONDS

fun main() {
    fun incr(n: Int): Single<Int> = Single.just(n + 1)

    fun numbers(n: Int, max: Int): Flowable<Int> = Flowable.just(n).concatWith(
        if (n < max)
            incr(n)
            .observeOn(Schedulers.single())
            .toFlowable()
            .concatMap { next -> numbers(next, max) }
        else
            Flowable.empty()
    )

    numbers(1, 10_000)
    .blockingForEach(::println)
}
Exception in thread "main" java.lang.StackOverflowError
	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)
	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
	at io.reactivex.rxjava3.internal.subscriptions.SubscriptionArbiter.request(SubscriptionArbiter.java:135)
	at io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap$ConcatMapImmediate.request(FlowableConcatMap.java:215)

I'm not sure why there is such a chain created and if this is a result of an RxJava bug or not.

@tmankita
Copy link

tmankita commented Jul 8, 2020

Hi,
I would like to pick up this issue if no one is working on it yet.

@akarnokd
Copy link
Member Author

akarnokd commented Jul 8, 2020

Sure.

@tmankita
Copy link

tmankita commented Jul 8, 2020

I want to share my theory:
"ConcatWith" function needs to get @nonnull Publisher<@nonnull ? extends T> as a parameter but in the example above it gets if-else expression instead. In my opinion, the reason for the StackOverflow Error is that "ConcatWith" take somehow only the following expression :
incr(n) .observeOn(Schedulers.single()).toFlowable().concatMap { next -> numbers(next, max)}
without the base case of the recursion.

one of the comments In the original post on stackOverFlow suggest alternative working implementation:

fun <T> unfold(seed: T, next: (T) -> T?): Flowable<T> =
        UnicastProcessor.create<T>().toSerialized().let { proc ->
            proc
                .startWithItem(seed)
                .doOnNext { prev ->
                    when (val curr = next(prev)) {
                        null ->
                            proc.onComplete()
                        else ->
                            proc.onNext(curr)
                    }
                }
        }

    fun numbers(first: Int, max: Int): Flowable<Int> =
        unfold(first) { prev -> if (prev < max) prev + 1 else null }

    numbers(1, 1_000_000_000)
        .sample(1, SECONDS)
        .blockingForEach(::println)

As we can see the unfold function takes as an argument a function that contains the if-else expression, and this is the right way to do so because the unfold return a flowable object that contains the base case of the recursion.

@akarnokd , What do you think about my theory?

@akarnokd
Copy link
Member Author

akarnokd commented Jul 9, 2020

It creates a long chain because of concatMap is kept alive even though it ever emits one item due to incr. I still have to think about what we can do about it or even if we should.

@anastr
Copy link

anastr commented Aug 12, 2020

it looks like each time you add new couple of concatWith-concatMap operators, so while the sequence grow up it must pass the emit to the whole concatWith-concatMap couples then to the Consumer.
My only proof for that is that the last '1000' emits will take longer time than first one.

I'm sorry to bother, but i have a question:
in this case, why the error didn't deliver to the consumer (onError has been implemented for sure)!
if the max number was 1000, error will be thrown after complete!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants