Flat map serializes output when using observable<>::create #575

Open
JavierBejMen opened this issue Mar 2, 2022 · 2 comments

Comments

@JavierBejMen

Hi all! We encountered this issue when using the flat_map operator. As described in the documentation, flat_map internally merges the emissions of the observables returned by the CollectionSelector function. When range is used in that function, the output is as expected:

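#include <cstdio>
#include <tuple>
#include <gtest/gtest.h>
#include <rxcpp/rx.hpp>

TEST(RxcppThreading, FlatMap){
    auto values = rxcpp::observable<>::range(1, 3)
        .flat_map(
            // collection selector: one inner range per outer value
            [](int /*v*/){
                return rxcpp::observable<>::range(1, 4);
            },
            // result selector: pair each outer value with each inner value
            [](int v_main, int v_sub){
                return std::make_tuple(v_main, v_sub);
            });

    values.subscribe(
            [](std::tuple<int, int> v){std::printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));},
            [](){std::printf("OnCompleted\n");});
}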

Output:

OnNext: 1 - 1
OnNext: 1 - 2
OnNext: 2 - 1
OnNext: 1 - 3
OnNext: 2 - 2
OnNext: 3 - 1
OnNext: 1 - 4
OnNext: 2 - 3
OnNext: 3 - 2
OnNext: 2 - 4
OnNext: 3 - 3
OnNext: 3 - 4

But when create is used instead, the output is serialized per source observable: each inner observable emits all of its values before the next one starts:

TEST(RxcppThreading, FlatMapCreate){
    auto values = rxcpp::observable<>::range(1, 3)
        .flat_map(
            [](int /*v*/){
                // emits 0..3 synchronously from inside subscribe
                return rxcpp::observable<>::create<int>([](rxcpp::subscriber<int> s){
                    for (auto i = 0; i < 4; ++i){
                        if (!s.is_subscribed())
                            break;
                        s.on_next(i);
                    }
                    s.on_completed();
                });
            },
            [](int v_main, int v_sub){
                return std::make_tuple(v_main, v_sub);
            });

    values.subscribe(
            [](std::tuple<int, int> v){std::printf("OnNext: %d - %d\n", std::get<0>(v), std::get<1>(v));},
            [](){std::printf("OnCompleted\n");});
}

Output:

OnNext: 1 - 0
OnNext: 1 - 1
OnNext: 1 - 2
OnNext: 1 - 3
OnNext: 2 - 0
OnNext: 2 - 1
OnNext: 2 - 2
OnNext: 2 - 3
OnNext: 3 - 0
OnNext: 3 - 1
OnNext: 3 - 2
OnNext: 3 - 3

Is this the expected behavior, or is there something we are missing?
We need to perform the flat_map operation using create with the emissions not serialized. If someone could give us a hand working around this, we would be very grateful.

Thanks in advance! 😃

@victimsnino
Collaborator

It looks like this is because the range observable uses scheduling under the hood via identity_current_thread instead of identity_immediate:

    template<class T>
    static auto range(T first = 0, T last = std::numeric_limits<T>::max(), std::ptrdiff_t step = 1)
        -> decltype(rxs::range<T>(first, last, step, identity_current_thread())) {
        return      rxs::range<T>(first, last, step, identity_current_thread());
    }

I'd expect the second output to be the "canonical" one, because flat_map just subscribes to each inner observable. In your case both observables are synchronous (I mean, there is no direct use of threads, etc.), so subscribe should block until on_completed. For this reason the second result is correct. To get "non-serialized" output in the second scenario, you need to provide some scheduling manually.

@kirkshoop, what is the reason for using current_thread instead of immediate for the range observable?

@kirkshoop
Member

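current_thread is used so that multiple ranges interleave values even on a single thread. Without interleaving, some operators can end up allocating without bound (zip is one culprit): if one source emits all of its values before the other starts, zip has to buffer every one of them, and with an infinite source that buffer never stops growing.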
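To make the zip point concrete, here is a toy model in plain C++ with no RxCpp dependency. `ToyZip`, `zip_immediate` and `zip_interleaved` are hypothetical names for illustration, and this is not how RxCpp implements zip; it only models the buffering. In the "immediate" case the left source runs its whole loop before the right one starts, so every left value waits in a buffer. In the interleaved case the sources take turns and at most one value is ever waiting.

```cpp
#include <algorithm>
#include <cstddef>
#include <deque>
#include <utility>
#include <vector>

// Toy zip over two sources (hypothetical, not RxCpp's zip): a value that
// arrives before its partner is buffered; a pair is produced as soon as
// both sides have one.
class ToyZip {
public:
    void on_left(int v)  { left_.push_back(v);  drain(); }
    void on_right(int v) { right_.push_back(v); drain(); }
    std::size_t max_buffered() const { return max_buffered_; }
    std::size_t pair_count() const { return pairs_.size(); }

private:
    void drain() {
        while (!left_.empty() && !right_.empty()) {
            pairs_.emplace_back(left_.front(), right_.front());
            left_.pop_front();
            right_.pop_front();
        }
        // record the high-water mark of values still waiting for a partner
        max_buffered_ = std::max<std::size_t>(max_buffered_, left_.size() + right_.size());
    }

    std::deque<int> left_, right_;
    std::vector<std::pair<int, int>> pairs_;
    std::size_t max_buffered_ = 0;
};

// "immediate"-style: the left source runs its whole loop before the right starts.
ToyZip zip_immediate(int n) {
    ToyZip z;
    for (int i = 0; i < n; ++i) z.on_left(i);
    for (int i = 0; i < n; ++i) z.on_right(i);
    return z;
}

// current_thread-style: the two sources take turns, one value each.
ToyZip zip_interleaved(int n) {
    ToyZip z;
    for (int i = 0; i < n; ++i) {
        z.on_left(i);
        z.on_right(i);
    }
    return z;
}
```

Both variants produce the same 1000 pairs for `n = 1000`, but `zip_immediate(1000).max_buffered()` is 1000 while `zip_interleaved(1000).max_buffered()` is 1. If the left source never completed, the first variant would keep buffering forever.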

The create usage here blocks inside the subscribe function until the for loop completes. To get interleaving, a loop cannot be used in create. Instead, there must be some state plus a scheduled function that sends one value, updates the state, and reschedules itself.
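A minimal sketch of that "state plus self-rescheduling function" idea, in plain C++ with no RxCpp dependency. `Trampoline`, `InnerStep`, `OuterStep` and `interleaved_flat_map` are hypothetical names: the trampoline is a toy stand-in for a current_thread-style scheduler (a FIFO of actions drained on one thread), not the RxCpp scheduler API. Each source keeps its state (the next value) and, instead of running a for loop, emits one value per scheduled step and then schedules a copy of itself with the updated state.

```cpp
#include <deque>
#include <functional>
#include <utility>
#include <vector>

// Toy model of a current_thread-style scheduler: a FIFO queue of actions that
// is drained on the calling thread. Hypothetical; not the RxCpp scheduler API.
class Trampoline {
public:
    void schedule(std::function<void()> action) { queue_.push_back(std::move(action)); }
    void run() {
        while (!queue_.empty()) {
            std::function<void()> action = std::move(queue_.front());
            queue_.pop_front();
            action();  // an action may schedule further actions
        }
    }
private:
    std::deque<std::function<void()>> queue_;
};

using Emissions = std::vector<std::pair<int, int>>;

// Inner source replacing the blocking for loop: its state is the next value,
// and each step emits ONE value, then reschedules itself with updated state.
struct InnerStep {
    Trampoline* sched;
    Emissions* out;
    int outer;  // value from the outer observable (v_main)
    int next;   // state: next value to emit (v_sub)
    int last;   // last value to emit, inclusive
    void operator()() const {
        out->emplace_back(outer, next);  // "on_next"
        if (next < last) {
            sched->schedule(InnerStep{sched, out, outer, next + 1, last});
        }  // otherwise the source is done ("on_completed")
    }
};

// Outer source modeled the same way: each step "subscribes" one inner source
// (which only schedules its first step, so it never blocks), then reschedules.
struct OuterStep {
    Trampoline* sched;
    Emissions* out;
    int next;
    int last;
    void operator()() const {
        sched->schedule(InnerStep{sched, out, next, 0, 3});
        if (next < last) {
            sched->schedule(OuterStep{sched, out, next + 1, last});
        }
    }
};

// Models range(1, 3).flat_map(...) where each inner source emits 0..3.
Emissions interleaved_flat_map() {
    Trampoline sched;
    Emissions out;
    sched.schedule(OuterStep{&sched, &out, 1, 3});
    sched.run();
    return out;
}
```

In this toy model the interleaving comes only from the shared FIFO queue: no step loops, so each call returns after a single value and the other sources get a turn. `interleaved_flat_map()` yields the pairs (1,0), (1,1), (2,0), (1,2), (2,1), (3,0), (1,3), (2,2), (3,1), (2,3), (3,2), (3,3), which has the same interleaved shape as the range output in the first test. Translating this into RxCpp would presumably mean scheduling the step on a worker from the current_thread coordination inside create; the exact RxCpp calls are not shown here.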
