diff --git a/src/Fabulous.Tests/CmdTests.fs b/src/Fabulous.Tests/CmdTests.fs index 073b6d8fc..f3aed2229 100644 --- a/src/Fabulous.Tests/CmdTests.fs +++ b/src/Fabulous.Tests/CmdTests.fs @@ -3,7 +3,9 @@ namespace Fabulous.Tests open Fabulous open NUnit.Framework -type CmdTestsMsg = NewValue of int +type CmdTestsMsg = + | NewValue of int + | NewValues of int list module CmdTestsHelper = let execute dispatch (cmd: Cmd<'msg>) = @@ -13,13 +15,14 @@ module CmdTestsHelper = [] type ``Cmd tests``() = [] - member _.``Cmd.debounce only dispatch the last message``() = + member _.``Cmd.debounce only dispatches the last messages within the timeout``() = async { + let mutable messageCount = 0 let mutable actualValue = None let dispatch msg = - if actualValue.IsNone then - actualValue <- Some msg + messageCount <- messageCount + 1 + actualValue <- Some msg let triggerCmd = Cmd.debounce 100 NewValue @@ -30,14 +33,208 @@ type ``Cmd tests``() = triggerCmd 3 |> CmdTestsHelper.execute dispatch do! Async.Sleep 125 + Assert.AreEqual(1, messageCount) Assert.AreEqual(Some(NewValue 3), actualValue) - actualValue <- None - triggerCmd 4 |> CmdTestsHelper.execute dispatch do! Async.Sleep 75 triggerCmd 5 |> CmdTestsHelper.execute dispatch do! Async.Sleep 125 + Assert.AreEqual(2, messageCount) Assert.AreEqual(Some(NewValue 5), actualValue) } + + [] + member _.``Cmd.throttle issues message at specified intervals``() = + async { + let mutable messageCount = 0 + let mutable actualValue = None + + let dispatch msg = + messageCount <- messageCount + 1 + actualValue <- Some msg + + let throttleCmd = Cmd.throttle 100 NewValue + + throttleCmd 1 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 50 + throttleCmd 2 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 75 + throttleCmd 3 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 125 + + Assert.AreEqual(2, messageCount) + Assert.AreEqual(Some(NewValue 3), actualValue) + + throttleCmd 4 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 75 + throttleCmd 5 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 125 + + Assert.AreEqual(3, messageCount) + Assert.AreEqual(Some(NewValue 4), actualValue) + } + + [] + member _.``Cmd.throttle issues only one message per interval``() = + async { + let mutable messageCount = 0 + let mutable actualValue = None + + let dispatch msg = + messageCount <- messageCount + 1 + actualValue <- Some msg + + let throttleCmd = Cmd.throttle 100 NewValue + + throttleCmd 1 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 20 + throttleCmd 2 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 35 + throttleCmd 3 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 125 + + // Only the first message should have been dispatched + Assert.AreEqual(1, messageCount) + Assert.AreEqual(Some(NewValue 1), actualValue) + } + + [] + member _.``Cmd.bufferedThrottle dispatches the first and most recent message within the specified interval``() = + async { + let mutable messageCount = 0 + let mutable actualValue = None + + let dispatch msg = + messageCount <- messageCount + 1 + actualValue <- Some msg + + let throttleCmd = Cmd.bufferedThrottle 100 NewValue + + throttleCmd 1 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 20 + throttleCmd 2 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 10 + throttleCmd 3 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 20 + throttleCmd 4 |> CmdTestsHelper.execute dispatch + do! Async.Sleep 125 + + // Only the first and most recent message should be dispatched + Assert.AreEqual(2, messageCount) + Assert.AreEqual(Some(NewValue 4), actualValue) + } + + [] + member _.``Cmd.bufferedThrottle dispatches the most recent message even if delayed``() = + async { + let mutable actualValue = None + let mutable messageCount = 0 + + let dispatch msg = + messageCount <- messageCount + 1 + actualValue <- Some msg + + let throttleCmd = Cmd.bufferedThrottle 100 NewValue + + throttleCmd 1 |> CmdTestsHelper.execute dispatch + throttleCmd 2 |> CmdTestsHelper.execute dispatch + + // Only the first message should have been dispatched + Assert.AreEqual(1, messageCount) + Assert.AreEqual(Some(NewValue 1), actualValue) + + do! Async.Sleep 200 // Wait longer than the throttle interval + + // the second message should have been dispatched delayed + Assert.AreEqual(2, messageCount) + Assert.AreEqual(Some(NewValue 2), actualValue) + } + + [] + member _.``Dispatch.batchThrottled dispatches all undispatched values on interval expiry``() = + async { + let mutable messageCount = 0 + let mutable dispatched = [] // records dispatched messages latest first + + let dispatch msg = + messageCount <- messageCount + 1 + dispatched <- msg :: dispatched + + let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues) + + batchedThrottleCmd 1 + batchedThrottleCmd 2 + batchedThrottleCmd 3 + batchedThrottleCmd 4 + + do! Async.Sleep 200 // Wait longer than the throttle interval + + // All three values should have been dispatched + Assert.AreEqual(2, messageCount) + Assert.AreEqual([ NewValues [ 2; 3; 4 ]; NewValues [ 1 ] ], dispatched) + } + + [] + member _.``Dispatch.batchThrottled dispatches messages immediately if interval not expired``() = + async { + let mutable messageCount = 0 + let mutable dispatched = [] // records dispatched messages latest first + + let dispatch msg = + messageCount <- messageCount + 1 + dispatched <- msg :: dispatched + + let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues) + + batchedThrottleCmd 1 + batchedThrottleCmd 2 + + // Only the first value should have been dispatched immediately + Assert.AreEqual(1, messageCount) + Assert.AreEqual([ NewValues[1] ], dispatched) + + (* Wait for longer than twice the throttle interval, + giving second value time to dispatch and elapsing time until next dispatch *) + do! Async.Sleep 210 + + batchedThrottleCmd 3 + batchedThrottleCmd 4 + + // Second value should have dispatched delayed, third immediately + Assert.AreEqual(3, messageCount) + Assert.AreEqual([ NewValues[3]; NewValues[2]; NewValues[1] ], dispatched) + + do! Async.Sleep 110 // Wait longer than the throttle interval + + // All values should have been dispatched eventually + Assert.AreEqual(4, messageCount) + Assert.AreEqual([ NewValues[4]; NewValues[3]; NewValues[2]; NewValues[1] ], dispatched) + } + + [] + member _.``Dispatch.batchThrottled factory can be awaited for completion``() = + async { + let mutable messageCount = 0 + let mutable dispatched = [] // records dispatched messages latest first + + let dispatch msg = + messageCount <- messageCount + 1 + dispatched <- msg :: dispatched + + let createCmd, awaitNextDispatch = dispatch.batchThrottled(100, NewValues) + + createCmd 1 + createCmd 2 + + // Only the first value should have been dispatched immediately + Assert.AreEqual(1, messageCount) + Assert.AreEqual([ NewValues[1] ], dispatched) + + do! awaitNextDispatch None // only waits until next dispatch + + // All values should have been dispatched after waiting + Assert.AreEqual(2, messageCount) + Assert.AreEqual([ NewValues[2]; NewValues[1] ], dispatched) + } diff --git a/src/Fabulous/Cmd.fs b/src/Fabulous/Cmd.fs index 4a4b96956..cc83a4eb7 100644 --- a/src/Fabulous/Cmd.fs +++ b/src/Fabulous/Cmd.fs @@ -1,5 +1,6 @@ namespace Fabulous +open System.Runtime.CompilerServices open System.Threading open System.Threading.Tasks @@ -188,20 +189,33 @@ module Cmd = let inline msgOption (task: Task<'msg option>) = OfAsync.msgOption(task |> Async.AwaitTask) - /// Command to issue a message if no other message has been issued within the specified timeout + /// Creates a factory for Commands that dispatch a message only + /// if the factory produces no other Command within the specified timeout. + /// Helps control how often a message is dispatched by delaying the dispatch after a period of inactivity. + /// Useful for handling noisy inputs like keypresses or scrolling, and preventing too many actions in a short time, like rapid button clicks. + /// Note that this creates an object with internal state and is intended to be used per Program or longer-running background process + /// rather than once per message in the update function. + /// The time to wait for the next Command from the factory in milliseconds. + /// Maps a factory input value to a message for delayed dispatch. + /// A Command factory function that maps an input value to a "sleeper" Command which dispatches a delayed message (mapped from the value). + /// This command is cancelled if the factory produces another Command within the specified timeout; otherwise it succeeds and the message is dispatched. let debounce (timeout: int) (fn: 'value -> 'msg) : 'value -> Cmd<'msg> = - let funLock = obj() - let mutable cts: CancellationTokenSource = null + let funLock = obj() // ensures safe access to resources shared across different threads + let mutable cts: CancellationTokenSource = null // if set, allows cancelling the last issued Command + // return a factory function mapping input values to "sleeper" Commands with delayed dispatch fun (value: 'value) -> [ fun dispatch -> lock funLock (fun () -> + // cancel the last sleeping Command issued earlier from this factory if cts <> null then cts.Cancel() cts.Dispose() + // make cancellation available to the factory's next Command cts <- new CancellationTokenSource() + // asynchronously wait for the specified time before dispatch Async.Start( async { do! Async.Sleep(timeout) @@ -209,9 +223,185 @@ module Cmd = lock funLock (fun () -> dispatch(fn value) + // done; invalidate own cancellation token if cts <> null then cts.Dispose() cts <- null) }, cts.Token )) ] + + /// Creates a factory for Commands that dispatch a message only + /// if the factory produced no other Command within the specified interval. + /// This limits how often a message is dispatched by ensuring to only dispatch once within a certain time interval + /// and dropping messages that are produces during the cooldown. + /// Useful for limiting how often a progress message is shown or preventing too many updates to a UI element in a short time. + /// Note that this creates an object with internal state and is intended to be used per Program or longer-running background process + /// rather than once per message in the update function. + /// The minimum time interval between two consecutive Command executions in milliseconds. + /// Maps a factory input value to a message for dispatch. + /// A Command factory function that maps an input value to a "throttled" Command which dispatches a message (mapped from the value) + /// if the minimum time interval has elapsed since the last Command execution; otherwise, it does nothing. + let throttle (interval: int) (fn: 'value -> 'msg) : 'value -> Cmd<'msg> = + let mutable lastDispatch = System.DateTime.MinValue + + // return a factory function mapping input values to "throttled" Commands that only dispatch if enough time passed + fun (value: 'value) -> + [ fun dispatch -> + let now = System.DateTime.UtcNow + + // If the interval has elapsed since the last execution, dispatch the message + if now - lastDispatch >= System.TimeSpan.FromMilliseconds(float interval) then + lastDispatch <- now + dispatch(fn value) ] + + /// + /// Creates a Command factory that dispatches the most recent message in a given interval - even if delayed. + /// This makes it similar to in that it rate-limits the message dispatch + /// and similar to in that it guarantees the last message (within the interval or in total) is dispatched. + /// Helpful for scenarios where you want to throttle, but cannot risk losing the last message to throttling + /// - like the last progress update that completes a progress. + /// Note that this function creates an object with internal state and is intended to be used per Program or longer-running background process + /// rather than once per message in the update function. + /// + /// The minimum time interval between two consecutive Command executions in milliseconds. + /// A function that maps a factory input value to a message for dispatch. + /// + /// A Command factory function that maps an input value to a Command which dispatches a message (mapped from the value), either immediately + /// or after a delay respecting the interval, while cancelling older commands if the factory produces another Command before the interval has elapsed. + /// + let bufferedThrottle (interval: int) (fn: 'value -> 'msg) : 'value -> Cmd<'msg> = + let rateLimit = System.TimeSpan.FromMilliseconds(float interval) + let funLock = obj() // ensures safe access to resources shared across different threads + let mutable lastDispatch = System.DateTime.MinValue + let mutable cts: CancellationTokenSource = null // if set, allows cancelling the last issued Command + + // Return a factory function mapping input values to sleeper Commands with delayed dispatch of the most recent message + fun (value: 'value) -> + [ fun dispatch -> + lock funLock (fun () -> + let now = System.DateTime.UtcNow + let elapsedSinceLastDispatch = now - lastDispatch + + // If the interval has elapsed since the last dispatch, dispatch immediately + if elapsedSinceLastDispatch >= rateLimit then + dispatch(fn value) + lastDispatch <- now + else // schedule the dispatch for when the interval is up + // cancel the last sleeper Command issued earlier from this factory + if cts <> null then + cts.Cancel() + cts.Dispose() + + // make cancellation available to the factory's next Command + cts <- new CancellationTokenSource() + + // asynchronously wait for the remaining time before dispatch + Async.Start( + async { + do! Async.Sleep(rateLimit - elapsedSinceLastDispatch) + + lock funLock (fun () -> + dispatch(fn value) + lastDispatch <- System.DateTime.UtcNow + + // done; invalidate own cancellation token + if cts <> null then + cts.Dispose() + cts <- null) + }, + cts.Token + )) ] + +type DispatchExtensions = + + /// + /// Creates a throttled dispatch factory that dispatches values in batches at a fixed minimum interval/maximum rate + /// while ensuring that all values are dispatched eventually. + /// This helps throttle the message dispatch of a rapid producer to avoid overloading the MVU loop + /// without dropping any of the carried values - ensuring all values are processed in batches at a controlled rate. + /// Note that this function creates an object with internal state and is intended to be used per Program + /// or longer-running background process rather than once per message in the update function. + /// + /// The minimum time interval between two consecutive dispatches in milliseconds. + /// A function that maps a list of pending input values to a message for dispatch. + /// + /// Two functions. The first has a Dispatch signature and is used to feed a single value into the factory, + /// where it is either dispatched immediately or after a delay respecting the interval, + /// batched with other pending values in the order they were fed in. + /// The second can be used for awaiting the next dispatch from the outside + /// - while optionally adding some buffer time (in milliseconds) to account for race condiditions. + /// + [] + static member batchThrottled((dispatch: Dispatch<'msg>), interval, (mapBatchToMsg: 'value list -> 'msg)) = + let rateLimit = System.TimeSpan.FromMilliseconds(interval) + let funLock = obj() // ensures safe access to resources shared across different threads + let mutable lastDispatch = System.DateTime.MinValue + let mutable pendingValues: 'value list = [] + let mutable cts: CancellationTokenSource = null // if set, allows cancelling the last issued Command + + // gets the time to wait until the next allowed dispatch returning a negative timespan if the time is up + let getTimeUntilNextDispatch () = + lastDispatch.Add(rateLimit) - System.DateTime.UtcNow + + // dispatches all pendingValues and resets them while updating lastDispatch + let dispatchBatch () = + // Dispatch in the order they were received + pendingValues |> List.rev |> mapBatchToMsg |> dispatch + + lastDispatch <- System.DateTime.UtcNow + pendingValues <- [] + + // a function with the Dispatch signature for feeding a single value into the throttled batch factory + let dispatchSingle = + fun (value: 'value) -> + lock funLock (fun () -> + let untilNextDispatch = getTimeUntilNextDispatch() + pendingValues <- value :: pendingValues + + // If the interval has elapsed since the last dispatch, dispatch all pending messages + if untilNextDispatch <= System.TimeSpan.Zero then + dispatchBatch() + else // schedule dispatch + + // if the the last sleeping dispatch can still be cancelled, do so + if cts <> null then + cts.Cancel() + cts.Dispose() + + // used to enable cancelling this dispatch if newer values come into the factory + cts <- new CancellationTokenSource() + + Async.Start( + async { + // wait only as long as we have to before next dispatch + do! Async.Sleep(untilNextDispatch) + + lock funLock (fun () -> + dispatchBatch() + + // done; invalidate own cancellation + if cts <> null then + cts.Dispose() + cts <- null) + }, + cts.Token + )) + + // a function to wait until after the next async dispatch + some buffer time to ensure the dispatch is complete + let awaitNextDispatch buffer = + lock funLock (fun () -> + async { + if not pendingValues.IsEmpty then + let untilAfterNextDispatch = + getTimeUntilNextDispatch() + + match buffer with + | Some value -> System.TimeSpan.FromMilliseconds(value) + | None -> System.TimeSpan.Zero + + if untilAfterNextDispatch > System.TimeSpan.Zero then + do! Async.Sleep(untilAfterNextDispatch) + }) + + // return both the dispatch and the await helper + dispatchSingle, awaitNextDispatch