JavaFxObservable.actionEventsOf().subscribeOn(Schedulers.newThread()) misses some emissions #77

Open
pkrysztofiak opened this issue Nov 18, 2018 · 3 comments


@pkrysztofiak

I was learning about concurrency from your book (which is great, by the way) and ran into this.
When an Observable is created with a JavaFxObservable factory method and followed by subscribeOn(Scheduler), the supplied Scheduler should be overridden by the default JavaFxScheduler.platform().
That is exactly what happens with JavaFxObservable.actionEventsOf(), but JavaFxObservable.additionsOf() behaves strangely.
Both subscribers receive their emissions when a new list element is added from inside the button click handler, but when an element is added directly in start() (see strings.add("initialItem") below), Observer[2] receives no emission for it.

import io.reactivex.Observable;
import io.reactivex.rxjavafx.observables.JavaFxObservable;
import io.reactivex.schedulers.Schedulers;
import javafx.application.Application;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.stage.Stage;

public class SubscribeOnTestApp extends Application {

    public static void main(String[] args) {
        launch(args);
    }
    
    @Override
    public void start(Stage stage) throws Exception {
        
        Button button = new Button("Click me!");
        stage.setScene(new Scene(button));
        stage.show();
        
        ObservableList<String> strings = FXCollections.observableArrayList();
        
        JavaFxObservable.actionEventsOf(button)
        .subscribeOn(Schedulers.newThread())
        .subscribe(actionEvent -> {
            System.out.println("[" + Thread.currentThread().getName() + "] button actionEvent");
            strings.add("item");
        });
        
        Observable<String> observable = JavaFxObservable.additionsOf(strings);
        
        observable
        .subscribe(next -> System.out.println("[" + Thread.currentThread().getName() + "] Observer[1] next=" + next));
        
        observable
        .subscribeOn(Schedulers.newThread())
        .subscribe(next -> System.out.println("[" + Thread.currentThread().getName() + "] Observer[2] next=" + next));
        
        strings.add("initialItem");
    }
}
@thomasnield
Collaborator

Thank you, I'm glad you like it. There should never be a good reason to use subscribeOn() against any JavaFX source. You'll always want to use observeOn() to move emissions from the FX thread to a different thread at that point going downstream.

It's interesting, though, and probably not good behavior for folks who don't know better. I'm not quite sure what's causing the issue, since there is already an internal subscribeOn().

https://github.com/ReactiveX/RxJavaFX/blob/2.11.x/src/main/java/io/reactivex/rxjavafx/sources/ObservableListSource.java#L63

However, I have stopped putting subscribeOn(JavaFxScheduler.platform()) on other collection event factories, and I must have missed this one. I wonder if getting rid of the internal subscribeOn() will fix things.

https://github.com/ReactiveX/RxJavaFX/blob/2.11.x/src/main/java/io/reactivex/rxjavafx/sources/ObservableListSource.java#L33-L47

That is interesting regardless though.
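
One plausible reading of the missed emission, not confirmed anywhere in this thread, is that the listener registration ends up queued behind the start() call that is still running, so strings.add("initialItem") fires before any listener for Observer[2] exists. The sketch below models that with plain JDK classes only: a single-threaded executor stands in for the FX application thread, and the listener list stands in for the ObservableList. The class name, the method run(), and the whole model are assumptions made for illustration; it does not use RxJavaFX or reproduce its internals.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class DeferredRegistrationSketch {

    /** Returns the items that the late-registered listener actually received. */
    public static List<String> run() throws Exception {
        // Stand-in for the FX application thread: a single-threaded task queue.
        ExecutorService uiThread = Executors.newSingleThreadExecutor();
        List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
        List<String> received = new CopyOnWriteArrayList<>();

        // Stand-in for start(): this task runs on the "UI" thread.
        uiThread.submit(() -> {
            // The registration is queued behind the task that is currently running,
            // so it cannot execute until this task has finished.
            uiThread.execute(() -> listeners.add(received::add));
            // The element is added before the queued registration has run.
            listeners.forEach(listener -> listener.accept("initialItem"));
        }).get();

        // A later event (for example a button click) arrives after registration.
        uiThread.submit(() -> listeners.forEach(listener -> listener.accept("item"))).get();

        uiThread.shutdown();
        uiThread.awaitTermination(5, TimeUnit.SECONDS);
        return received;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run());
    }
}
```

In this model the first event is lost and only later events arrive, which matches the symptom reported for Observer[2]. Whether the real library follows the same path would need to be checked against the linked source.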

@pkrysztofiak
Author

pkrysztofiak commented Nov 20, 2018

This is going to be a long one, but I would really appreciate it if you read it.
I'm currently fascinated with observable collections and their potential uses, and I have some thoughts I would like you to consider.
I'm able to use observable collections in isolation from FX:

import javafx.collections.FXCollections;
import javafx.collections.ListChangeListener;
import javafx.collections.ObservableList;

public class ObservableListApp {

    public static void main(String[] args) {
        ObservableList<String> letters = FXCollections.observableArrayList();
        
        letters.addListener((ListChangeListener<String>) change -> {
            while (change.next()) {
                if (change.wasAdded()) {
                    change.getAddedSubList().forEach(letter -> System.out.println("[" + Thread.currentThread().getName() + "] letter=" + letter));
                }
            }
        });
        
        letters.add("Alpha");
        letters.add("Beta");
    }
}

The app above works perfectly fine.
In my opinion, observable collections are not part of the FX UI itself. They are widely used by the FX UI, but they should be able to work separately from it. My thesis: JavaFxObservable.additionsOf() (and every other Observable created from an observable collection) should work with Schedulers.immediate(), with an option to provide a custom Scheduler. That would give us a chance to use Rx with collections on the Model side.
Here is my test app:

import java.util.concurrent.Executors;

import io.reactivex.Observable;
import io.reactivex.rxjavafx.observables.JavaFxObservable;
import io.reactivex.rxjavafx.schedulers.JavaFxScheduler;
import io.reactivex.schedulers.Schedulers;
import javafx.application.Application;
import javafx.collections.FXCollections;
import javafx.collections.ObservableList;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.Label;
import javafx.scene.control.TextField;
import javafx.scene.layout.HBox;
import javafx.stage.Stage;

public class ModelSideObservableListApp extends Application {

    private final TextField textField = new TextField();
    private final Button button = new Button("Run!");
    private final Label label = new Label(); 
    private final HBox hBox = new HBox(textField, button, label);
    private final Scene scene = new Scene(hBox);
    
    private final ObservableList<String> list = FXCollections.observableArrayList();
    
    public static void main(String[] args) {
        launch(args);
    }
    
    @Override
    public void start(Stage stage) throws Exception {
        hBox.setPrefSize(400, 100);
        stage.setScene(scene);
        stage.show();
        
        JavaFxObservable.actionEventsOf(button)
        .map(actionEvent -> textField.getText())
        .observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
        .map(Integer::valueOf)
        .flatMapSingle(i -> Observable.range(1, i).map(String::valueOf).toList())
        .doOnNext(next -> System.out.println("[" + Thread.currentThread().getName() + "] next=" + next))
        .subscribe(list::addAll);
        
        JavaFxObservable.additionsOf(list)
        .subscribeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
        .doOnNext(next -> System.out.println("[" + Thread.currentThread().getName() + "] next=" + next))
        .observeOn(JavaFxScheduler.platform())
        .subscribe(next -> label.setText("Just got " + next + " new emissions"));
    }
}

The idea behind the app is as follows (and I'm a little surprised that it works as I expected):

  1. list is the Model side

  2. I get the event from FX (the number in the TextField and the Run button)

  3. Immediately after getting the data, I switch to another thread (Executors.newSingleThreadExecutor()) and update the Model (add to list)

  4. Other components of the View (label) listen to the Model, but they should do so on the same thread (Executors.newSingleThreadExecutor()). The reason is in the ListChangeListener.Change documentation:

For this reason it is not safe to use this class on a different thread.

(https://docs.oracle.com/javase/8/javafx/api/javafx/collections/ListChangeListener.Change.html)

  5. The thread switches back to the FX thread and updates the view (label)

The final point: observable collections should work on Schedulers.immediate() by default and offer an option to change the Scheduler, just like Observable.interval(period, unit, scheduler).
That was long :)
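
To make the proposal concrete, here is a minimal JDK-only sketch of the "immediate by default, custom scheduler as an overload" shape. NotifyingList, onAddition() and the use of java.util.concurrent.Executor in place of an Rx Scheduler are all hypothetical stand-ins chosen for illustration; none of them is part of RxJavaFX or JavaFX. The sketch is single-threaded and makes no attempt at thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

public class SchedulerOverloadSketch {

    /** Hypothetical list that notifies listeners about added elements. */
    static final class NotifyingList<T> {
        private final List<T> items = new ArrayList<>();
        private final List<Consumer<T>> listeners = new ArrayList<>();

        void add(T item) {
            items.add(item);
            listeners.forEach(listener -> listener.accept(item));
        }

        /** Default: deliver on the calling thread (the "immediate" behavior). */
        void onAddition(Consumer<T> listener) {
            onAddition(listener, Runnable::run);
        }

        /** Overload: deliver through a caller-supplied Executor. */
        void onAddition(Consumer<T> listener, Executor executor) {
            listeners.add(item -> executor.execute(() -> listener.accept(item)));
        }
    }

    /** Returns a log showing the order in which deliveries happened. */
    public static List<String> run() {
        NotifyingList<String> letters = new NotifyingList<>();
        List<String> log = new ArrayList<>();

        letters.onAddition(letter -> log.add("immediate:" + letter));
        // A recording executor stands in for a custom scheduler.
        letters.onAddition(letter -> log.add("custom:" + letter), task -> {
            log.add("scheduled");
            task.run();
        });

        letters.add("Alpha");
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The two-overload shape mirrors how Observable.interval() pairs a default with an explicit scheduler parameter, which is the analogy drawn above.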

@thomasnield
Collaborator

🤔 Thanks for putting this reasoning together. I do agree that a JavaFX ObservableList should be agnostic about which thread modifies it and fires its events.

However, I don't know if it is kosher to make subscribeOn() do what is essentially the role of observeOn().

You see, the ObservableList sort of acts like an event bus, or in Rx terminology it is a Subject. Subjects do not respect subscribeOn() calls but rather push emissions on whatever thread arbitrarily fires those events.

Events on ObservableList are of the same nature. One thread is going to be modifying elements and therefore also firing the listeners. A subscribeOn() is helpless and cannot control which thread modifies elements, and therefore doesn't have control over the firing events either. The Rx thing to do is to use observeOn() to take the emissions coming from that collection-modifying thread, and switch to a different thread at that point in the Observable chain rather than giving an illusion it is overriding the thread of the source.
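
The point about listeners firing on the modifying thread can be sketched without Rx at all. In the JDK-only example below, the listener is registered on one thread (the subscribeOn() analogue), the element is "added" on another, and the callback still runs on the thread that did the adding; only an explicit hand-off to an executor (the observeOn() analogue) changes the thread. The class name, the thread names, and the listener list standing in for an ObservableList are illustrative assumptions, not library code.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class FiringThreadSketch {

    /** Returns {thread seen by the listener, thread seen after the hand-off}. */
    public static String[] run() throws Exception {
        List<Consumer<String>> listeners = new CopyOnWriteArrayList<>();
        CompletableFuture<String> direct = new CompletableFuture<>();
        CompletableFuture<String> handedOff = new CompletableFuture<>();

        ExecutorService registrar =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "registrar"));
        ExecutorService downstream =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "downstream"));

        // subscribeOn() analogue: only the registration runs on the other thread.
        registrar.submit(() -> {
            listeners.add(item -> {
                direct.complete(Thread.currentThread().getName());
                // observeOn() analogue: explicitly pass the work to another thread.
                downstream.execute(() -> handedOff.complete(Thread.currentThread().getName()));
            });
        }).get();

        // The modifying thread fires the listeners, whoever registered them.
        Thread modifier = new Thread(
                () -> listeners.forEach(listener -> listener.accept("Alpha")), "modifier");
        modifier.start();
        modifier.join();

        String[] seen = {
            direct.get(5, TimeUnit.SECONDS),
            handedOff.get(5, TimeUnit.SECONDS)
        };
        registrar.shutdown();
        downstream.shutdown();
        return seen;
    }

    public static void main(String[] args) throws Exception {
        String[] seen = run();
        System.out.println("listener ran on: " + seen[0]);
        System.out.println("after hand-off: " + seen[1]);
    }
}
```

The listener reports the "modifier" thread even though it was registered on "registrar", and only the explicit hand-off reaches "downstream".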

If you haven't gotten into subjects yet, my Learning RxJava book is $10 now. I've seen it go on sale for $5 on Black Fridays... which is this Friday.
https://www.packtpub.com/application-development/learning-rxjava
