diff --git a/.idea/.idea.Remora.Discord/.idea/vcs.xml b/.idea/.idea.Remora.Discord/.idea/vcs.xml index 39036eb7b2..b9cb099b0d 100644 --- a/.idea/.idea.Remora.Discord/.idea/vcs.xml +++ b/.idea/.idea.Remora.Discord/.idea/vcs.xml @@ -14,5 +14,6 @@ + \ No newline at end of file diff --git a/Backend/Remora.Discord.Gateway/Services/ResponderDispatchService.cs b/Backend/Remora.Discord.Gateway/Services/ResponderDispatchService.cs index dc1bf94b1a..d7e0204bf7 100644 --- a/Backend/Remora.Discord.Gateway/Services/ResponderDispatchService.cs +++ b/Backend/Remora.Discord.Gateway/Services/ResponderDispatchService.cs @@ -49,13 +49,18 @@ public class ResponderDispatchService : IAsyncDisposable, IResponderDispatchServ private readonly IResponderTypeRepository _responderTypeRepository; private readonly Dictionary _cachedInterfaceTypeArguments; - private readonly Dictionary>>> _cachedDispatchDelegates; - private readonly CancellationTokenSource _dispatchCancellationSource; + private readonly Dictionary>>> _cachedDispatchDelegates; private readonly Task _dispatcher; private readonly Task _finalizer; private readonly Channel _payloadsToDispatch; private readonly Channel>> _respondersToFinalize; + /// + /// Holds the token source used to get tokens for running responders. Execution of the dispatch service's own tasks + /// is controlled via the channels. + /// + private readonly CancellationTokenSource _responderCancellationSource; + private bool _isDisposed; /// @@ -81,7 +86,7 @@ IOptions options _cachedInterfaceTypeArguments = new(); _cachedDispatchDelegates = new(); - _dispatchCancellationSource = new(); + _responderCancellationSource = new(); _payloadsToDispatch = Channel.CreateBounded ( new BoundedChannelOptions((int)_options.MaxItems) @@ -99,8 +104,8 @@ IOptions options } ); - _dispatcher = Task.Run(DispatcherTaskAsync, _dispatchCancellationSource.Token); - _finalizer = Task.Run(FinalizerTaskAsync, _dispatchCancellationSource.Token); + _dispatcher = Task.Run(DispatcherTaskAsync, CancellationToken.None); + _finalizer = Task.Run(FinalizerTaskAsync, CancellationToken.None); } /// @@ -138,31 +143,24 @@ public async Task DispatchAsync(IPayload payload, CancellationToken ct = /// private async Task DispatcherTaskAsync() { - while (!_dispatchCancellationSource.Token.IsCancellationRequested) + try { - var payload = await _payloadsToDispatch.Reader.ReadAsync(_dispatchCancellationSource.Token); - var dispatch = UnwrapAndDispatchEvent(payload, _dispatchCancellationSource.Token); - if (!dispatch.IsSuccess) + while (await _payloadsToDispatch.Reader.WaitToReadAsync()) { - _log.LogWarning("Failed to dispatch payload: {Reason}", dispatch.Error.Message); - continue; - } + var payload = await _payloadsToDispatch.Reader.ReadAsync(); + var dispatch = UnwrapAndDispatchEvent(payload); + if (!dispatch.IsSuccess) + { + _log.LogWarning("Failed to dispatch payload: {Reason}", dispatch.Error.Message); + continue; + } - await _respondersToFinalize.Writer.WriteAsync(dispatch.Entity, _dispatchCancellationSource.Token); + await _respondersToFinalize.Writer.WriteAsync(dispatch.Entity); + } } - - // Finish up remaining dispatches - await _payloadsToDispatch.Reader.Completion; - await foreach (var payload in _payloadsToDispatch.Reader.ReadAllAsync()) + catch (Exception ex) when (ex is OperationCanceledException or ChannelClosedException) { - var dispatch = UnwrapAndDispatchEvent(payload, _dispatchCancellationSource.Token); - if (!dispatch.IsSuccess) - { - _log.LogWarning("Failed to dispatch payload: {Reason}", dispatch.Error.Message); - continue; - } - - await _respondersToFinalize.Writer.WriteAsync(dispatch.Entity, _dispatchCancellationSource.Token); + // this is fine, no further incoming payloads to accept } _respondersToFinalize.Writer.Complete(); @@ -173,7 +171,7 @@ private async Task DispatcherTaskAsync() /// private async Task FinalizerTaskAsync() { - if (_dispatchCancellationSource is null) + if (_responderCancellationSource is null) { throw new InvalidOperationException(); } @@ -183,34 +181,39 @@ private async Task FinalizerTaskAsync() throw new InvalidOperationException(); } - while (!_dispatchCancellationSource.Token.IsCancellationRequested) + try { - var responderResults = await _respondersToFinalize.Reader.ReadAsync - ( - _dispatchCancellationSource.Token - ); - - if (!responderResults.IsCompleted) + while (await _respondersToFinalize.Reader.WaitToReadAsync()) { - var timeout = Task.Delay(TimeSpan.FromMilliseconds(10)); - - var finishedTask = await Task.WhenAny(responderResults, timeout); - if (finishedTask == timeout) + var responderResults = await _respondersToFinalize.Reader.ReadAsync(); + if (!responderResults.IsCompleted) { - // This responder is taking too long... put it back on the channel and look at some other stuff - // in the meantime. - await _respondersToFinalize.Writer.WriteAsync(responderResults, _dispatchCancellationSource.Token); - continue; + var timeout = Task.Delay(TimeSpan.FromMilliseconds(10)); + + var finishedTask = await Task.WhenAny(responderResults, timeout); + if (finishedTask == timeout) + { + // This responder is taking too long... put it back on the channel and look at some other stuff + // in the meantime. + try + { + await _respondersToFinalize.Writer.WriteAsync(responderResults); + continue; + } + catch (ChannelClosedException) + { + // Okay, we can't put it back on, so we'll just drop out and await it. It should be the last + // item in the pipe anyway + } + } } - } - FinalizeResponderDispatch(await responderResults); + FinalizeResponderDispatch(await responderResults); + } } - - await _respondersToFinalize.Reader.Completion; - await foreach (var responderResults in _respondersToFinalize.Reader.ReadAllAsync()) + catch (Exception ex) when (ex is OperationCanceledException or ChannelClosedException) { - FinalizeResponderDispatch(await responderResults); + // this is fine, nothing further to do } } @@ -218,8 +221,7 @@ private async Task FinalizerTaskAsync() /// Unwraps the given payload into its typed representation, dispatching all events for it. /// /// The payload. - /// The cancellation token for the dispatched event. - private Result>> UnwrapAndDispatchEvent(IPayload payload, CancellationToken ct = default) + private Result>> UnwrapAndDispatchEvent(IPayload payload) { var payloadType = payload.GetType(); @@ -269,15 +271,14 @@ private Result>> UnwrapAndDispatchEvent(IPayload payl throw new MissingMethodException(nameof(DiscordGatewayClient), nameof(DispatchEventAsync)); } - var delegateType = typeof(Func<,,>).MakeGenericType + var delegateType = typeof(Func<,>).MakeGenericType ( typeof(IPayload<>).MakeGenericType(interfaceArgument), - typeof(CancellationToken), typeof(Task>) ); // Naughty unsafe cast, because we know we're calling it with compatible types in this method - dispatchDelegate = Unsafe.As>>> + dispatchDelegate = Unsafe.As>>> ( dispatchMethod .MakeGenericMethod(interfaceArgument) @@ -287,7 +288,9 @@ private Result>> UnwrapAndDispatchEvent(IPayload payl _cachedDispatchDelegates.Add(interfaceArgument, dispatchDelegate); } - var responderTask = Task.Run(() => dispatchDelegate(payload, ct), ct); + // Don't use the cancellation token here; we want the task to always run and let the responder decide when to + // actually cancel + var responderTask = Task.Run(() => dispatchDelegate(payload), CancellationToken.None); return responderTask; } @@ -296,13 +299,8 @@ private Result>> UnwrapAndDispatchEvent(IPayload payl /// Dispatches the given event to all relevant gateway event responders. /// /// The event to dispatch. - /// The cancellation token to use. /// The gateway event. - private async Task> DispatchEventAsync - ( - IPayload gatewayEvent, - CancellationToken ct = default - ) + private async Task> DispatchEventAsync(IPayload gatewayEvent) where TGatewayEvent : IGatewayEvent { // Batch up the responders according to their groups @@ -329,7 +327,7 @@ private async Task> DispatchEventAsync var responder = (IResponder)serviceScope.ServiceProvider .GetRequiredService(rt); - return await responder.RespondAsync(gatewayEvent.Data, ct); + return await responder.RespondAsync(gatewayEvent.Data, _responderCancellationSource.Token); } catch (Exception e) { @@ -423,26 +421,15 @@ public async ValueTask DisposeAsync() GC.SuppressFinalize(this); - // Stop! - _dispatchCancellationSource.Cancel(); - _payloadsToDispatch.Writer.Complete(); + // Signal running responders that they should cancel + _responderCancellationSource.Cancel(); - // Wait for everything to actually stop... - try - { - await _dispatcher; - } - catch (OperationCanceledException) - { - } + // Prevent further payloads from being written, signalling the readers that they should terminate + _payloadsToDispatch.Writer.Complete(); - try - { - await _finalizer; - } - catch (OperationCanceledException) - { - } + // Wait for everything to actually stop + await _dispatcher; + await _finalizer; _isDisposed = true; }