Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

identity_current_thread is bad "default" scheduler for operators #593

Open
victimsnino opened this issue Nov 2, 2022 · 0 comments
Open

Comments

@victimsnino
Copy link
Collaborator

victimsnino commented Nov 2, 2022

Hi everyone and hi @kirkshoop!

Most of the operators in rxcpp which requires schedulers has such an fallback scheduler:

If scheduler is omitted, identity_current_thread is used.

For example, merge operator also has it as default. BUT it doesn't provide ANY synchronization/serialization in case of multithreaded application.

Example:

    rxcpp::observable<>::just(1, rxcpp::observe_on_new_thread())
        .repeat()
        .merge(rxcpp::observable<>::just(2, rxcpp::observe_on_new_thread())
                    .repeat())
    .take(10).as_blocking().subscribe([](int v){
        std::cout << "==================\n" << std::this_thread::get_id() << " START " << std::endl;
        std::this_thread::sleep_for(std::chrono::seconds{1});
        std::cout << v << std::endl;
        std::cout << std::this_thread::get_id() << " END " << std::endl << "==================\n\n";

    });

Possible output:

==================
0x70000ee59000 START 
==================
0x70000eedc000 START 
1
0x70000ee59000 END 
==================

2
0x70000eedc000 END 
==================

==================
0x70000ef5f000 START 
==================
0x70000efe2000 START 
1
0x70000efe2000 END 
==================

2
0x70000ef5f000 END 
==================

==================
0x70000ee59000 START 
==================
0x70000eedc000 START 
1
0x70000ee59000 END 
==================

2
0x70000eedc000 END 
==================

==================
0x70000ef5f000 START 
==================
0x70000efe2000 START 
1
0x70000ef5f000 END 
==================

2
0x70000efe2000 END 
==================

==================
0x70000ee59000 START 
==================
0x70000eedc000 START 
1
0x70000ee59000 END 
==================

2
0x70000eedc000 END 

As you can see, it is mixed, but ReactiveX requires that any observable should be serialized.

In case of using any valid scheduler in merge like observe_on_new_thread output is valid:

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
1
0x700001ce1000 END 
==================

==================
0x700001ce1000 START 
2
0x700001ce1000 END 
==================

Expected: at least default behavior of any operator should be thread-safe...

In my understanding, best default scheduler for such an "multhithreaded" opertators can be "serialize_immediate" (not exist, but actually just emit emissions under mutex to provide exclusive access to subscriber and guarantee that only one observable pushes item at the same time). Tested locally: also provides valid output

BTW: it is how i've implemented merge in ReactivePlusPlus: each callback to subscriber of merge just called under mutex. As a result there is no way to obtain "mixed" log. @kirkshoop, what do you think ?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant