From ca9439b4a49a393c00b91b9282050c33fb6d3a73 Mon Sep 17 00:00:00 2001 From: Kerry Jiang Date: Sat, 13 Apr 2024 22:33:24 -0700 Subject: [PATCH] added npackage handking cancellation token --- samples/CommandServer/ADD.cs | 3 +- samples/CommandServer/MULT.cs | 3 +- samples/CommandServer/SUB.cs | 3 +- .../CommandExecutingContext.cs | 3 ++ src/SuperSocket.Command/CommandMiddleware.cs | 28 ++++++++++--------- src/SuperSocket.Command/CommandWrap.cs | 5 ++-- src/SuperSocket.Command/ICommand.cs | 3 +- src/SuperSocket.Command/JsonCommand.cs | 9 +++--- .../DelegatePackageHandler.cs | 13 ++++++--- .../IPackageHandler.cs | 3 +- .../IPackageHandlingScheduler.cs | 3 +- .../ServerOptions.cs | 5 ++++ .../ConcurrentPackageHandlingScheduler.cs | 5 ++-- .../PackageHandlingSchedulerBase.cs | 7 +++-- .../SerialPackageHandlingScheduler.cs | 5 ++-- src/SuperSocket.Server/SuperSocketService.cs | 21 +++++++++++++- .../CommandSubProtocolHandler.cs | 5 ++-- .../DelegateSubProtocolHandler.cs | 13 +++++++-- .../SubProtocolHandlerBase.cs | 3 +- .../WebSocketPackageHandler.cs | 7 +++-- test/SuperSocket.Tests.Command/MIN.cs | 4 ++- test/SuperSocket.Tests.Command/SORT.cs | 3 +- test/SuperSocket.Tests/CommandTest.cs | 4 +-- test/SuperSocket.Tests/Commands.cs | 13 +++++---- .../PackageHandlingContextAccessorTest.cs | 5 ++-- .../SuperSocket.Tests/SessionContainerTest.cs | 2 +- .../WebSocket/WebSocketBasicTest.cs | 6 ++-- version.json | 2 +- 28 files changed, 122 insertions(+), 64 deletions(-) diff --git a/samples/CommandServer/ADD.cs b/samples/CommandServer/ADD.cs index 2609a44a8..09d9c0387 100644 --- a/samples/CommandServer/ADD.cs +++ b/samples/CommandServer/ADD.cs @@ -1,5 +1,6 @@ using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using SuperSocket; using SuperSocket.Server.Abstractions.Session; @@ -11,7 +12,7 @@ namespace CommandServer [Command("add")] public class ADD : IAsyncCommand { - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)) diff --git a/samples/CommandServer/MULT.cs b/samples/CommandServer/MULT.cs index 1dc791516..3a6905e51 100644 --- a/samples/CommandServer/MULT.cs +++ b/samples/CommandServer/MULT.cs @@ -1,5 +1,6 @@ using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using SuperSocket; using SuperSocket.Server.Abstractions.Session; @@ -12,7 +13,7 @@ namespace CommandServer public class MULT : IAsyncCommand { - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)) diff --git a/samples/CommandServer/SUB.cs b/samples/CommandServer/SUB.cs index a97ba1c4a..39f35460b 100644 --- a/samples/CommandServer/SUB.cs +++ b/samples/CommandServer/SUB.cs @@ -1,5 +1,6 @@ using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using SuperSocket; using SuperSocket.Server.Abstractions.Session; @@ -14,7 +15,7 @@ public class SUB : IAsyncCommand public string Name => Key; - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)) diff --git a/src/SuperSocket.Command/CommandExecutingContext.cs b/src/SuperSocket.Command/CommandExecutingContext.cs index a8648a43a..fd8deb4e2 100644 --- a/src/SuperSocket.Command/CommandExecutingContext.cs +++ b/src/SuperSocket.Command/CommandExecutingContext.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using SuperSocket.Server.Abstractions.Session; namespace SuperSocket.Command @@ -27,5 +28,7 @@ public struct CommandExecutingContext /// The exception. /// public Exception Exception { get; set; } + + public CancellationToken CancellationToken { get; set; } } } diff --git a/src/SuperSocket.Command/CommandMiddleware.cs b/src/SuperSocket.Command/CommandMiddleware.cs index ca86bbbb9..9b90717e9 100644 --- a/src/SuperSocket.Command/CommandMiddleware.cs +++ b/src/SuperSocket.Command/CommandMiddleware.cs @@ -2,14 +2,15 @@ using System.Collections.Generic; using System.Linq; using System.Reflection; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Options; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; using SuperSocket.ProtoBase; using SuperSocket.Server.Abstractions; using SuperSocket.Server.Abstractions.Session; using SuperSocket.Server.Abstractions.Middleware; -using Microsoft.Extensions.Logging; namespace SuperSocket.Command { @@ -195,31 +196,31 @@ protected virtual IPackageMapper CreatePackageMap return serviceProvider.GetService>(); } - protected virtual async ValueTask HandlePackage(IAppSession session, TPackageInfo package) + protected virtual async ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken) { if (!_commands.TryGetValue(package.Key, out ICommandSet commandSet)) { return; } - await commandSet.ExecuteAsync(session, package); + await commandSet.ExecuteAsync(session, package, cancellationToken); } - protected virtual async Task OnPackageReceived(IAppSession session, TPackageInfo package) + protected virtual async Task OnPackageReceived(IAppSession session, TPackageInfo package, CancellationToken cancellationToken) { - await HandlePackage(session, package); + await HandlePackage(session, package, cancellationToken); } - ValueTask IPackageHandler.Handle(IAppSession session, TNetPackageInfo package) + ValueTask IPackageHandler.Handle(IAppSession session, TNetPackageInfo package, CancellationToken cancellationToken) { - return HandlePackage(session, PackageMapper.Map(package)); + return HandlePackage(session, PackageMapper.Map(package), cancellationToken); } interface ICommandSet { TKey Key { get; } - ValueTask ExecuteAsync(IAppSession session, TPackageInfo package); + ValueTask ExecuteAsync(IAppSession session, TPackageInfo package, CancellationToken cancellationToken); } class CommandTypeInfo @@ -392,11 +393,11 @@ public void Initialize(IServiceProvider serviceProvider, CommandTypeInfo command Filters = filters; } - public async ValueTask ExecuteAsync(IAppSession session, TPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, TPackageInfo package, CancellationToken cancellationToken) { if (Filters.Count > 0) { - await ExecuteAsyncWithFilter(session, package); + await ExecuteAsyncWithFilter(session, package, cancellationToken); return; } @@ -406,18 +407,19 @@ public async ValueTask ExecuteAsync(IAppSession session, TPackageInfo package) if (asyncCommand != null) { - await asyncCommand.ExecuteAsync(appSession, package); + await asyncCommand.ExecuteAsync(appSession, package, cancellationToken); return; } Command.Execute(appSession, package); } - private async ValueTask ExecuteAsyncWithFilter(IAppSession session, TPackageInfo package) + private async ValueTask ExecuteAsyncWithFilter(IAppSession session, TPackageInfo package, CancellationToken cancellationToken) { var context = new CommandExecutingContext(); context.Package = package; context.Session = session; + context.CancellationToken = cancellationToken; var command = AsyncCommand != null ? (AsyncCommand as ICommand) : (Command as ICommand); @@ -457,7 +459,7 @@ private async ValueTask ExecuteAsyncWithFilter(IAppSession session, TPackageInfo if (asyncCommand != null) { - await asyncCommand.ExecuteAsync(appSession, package); + await asyncCommand.ExecuteAsync(appSession, package, cancellationToken); } else { diff --git a/src/SuperSocket.Command/CommandWrap.cs b/src/SuperSocket.Command/CommandWrap.cs index 798230176..d3b509c4d 100644 --- a/src/SuperSocket.Command/CommandWrap.cs +++ b/src/SuperSocket.Command/CommandWrap.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using SuperSocket.Server.Abstractions.Session; @@ -55,9 +56,9 @@ public AsyncCommandWrap(IServiceProvider serviceProvider) InnerCommand = (TAsyncCommand)ActivatorUtilities.CreateInstance(serviceProvider, typeof(TAsyncCommand)); } - public async ValueTask ExecuteAsync(TAppSession session, TPackageInfo package) + public async ValueTask ExecuteAsync(TAppSession session, TPackageInfo package, CancellationToken cancellationToken) { - await InnerCommand.ExecuteAsync(session, package); + await InnerCommand.ExecuteAsync(session, package, cancellationToken); } ICommand ICommandWrap.InnerCommand diff --git a/src/SuperSocket.Command/ICommand.cs b/src/SuperSocket.Command/ICommand.cs index b56a9a9b6..d80119f91 100644 --- a/src/SuperSocket.Command/ICommand.cs +++ b/src/SuperSocket.Command/ICommand.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using SuperSocket.Server.Abstractions.Session; @@ -28,6 +29,6 @@ public interface IAsyncCommand : IAsyncCommand : ICommand where TAppSession : IAppSession { - ValueTask ExecuteAsync(TAppSession session, TPackageInfo package); + ValueTask ExecuteAsync(TAppSession session, TPackageInfo package, CancellationToken cancellationToken); } } diff --git a/src/SuperSocket.Command/JsonCommand.cs b/src/SuperSocket.Command/JsonCommand.cs index ad225f418..03dcbb5cf 100644 --- a/src/SuperSocket.Command/JsonCommand.cs +++ b/src/SuperSocket.Command/JsonCommand.cs @@ -1,4 +1,5 @@ -using System.Threading.Tasks; +using System.Threading; +using System.Threading.Tasks; using System.Text.Json; using SuperSocket.ProtoBase; using SuperSocket.Server.Abstractions.Session; @@ -55,10 +56,10 @@ public JsonAsyncCommand() }; } - public virtual async ValueTask ExecuteAsync(TAppSession session, IStringPackage package) + public virtual async ValueTask ExecuteAsync(TAppSession session, IStringPackage package, CancellationToken cancellationToken) { var content = package.Body; - await ExecuteJsonAsync(session, string.IsNullOrEmpty(content) ? default(TJsonObject) : Deserialize(content)); + await ExecuteJsonAsync(session, string.IsNullOrEmpty(content) ? default(TJsonObject) : Deserialize(content), cancellationToken); } protected virtual TJsonObject Deserialize(string content) @@ -66,6 +67,6 @@ protected virtual TJsonObject Deserialize(string content) return JsonSerializer.Deserialize(content, JsonSerializerOptions); } - protected abstract ValueTask ExecuteJsonAsync(TAppSession session, TJsonObject jsonObject); + protected abstract ValueTask ExecuteJsonAsync(TAppSession session, TJsonObject jsonObject, CancellationToken cancellationToken); } } diff --git a/src/SuperSocket.Server.Abstractions/DelegatePackageHandler.cs b/src/SuperSocket.Server.Abstractions/DelegatePackageHandler.cs index c1fb1398a..c145d3f0f 100644 --- a/src/SuperSocket.Server.Abstractions/DelegatePackageHandler.cs +++ b/src/SuperSocket.Server.Abstractions/DelegatePackageHandler.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using SuperSocket.Server.Abstractions.Session; @@ -6,17 +7,21 @@ namespace SuperSocket.Server.Abstractions { public class DelegatePackageHandler : IPackageHandler { - - Func _func; + Func _func; public DelegatePackageHandler(Func func) + { + _func = (session, package, cancellationToken) => func(session, package); + } + + public DelegatePackageHandler(Func func) { _func = func; } - public async ValueTask Handle(IAppSession session, TReceivePackageInfo package) + public async ValueTask Handle(IAppSession session, TReceivePackageInfo package, CancellationToken cancellationToken) { - await _func(session, package); + await _func(session, package, cancellationToken); } } } \ No newline at end of file diff --git a/src/SuperSocket.Server.Abstractions/IPackageHandler.cs b/src/SuperSocket.Server.Abstractions/IPackageHandler.cs index 50e868a3f..318ee5278 100644 --- a/src/SuperSocket.Server.Abstractions/IPackageHandler.cs +++ b/src/SuperSocket.Server.Abstractions/IPackageHandler.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using SuperSocket.Server.Abstractions.Session; @@ -6,6 +7,6 @@ namespace SuperSocket.Server.Abstractions { public interface IPackageHandler { - ValueTask Handle(IAppSession session, TReceivePackageInfo package); + ValueTask Handle(IAppSession session, TReceivePackageInfo package, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/src/SuperSocket.Server.Abstractions/IPackageHandlingScheduler.cs b/src/SuperSocket.Server.Abstractions/IPackageHandlingScheduler.cs index 4d223c3a4..f8469d6e4 100644 --- a/src/SuperSocket.Server.Abstractions/IPackageHandlingScheduler.cs +++ b/src/SuperSocket.Server.Abstractions/IPackageHandlingScheduler.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Net; +using System.Threading; using System.Threading.Tasks; using SuperSocket.Connection; using SuperSocket.Server.Abstractions.Session; @@ -11,6 +12,6 @@ public interface IPackageHandlingScheduler { void Initialize(IPackageHandler packageHandler, Func, ValueTask> errorHandler); - ValueTask HandlePackage(IAppSession session, TPackageInfo package); + ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/src/SuperSocket.Server.Abstractions/ServerOptions.cs b/src/SuperSocket.Server.Abstractions/ServerOptions.cs index 564402507..5f6325595 100644 --- a/src/SuperSocket.Server.Abstractions/ServerOptions.cs +++ b/src/SuperSocket.Server.Abstractions/ServerOptions.cs @@ -15,5 +15,10 @@ public class ServerOptions : ConnectionOptions public int ClearIdleSessionInterval { get; set; } = 120; public int IdleSessionTimeOut { get; set; } = 300; + + /// + /// In seconds. + /// + public int PackageHandlingTimeOut { get; set; } = 30; } } \ No newline at end of file diff --git a/src/SuperSocket.Server/ConcurrentPackageHandlingScheduler.cs b/src/SuperSocket.Server/ConcurrentPackageHandlingScheduler.cs index 52d2f5fef..56fd17955 100644 --- a/src/SuperSocket.Server/ConcurrentPackageHandlingScheduler.cs +++ b/src/SuperSocket.Server/ConcurrentPackageHandlingScheduler.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Net; +using System.Threading; using System.Threading.Tasks; using SuperSocket.Server.Abstractions.Session; @@ -8,9 +9,9 @@ namespace SuperSocket.Server { public class ConcurrentPackageHandlingScheduler : PackageHandlingSchedulerBase { - public override ValueTask HandlePackage(IAppSession session, TPackageInfo package) + public override ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken) { - HandlePackageInternal(session, package).DoNotAwait(); + HandlePackageInternal(session, package, cancellationToken).DoNotAwait(); return new ValueTask(); } } diff --git a/src/SuperSocket.Server/PackageHandlingSchedulerBase.cs b/src/SuperSocket.Server/PackageHandlingSchedulerBase.cs index a7ff262ee..1cb0d7962 100644 --- a/src/SuperSocket.Server/PackageHandlingSchedulerBase.cs +++ b/src/SuperSocket.Server/PackageHandlingSchedulerBase.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Net; +using System.Threading; using System.Threading.Tasks; using SuperSocket.Connection; using SuperSocket.Server.Abstractions; @@ -14,7 +15,7 @@ public abstract class PackageHandlingSchedulerBase : IPackageHandl public Func, ValueTask> ErrorHandler { get; private set; } - public abstract ValueTask HandlePackage(IAppSession session, TPackageInfo package); + public abstract ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken); public virtual void Initialize(IPackageHandler packageHandler, Func, ValueTask> errorHandler) { @@ -22,7 +23,7 @@ public virtual void Initialize(IPackageHandler packageHandler, Fun ErrorHandler = errorHandler; } - protected async ValueTask HandlePackageInternal(IAppSession session, TPackageInfo package) + protected async ValueTask HandlePackageInternal(IAppSession session, TPackageInfo package, CancellationToken cancellationToken) { var packageHandler = PackageHandler; var errorHandler = ErrorHandler; @@ -30,7 +31,7 @@ protected async ValueTask HandlePackageInternal(IAppSession session, TPackageInf try { if (packageHandler != null) - await packageHandler.Handle(session, package); + await packageHandler.Handle(session, package, cancellationToken); } catch (Exception e) { diff --git a/src/SuperSocket.Server/SerialPackageHandlingScheduler.cs b/src/SuperSocket.Server/SerialPackageHandlingScheduler.cs index 3f26a4c66..f5553b7de 100644 --- a/src/SuperSocket.Server/SerialPackageHandlingScheduler.cs +++ b/src/SuperSocket.Server/SerialPackageHandlingScheduler.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Net; +using System.Threading; using System.Threading.Tasks; using SuperSocket.Server.Abstractions.Session; @@ -8,9 +9,9 @@ namespace SuperSocket.Server { public class SerialPackageHandlingScheduler : PackageHandlingSchedulerBase { - public override async ValueTask HandlePackage(IAppSession session, TPackageInfo package) + public override async ValueTask HandlePackage(IAppSession session, TPackageInfo package, CancellationToken cancellationToken) { - await HandlePackageInternal(session, package); + await HandlePackageInternal(session, package, cancellationToken); } } } \ No newline at end of file diff --git a/src/SuperSocket.Server/SuperSocketService.cs b/src/SuperSocket.Server/SuperSocketService.cs index 5f89dca15..17787e3db 100644 --- a/src/SuperSocket.Server/SuperSocketService.cs +++ b/src/SuperSocket.Server/SuperSocketService.cs @@ -391,13 +391,25 @@ private async ValueTask HandleSession(AppSession session, IConnection connection var packageHandlingScheduler = _packageHandlingScheduler; +#if NET6_0_OR_GREATER + using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(CancellationToken.None); +#endif + await foreach (var p in packageStream) { if(_packageHandlingContextAccessor != null) { _packageHandlingContextAccessor.PackageHandlingContext = new PackageHandlingContext(session, p); } - await packageHandlingScheduler.HandlePackage(session, p); + +#if !NET6_0_OR_GREATER + using var cancellationTokenSource = GetPackageHandlingCancellationTokenSource(CancellationToken.None); +#endif + await packageHandlingScheduler.HandlePackage(session, p, cancellationTokenSource.Token); + +#if NET6_0_OR_GREATER + cancellationTokenSource.TryReset(); +#endif } } catch (Exception e) @@ -406,6 +418,13 @@ private async ValueTask HandleSession(AppSession session, IConnection connection } } + protected virtual CancellationTokenSource GetPackageHandlingCancellationTokenSource(CancellationToken cancellationToken) + { + var cancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cancellationTokenSource.CancelAfter(TimeSpan.FromSeconds(Options.PackageHandlingTimeOut)); + return cancellationTokenSource; + } + protected virtual ValueTask OnSessionErrorAsync(IAppSession session, PackageHandlingException exception) { _logger.LogError(exception, $"Session[{session.SessionID}]: session exception."); diff --git a/src/SuperSocket.WebSocket.Server/CommandSubProtocolHandler.cs b/src/SuperSocket.WebSocket.Server/CommandSubProtocolHandler.cs index e511c54ab..77c7c061b 100644 --- a/src/SuperSocket.WebSocket.Server/CommandSubProtocolHandler.cs +++ b/src/SuperSocket.WebSocket.Server/CommandSubProtocolHandler.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Options; using SuperSocket.Command; @@ -20,9 +21,9 @@ public CommandSubProtocolHandler(string name, IServiceProvider serviceProvider, _commandMiddleware = Activator.CreateInstance(commandMiddlewareType, serviceProvider, commandOptions, mapper) as IPackageHandler; } - public override async ValueTask Handle(IAppSession session, WebSocketPackage package) + public override async ValueTask Handle(IAppSession session, WebSocketPackage package, CancellationToken cancellationToken) { - await _commandMiddleware.Handle(session, package); + await _commandMiddleware.Handle(session, package, cancellationToken); } } } \ No newline at end of file diff --git a/src/SuperSocket.WebSocket.Server/DelegateSubProtocolHandler.cs b/src/SuperSocket.WebSocket.Server/DelegateSubProtocolHandler.cs index b1c658645..09c8481fc 100644 --- a/src/SuperSocket.WebSocket.Server/DelegateSubProtocolHandler.cs +++ b/src/SuperSocket.WebSocket.Server/DelegateSubProtocolHandler.cs @@ -1,4 +1,5 @@ using System; +using System.Threading; using System.Threading.Tasks; using SuperSocket.Server.Abstractions.Session; @@ -6,17 +7,23 @@ namespace SuperSocket.WebSocket.Server { class DelegateSubProtocolHandler : SubProtocolHandlerBase { - private Func _packageHandler; + private Func _packageHandler; public DelegateSubProtocolHandler(string name, Func packageHandler) : base(name) + { + _packageHandler = (session, package, cancellationToken) => packageHandler(session, package); + } + + public DelegateSubProtocolHandler(string name, Func packageHandler) + : base(name) { _packageHandler = packageHandler; } - public override async ValueTask Handle(IAppSession session, WebSocketPackage package) + public override async ValueTask Handle(IAppSession session, WebSocketPackage package, CancellationToken cancellationToken) { - await _packageHandler(session as WebSocketSession, package); + await _packageHandler(session as WebSocketSession, package, cancellationToken); } } } \ No newline at end of file diff --git a/src/SuperSocket.WebSocket.Server/SubProtocolHandlerBase.cs b/src/SuperSocket.WebSocket.Server/SubProtocolHandlerBase.cs index 368479d04..08a016b0a 100644 --- a/src/SuperSocket.WebSocket.Server/SubProtocolHandlerBase.cs +++ b/src/SuperSocket.WebSocket.Server/SubProtocolHandlerBase.cs @@ -1,5 +1,6 @@ using System; using System.Linq; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -16,6 +17,6 @@ public SubProtocolHandlerBase(string name) Name = name; } - public abstract ValueTask Handle(IAppSession session, WebSocketPackage package); + public abstract ValueTask Handle(IAppSession session, WebSocketPackage package, CancellationToken cancellationToken); } } \ No newline at end of file diff --git a/src/SuperSocket.WebSocket.Server/WebSocketPackageHandler.cs b/src/SuperSocket.WebSocket.Server/WebSocketPackageHandler.cs index 9d829e851..1f0966566 100644 --- a/src/SuperSocket.WebSocket.Server/WebSocketPackageHandler.cs +++ b/src/SuperSocket.WebSocket.Server/WebSocketPackageHandler.cs @@ -6,6 +6,7 @@ using System.Linq; using System.Security.Cryptography; using System.Text; +using System.Threading; using System.Threading.Tasks; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; @@ -90,7 +91,7 @@ private CloseStatus GetCloseStatusFromPackage(WebSocketPackage package) return closeStatus; } - public async ValueTask Handle(IAppSession session, WebSocketPackage package) + public async ValueTask Handle(IAppSession session, WebSocketPackage package, CancellationToken cancellationToken) { var websocketSession = session as WebSocketSession; @@ -158,7 +159,7 @@ public async ValueTask Handle(IAppSession session, WebSocketPackage package) if (protocolHandler != null) { - await protocolHandler.Handle(session, package); + await protocolHandler.Handle(session, package, cancellationToken); return; } @@ -167,7 +168,7 @@ public async ValueTask Handle(IAppSession session, WebSocketPackage package) if (websocketCommandMiddleware != null) { - await websocketCommandMiddleware.Handle(session, package); + await websocketCommandMiddleware.Handle(session, package, cancellationToken); return; } diff --git a/test/SuperSocket.Tests.Command/MIN.cs b/test/SuperSocket.Tests.Command/MIN.cs index 11946546a..0637027fe 100644 --- a/test/SuperSocket.Tests.Command/MIN.cs +++ b/test/SuperSocket.Tests.Command/MIN.cs @@ -1,7 +1,9 @@ using System; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; +using Microsoft.Extensions.Primitives; using SuperSocket.Command; using SuperSocket.ProtoBase; using SuperSocket.Server.Abstractions.Session; @@ -10,7 +12,7 @@ namespace SuperSocket.Tests.Command { public class MIN : IAsyncCommand { - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)).OrderBy(x => x).FirstOrDefault(); diff --git a/test/SuperSocket.Tests.Command/SORT.cs b/test/SuperSocket.Tests.Command/SORT.cs index 84b62c5ea..d56305c9b 100644 --- a/test/SuperSocket.Tests.Command/SORT.cs +++ b/test/SuperSocket.Tests.Command/SORT.cs @@ -1,6 +1,7 @@ using System; using System.Linq; using System.Text; +using System.Threading; using System.Threading.Tasks; using SuperSocket.Command; using SuperSocket.ProtoBase; @@ -10,7 +11,7 @@ namespace SuperSocket.Tests.Command { public class SORT : IAsyncCommand { - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = string.Join(' ', package.Parameters .Select(p => int.Parse(p)).OrderBy(x => x).Select(x => x.ToString())); diff --git a/test/SuperSocket.Tests/CommandTest.cs b/test/SuperSocket.Tests/CommandTest.cs index 0848b2b47..5e40d2ef5 100644 --- a/test/SuperSocket.Tests/CommandTest.cs +++ b/test/SuperSocket.Tests/CommandTest.cs @@ -331,7 +331,7 @@ public COUNT(IPackageEncoder encoder) _encoder = encoder; } - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { await session.SendAsync(_encoder, "OK\r\n"); } @@ -347,7 +347,7 @@ public COUNTDOWN(IPackageEncoder encoder) _encoder = encoder; } - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { await session.SendAsync(_encoder, "OK\r\n"); } diff --git a/test/SuperSocket.Tests/Commands.cs b/test/SuperSocket.Tests/Commands.cs index 7042ab890..18e130956 100644 --- a/test/SuperSocket.Tests/Commands.cs +++ b/test/SuperSocket.Tests/Commands.cs @@ -8,6 +8,7 @@ using SuperSocket.Command; using SuperSocket.ProtoBase; using SuperSocket.Tests.Command; +using System.Threading; namespace SuperSocket.Tests @@ -19,7 +20,7 @@ public class MySession : AppSession public class ADD : IAsyncCommand { - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)) @@ -31,7 +32,7 @@ public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo packa public class MULT : IAsyncCommand { - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)) @@ -50,7 +51,7 @@ public SUB(IPackageEncoder encoder) _encoder = encoder; } - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)) @@ -70,7 +71,7 @@ public DIV(IPackageEncoder encoder) _encoder = encoder; } - public async ValueTask ExecuteAsync(MySession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(MySession session, StringPackageInfo package, CancellationToken cancellationToken) { var values = package .Parameters @@ -93,7 +94,7 @@ public class PowData public class POW : JsonAsyncCommand { - protected override async ValueTask ExecuteJsonAsync(IAppSession session, PowData data) + protected override async ValueTask ExecuteJsonAsync(IAppSession session, PowData data, CancellationToken cancellationToken) { await session.SendAsync(Encoding.UTF8.GetBytes($"{Math.Pow(data.X, data.Y)}\r\n")); } @@ -106,7 +107,7 @@ public class MaxData public class MAX : JsonAsyncCommand { - protected override async ValueTask ExecuteJsonAsync(IAppSession session, MaxData data) + protected override async ValueTask ExecuteJsonAsync(IAppSession session, MaxData data, CancellationToken cancellationToken) { var maxValue = data.Numbers.OrderByDescending(i => i).FirstOrDefault(); await session.SendAsync(Encoding.UTF8.GetBytes($"{maxValue}\r\n")); diff --git a/test/SuperSocket.Tests/PackageHandlingContextAccessorTest.cs b/test/SuperSocket.Tests/PackageHandlingContextAccessorTest.cs index 69d6fe2c1..42133f63e 100644 --- a/test/SuperSocket.Tests/PackageHandlingContextAccessorTest.cs +++ b/test/SuperSocket.Tests/PackageHandlingContextAccessorTest.cs @@ -14,6 +14,7 @@ using System.Threading.Tasks; using Xunit; using Xunit.Abstractions; +using System.Threading; namespace SuperSocket.Tests { @@ -82,7 +83,7 @@ public TestCommand(IServiceProvider serviceProvider) } - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { var packageHandlingContextAccessor = serviceProvider.GetService>(); if (packageHandlingContextAccessor != null) @@ -97,6 +98,4 @@ public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo packa } } } - - } diff --git a/test/SuperSocket.Tests/SessionContainerTest.cs b/test/SuperSocket.Tests/SessionContainerTest.cs index 22eeb785c..52c4c57bd 100644 --- a/test/SuperSocket.Tests/SessionContainerTest.cs +++ b/test/SuperSocket.Tests/SessionContainerTest.cs @@ -23,7 +23,7 @@ public class SessionContainerTest : TestClassBase { class SESS : IAsyncCommand { - public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(IAppSession session, StringPackageInfo package, CancellationToken cancellationToken) { await session.SendAsync(Encoding.UTF8.GetBytes(session.SessionID + "\r\n")); } diff --git a/test/SuperSocket.Tests/WebSocket/WebSocketBasicTest.cs b/test/SuperSocket.Tests/WebSocket/WebSocketBasicTest.cs index 4738b91cc..b12ac87ae 100644 --- a/test/SuperSocket.Tests/WebSocket/WebSocketBasicTest.cs +++ b/test/SuperSocket.Tests/WebSocket/WebSocketBasicTest.cs @@ -1075,7 +1075,7 @@ private async ValueTask GetWebSocketReply(ClientWebSocket websocket, byt class ADD : IAsyncCommand { - public async ValueTask ExecuteAsync(WebSocketSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(WebSocketSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)) @@ -1087,7 +1087,7 @@ public async ValueTask ExecuteAsync(WebSocketSession session, StringPackageInfo class MULT : IAsyncCommand { - public async ValueTask ExecuteAsync(WebSocketSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(WebSocketSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)) @@ -1099,7 +1099,7 @@ public async ValueTask ExecuteAsync(WebSocketSession session, StringPackageInfo class SUB : IAsyncCommand { - public async ValueTask ExecuteAsync(WebSocketSession session, StringPackageInfo package) + public async ValueTask ExecuteAsync(WebSocketSession session, StringPackageInfo package, CancellationToken cancellationToken) { var result = package.Parameters .Select(p => int.Parse(p)) diff --git a/version.json b/version.json index c1e090c72..56e6535c6 100644 --- a/version.json +++ b/version.json @@ -1,6 +1,6 @@ { "$schema": "https://raw.githubusercontent.com/dotnet/Nerdbank.GitVersioning/master/src/NerdBank.GitVersioning/version.schema.json", - "version": "2.0.0-beta.20", + "version": "2.0.0-beta.21", "publicReleaseRefSpec": [ "^refs/heads/master$", "^refs/heads/v\\d+(?:\\.\\d+)?$"