From 2b25fcb11a31417b721b655f1407620068d62f74 Mon Sep 17 00:00:00 2001 From: bchavez Date: Tue, 17 Sep 2019 08:42:02 -0700 Subject: [PATCH] Fixes #146 - Better support for `CancellationToken`. Clean up of token registrations that may produce exceptions when `CancellationTokenSource.Cancel()` is called. Basic idea here is to have the caller of `CancellationTokenSource.Cancel()` dispose of the `CancellationTokenRegistration` and have the message pump worker tasked with finishing the Awatier task continuations dispose of `CancellationTokenRegistration` when it's done via using statement. --- HISTORY.md | 3 + .../GitHubIssues/Issue146.cs | 74 +++++++++++++++++++ .../RethinkDb.Driver.Tests.csproj | 1 + Source/RethinkDb.Driver/Net/SocketWrapper.cs | 14 +++- .../RethinkDb.Driver/Utils/CancellableTask.cs | 15 ++-- 5 files changed, 96 insertions(+), 11 deletions(-) create mode 100644 Source/RethinkDb.Driver.Tests/GitHubIssues/Issue146.cs diff --git a/HISTORY.md b/HISTORY.md index 363a883..417892a 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,3 +1,6 @@ +## v2.3.150 +* Issue #146 - Better support for `CancellationToken`. Clean up of token registrations that may produce exceptions when `CancellationTokenSource.Cancel()` is called. + ## v2.3.101 * Issue #143 - `ConnectionPool.Builder.ConnectAsync()` now respects `.InitialTimeout()` parameter. `CancellationToken` also supported. diff --git a/Source/RethinkDb.Driver.Tests/GitHubIssues/Issue146.cs b/Source/RethinkDb.Driver.Tests/GitHubIssues/Issue146.cs new file mode 100644 index 0000000..ee257f2 --- /dev/null +++ b/Source/RethinkDb.Driver.Tests/GitHubIssues/Issue146.cs @@ -0,0 +1,74 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Newtonsoft.Json.Linq; +using NUnit.Framework; +using RethinkDb.Driver.Net; +using static System.Console; +using static RethinkDb.Driver.RethinkDB; + +namespace RethinkDb.Driver.Tests.GitHubIssues +{ + [TestFixture] + public class Issue146 + { + private Connection conn; + + [Test] + public async Task Test() + { + Print("Main Thread Start"); + + conn = R.Connection().Connect(); + + var cts = new CancellationTokenSource(); + + RunChanges(cts.Token); + + Print("Starting main thread delay."); + await Task.Delay(500); + + Print($"Canceling task"); + Action act = () => cts.Cancel(); + + act.ShouldNotThrow(); + + Print("End of main"); + } + + private async void RunChanges(CancellationToken ct) + { + Print("RunChanges: called"); + Cursor> changes = null; + try + { + Print("RunChanges: BEFORE Query"); + changes = await R.Db("rethinkdb").Table("jobs") + .Changes().OptArg("include_initial", "true") + .RunChangesAsync(conn, ct); + Print("RunChanges: Have Cursor, iterating with MoveNextAsync."); + while (await changes.MoveNextAsync(ct)) + { + Print("RunChanges: got a change"); + } + } + catch (OperationCanceledException ex) + { + Print("RunChanges: op canceled"); + } + finally + { + Print("RunChanges: finally"); + changes?.Close(); + Print("RunChanges: changes cursor closed"); + } + Print("RunChanges: returning"); + } + + static void Print(string msg) + { + WriteLine($">>> (TID:{Thread.CurrentThread.ManagedThreadId}): {msg}"); + } + } +} \ No newline at end of file diff --git a/Source/RethinkDb.Driver.Tests/RethinkDb.Driver.Tests.csproj b/Source/RethinkDb.Driver.Tests/RethinkDb.Driver.Tests.csproj index 9c33c22..a07caf2 100644 --- a/Source/RethinkDb.Driver.Tests/RethinkDb.Driver.Tests.csproj +++ b/Source/RethinkDb.Driver.Tests/RethinkDb.Driver.Tests.csproj @@ -122,6 +122,7 @@ + diff --git a/Source/RethinkDb.Driver/Net/SocketWrapper.cs b/Source/RethinkDb.Driver/Net/SocketWrapper.cs index 89252b7..cd6b9d5 100644 --- a/Source/RethinkDb.Driver/Net/SocketWrapper.cs +++ b/Source/RethinkDb.Driver/Net/SocketWrapper.cs @@ -233,11 +233,17 @@ private void ResponsePump() { Task.Run(() => { - //try, because it's possible - //the awaiting task was canceled. - if( !awaitingTask.TrySetResult(response) ) + //regardless of the outcome, clean up any registered + //cancellation tokens with using statement. + using ( awaitingTask ) { - Log.Debug($"Response Pump: The awaiter waiting for response token {response.Token} could not be set. The task was probably canceled."); + //try setting the result, because it's possible + //the awaiting task was canceled. + if( !awaitingTask.TrySetResult(response) ) + { + Log.Debug( + $"Response Pump: The awaiter waiting for response token {response.Token} could not be set. The task was probably canceled."); + } } }); } diff --git a/Source/RethinkDb.Driver/Utils/CancellableTask.cs b/Source/RethinkDb.Driver/Utils/CancellableTask.cs index 15dadf7..ee5efbe 100644 --- a/Source/RethinkDb.Driver/Utils/CancellableTask.cs +++ b/Source/RethinkDb.Driver/Utils/CancellableTask.cs @@ -21,17 +21,18 @@ public CancellableTask(CancellationToken cancelToken) private void OnCancellation() { - this.SetCanceled(); - } + this.TrySetCanceled(); - private bool disposed = false; + //if the user successfully signaled they want to + //cancel, remove the registration because + //the task status = canceled has been set. + //don't need the registration any more. + this.Dispose(); + } public void Dispose() { - if( !disposed ) - { - this.registration.Dispose(); - } + this.registration.Dispose(); } } } \ No newline at end of file