Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Custom "Disposable" from create #196

Open
arturoc opened this issue Sep 27, 2022 · 6 comments
Open

Custom "Disposable" from create #196

arturoc opened this issue Sep 27, 2022 · 6 comments

Comments

@arturoc
Copy link

arturoc commented Sep 27, 2022

When creating a new Observable using create the original c# implementation allows to return a custom IDisposable so one can have more control over the completion of the Observable. With rxRust there's apparently no way to do that?

@M-Adoo
Copy link
Collaborator

M-Adoo commented Sep 28, 2022

There isn't a trait like IDisposable has provided, would you provide me a case what you want use IDisposable to do ?

@arturoc
Copy link
Author

arturoc commented Sep 28, 2022

If I understand it correctly in Rx IDisposable is used to unsubscribe an observer from an observable. In rxRust that appears to be SubscriptionLike. At the moment there doesn't seem to be any way to terminate a subscription before completion when using create to create an observable right?

For example in the Rx documentation for create there's this example:

var ob = Observable.Create<string>(
    observer =>
    {
        var timer = new System.Timers.Timer();
        timer.Enabled = true;
        timer.Interval = 100;
        timer.Elapsed += OnTimerElapsed;
        timer.Start();
        return ()=>{
            timer.Elapsed -= OnTimerElapsed;
            timer.Dispose();
        };
    });

Which returns an IDisposable that when disposed removes the event delegate from the timer and disposes the timer.

A very naive implementation in rxRust could be something like:

let ob = observable::create(|observer| {
    let mut i = 0;
    loop {
            std::thread::sleep(Duration::from_millis(100));
            observer.next(i);
            i += 1;
    }
});

let pool = ThreadPool::new().unwrap();
let mut subs = ob.into_shared().subscribe_on(pool).into_shared().subscribe(|v| println!("threaded timer {v}"));
std::thread::sleep(Duration::from_secs(1));
subs.unsubscribe();

But unsubscribe doesn't really finish the subscription as there's no way to notify the observable that the timer should be stopped. I would have expected the equivalent rust code could return a SubscriptionLike?

@M-Adoo
Copy link
Collaborator

M-Adoo commented Sep 28, 2022

In rxRust the infinite loop that block you unsubscribe. There are some differences between Rust with C#, Rust not have a default event loop, so you can't create a timer with callback like that, but you can use Future to do similar thing. In rxRust, interval or from_future you can use.

@Trequetrum
Copy link
Contributor

Trequetrum commented Nov 29, 2022

I think @arturoc is right. The API does feel incomplete for create.

pub fn create<F, Item, Err>(subscribe: F) -> ObservableFn<F, Item, Err>
where
  F: FnOnce(&mut dyn Observer<Item = Item, Err = Err>),
{
  ObservableFn(subscribe, TypeHint::new())
}

It seems to me that create should probably have this API instead:

pub fn create<F, Sub, Item, Err>(subscribe: F) -> ObservableFn<F, Item, Err>
where
  F: FnOnce(&mut dyn Observer<Item = Item, Err = Err>) -> Sub,
  Sub: SubscriptionLike
{
  ObservableFn(subscribe, TypeHint::new())
}

This way, whenever you subscribe to an observable, you should get a SubscriptionLike back of some sort. Then once you've unsubscribed, the supplied SubscriptionLike is called alongside whatever else needs happen in order to clean up the observables resources. This gives users a hook into what happens when their Observable is unsubscribed.

So a rough example might look like:

let obs = observable::create(|mut subscriber| {
    let timer_handle = five_min_callback_timer(|| {
        subscriber.next(0);
        subscriber.complete();
    });
    create_subscription(move || {
        // Any custom cleanup code can go here.
        // In this example, that's cleaning up a timer.
        drop(timer_handle);
    })
})
let sub = obs.subscribe(|v| println!("{:?}", v));
// ... some time later
// Clean up resources, the user defined SubscriptionLike should be invoked at some point here
sub.unsubscribe();
// By this line, `handle` has been consumed

@M-Adoo
Copy link
Collaborator

M-Adoo commented Dec 1, 2022

@arturoc good to me. And we can let create accept both impl FnOnce<&mut Observer> and impl FnOnce(&mut Observer) -> impl SubscriptionLike. This friendly with user who only want emit item and backward compatibility.

@arturoc
Copy link
Author

arturoc commented Dec 1, 2022

I don't have the time to send a PR right now but what @Trequetrum posts is what I was talking about.

I think implementing SubscriptionLike for () may be easier and would have the same effect as trying to make create accept both impl FnOnce<&mut Observer> and impl FnOnce(&mut Observer) -> impl SubscriptionLike but don't remember the implementation details at the moment so well, so I might be wrong

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants