Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cmd throttling #1070

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft

Cmd throttling #1070

wants to merge 6 commits into from

Conversation

h0lg
Copy link

@h0lg h0lg commented Mar 1, 2024

  • improving debounce test and doco to warn against misuse
  • adding command factories for throttling and buffered throttling (guaranteeing the last message is dispatched)

The throttling methods are intended for stuff like throttling Progress<> updates from background tasks like the following search does while yielding results from an IAsyncEnumerable:

    let private searchCmd model =
        fun dispatch ->
            async {
                let command = mapToSearchCommand model
                
                let dispatchProgress = Cmd.bufferedThrottle 100 (fun progress ->
                    System.Diagnostics.Debug.WriteLine("progress dispatched " + progress.ToString())
                    SearchProgress progress)
                    
                let reporter = Progress<BatchProgress>(fun progress ->
                    System.Diagnostics.Debug.WriteLine("progress reported" + progress.ToString())
                    dispatchProgress progress
                        // dispatch messages from returned command
                        |> List.iter (fun effect -> effect dispatch))
                
                use cts = new CancellationTokenSource()

                do! searchAsync(command, reporter, cts.Token)
                    // yield results from IAsyncEnumerable as they're available, see https://github.com/fsprojects/FSharp.Control.TaskSeq
                    |> TaskSeq.iter (fun result -> SearchResult result |> dispatch)
                    |> Async.AwaitTask

                dispatch SearchCompleted
            } |> Async.StartImmediate
        |> Cmd.ofEffect

I'm a Fabulous and F# freshie, so please have a good hard look at my changes and above intended use case. Feel free to point out anything that looks weird or cumbersome to you - I'm here to learn :)

h0lg added a commit to h0lg/SubTubular that referenced this pull request Mar 2, 2024
@TimLariviere
Copy link
Member

@h0lg Thank you very much for your PR. It's looking very good.
Sorry for the wait, I was quite busy lately.

Would you have an example where Cmd.bufferedThrottle would be useful compared to Cmd.debounce.
I feel like they are doing more or less similar in that it dispatches only the last message in a given interval.

@h0lg
Copy link
Author

h0lg commented Mar 12, 2024

Would you have an example where Cmd.bufferedThrottle would be useful compared to Cmd.debounce.
I feel like they are doing more or less similar in that it dispatches only the last message in a given interval.

Sure, please have a closer look at above example. It uses Cmd.bufferedThrottle to continuously dispatch the most recent progress event at a max. rate of 100ms (dropping some like Cmd.throttle would do) while also making sure the last progress update is dispatched (kind of like Cmd.debounce does).
A progress is a good example for a buffered throttle because if you want to throttle the dispatch, you don't want to risk losing the last update that completes the progress to regular throttling, which by default doesn't guarantee this.

I've tried to describe this here:
https://github.com/fabulous-dev/Fabulous/pull/1070/files#diff-5e8d1d8af957012671e1fe3e5b793de08e468693877f4eac75ab21fe7526cefcR258-R262

Feel free to suggest a better name for the concept. Maybe rateLimitedStack, throttledStack or lastThrottled?

I found this visualization helpful to grasp the difference between throttle and debounce: https://web.archive.org/web/20220117092326/http://demo.nimius.net/debounce_throttle/

@h0lg
Copy link
Author

h0lg commented Mar 18, 2024

I've added a batchedThrottle variant that throttles the dispatch while returning the pending values in a list message.

type Msg = | Batch of int list // enables batching values of the same type for dispatch

// a factory that batches and dispatches the pending values every 100 ms; takes a value and produces a Cmd
// declare per program or long running background task
let createThrottledMsgBatchCmd = batchedThrottle 100 (fun values ->
    System.Diagnostics.Debug.WriteLine("dispatching a batch of values to the MVU loop " + values.ToString())
    Batch values)

// an optional wrapper for usage inside of Cmd.ofEffect giving the factory function a dispatch signature 
let dispatchToThrottledFactory value =
    System.Diagnostics.Debug.WriteLine("dispatching a single value to the throttled batch factory " + value.ToString())
    createThrottledMsgBatchCmd value |> List.iter (fun effect -> effect dispatch) // make the MVU dispatch available to the returned command

produceIntegersFast dispatchToThrottledFactory // prevents this function from spamming the MVU loop

@h0lg
Copy link
Author

h0lg commented Mar 21, 2024

I've added a second return value to Cmd.batchedThrottle that allows waiting for the next dispatch to make sure dispatch has completed from outside of the factory method.

let createCmd, awaitNextDispatch = Cmd.batchedThrottle 100 NewValues
... some awaited (!) producers using createCmd ...
// wait until next dispatch plus a little optional buffer to avoid race conditions
let! _ = awaitNextDispatch (Some(TimeSpan.FromMilliseconds(10)))
// I can be sure now all messages have been dispatched 

What do you think about this pattern? Cmd.debounce and Cmd.bufferedThrottle could benefit from a similar mechanism.

@h0lg h0lg force-pushed the cmd-throttling branch 2 times, most recently from 473db05 to 8108219 Compare March 26, 2024 21:42
@h0lg
Copy link
Author

h0lg commented Apr 10, 2024

In the latest iteration I've rewritten Cmd.batchedThrottle to be a Dispatch extension - because I found it awkward how the Cmd method forced me to iterate the contained effects to make the dispatch available to them. See the |> List.iter (fun effect -> effect dispatch) above.

This version, dispatch.batchThrottled, I found to fit more naturally in my scenario, which is a rapidly producing and progress reporting IAsyncEnumerable<'value> on a background task , i.e. with dispatch available.

I can for example write extensions like

type AsyncEnumerableExtensions =

    [<Extension>]
    static member dispatchTo((this: System.Collections.Generic.IAsyncEnumerable<'result>), (dispatch: 'result -> unit)) =
        async {
            let results = this.GetAsyncEnumerator()

            let rec dispatchResults () =
                async {
                    let! hasNext = results.MoveNextAsync().AsTask() |> Async.AwaitTask

                    if hasNext then
                        results.Current |> dispatch
                        do! dispatchResults ()
                }

            do! dispatchResults ()
        }

    [<Extension>]
    static member dispatchBatchThrottledTo
        (
            (this: System.Collections.Generic.IAsyncEnumerable<'result>),
            throttleInterval,
            (mapPendingResultsToBatchMsg: 'result list -> 'msg),
            (dispatch: 'msg -> unit)
        ) =
        async {
            // create a throttled dispatch of a batch of pending results at regular intervals
            let dispatchSingleResult, awaitNextDispatch =
                dispatch.batchThrottled (throttleInterval, mapPendingResultsToBatchMsg)

            do! this.dispatchTo dispatchSingleResult // dispatch single results using throttled method
            do! awaitNextDispatch (Some throttleInterval) // to make sure all results are dispatched before calling it done
        }

and then throttle the progress reporting as well as the result dispatch effectively:

type Msg
    | SearchProgressReports of BatchProgress list
    | SearchResults of SearchResult list
    | SearchCompleted

let private searchCmd model =
    fun dispatch ->
        async {
            let command = mapToSearchCommand model
            let dispatchProgress, awaitNextProgressDispatch = dispatch.batchThrottled(100, SearchProgressReports)
            let reporter = Progress<BatchProgress>(dispatchProgress)
            use cts = new CancellationTokenSource()
            do! searchAsync(command, reporter, cts.Token).dispatchBatchThrottledTo (100, SearchResults, dispatch)
            do! awaitNextProgressDispatch (Some 100) // to make sure all progresses are dispatched before calling it done
            dispatch SearchCompleted
        } |> Async.StartImmediate
    |> Cmd.ofEffect

Whether - and if, in what form - you want this in Fabulous is up to you. But I found it this helpful to prevent the MVU loop from choking up when feeding too many messages into it too rapidly.

@h0lg h0lg marked this pull request as draft April 10, 2024 23:00
…o await the next dispatch

Should debounce and bufferedThrottle follow the same API?
because it feels more natural to use it that way with a dispatch inside an ofEffect that produces values rapidly
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants