Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Back-pressure aware operators #236

Open
Atry opened this issue Jun 28, 2023 · 3 comments
Open

Back-pressure aware operators #236

Atry opened this issue Jun 28, 2023 · 3 comments

Comments

@Atry
Copy link
Contributor

Atry commented Jun 28, 2023

Currently merge_all_threads can result in out-of-memory, when concurrent is not Infinity and the source submits items faster than the mapper handles items.

I wonder if we could change the type of concurrent from usize to a struct:

pub struct ConcurrentConfig<F, Item> {
  usize max_number_of_inner_observers;
  usize max_buffer_size;
  F backpressure_handler;
}

impl<F, Item> ConcurrentConfig<F, Item>
where
    F: FnMut(Item) -> () {
}
@M-Adoo
Copy link
Collaborator

M-Adoo commented Jun 28, 2023

I'd be glad to accept Back-pressure support for merge_all_threads op. To implement it, I would prefer to keep the concurrent type and add a with_backpressure method to config the callback for MergeAllOp/MergeAllOpThreads. So, we can keep the API compatible and make the Back-pressure be an optional feature.

@Atry
Copy link
Contributor Author

Atry commented Jun 28, 2023

How do you think about merge_scan, like https://rxjs.dev/api/index/function/mergeScan ?

I think merge_scan is useful to be used as a general producer/consumer operator.

Would you like to review a merge_scan_with_backpressure PR?

@M-Adoo
Copy link
Collaborator

M-Adoo commented Jun 29, 2023

Would you like to review a merge_scan_with_backpressure PR?

Of course.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants