Skip to content

Commit

Permalink
Add cancellation tests (#218)
Browse files Browse the repository at this point in the history
* Don't log cancelling

* redo exception handling for receive

* remove null test case

* clean up with Id/Json and more cancels

* Change the exception stacks

* fix serialization test

* make a custom scrubber for internalized exceptions

* clean up

* fix namespaces again :(

* adjust the scrubber

* try to make tests more predictable

* rework exceptions again

* strip out compile files used

* formatting

* custom exception validation

* fix init

* Move serialization to own class

* save serialize test

* add deep clean

* add cancellation test on save to cache

* cancellation tests

* format

* do DI correctly

* receive cancel works
  • Loading branch information
adamhathcock authored Jan 30, 2025
1 parent 73afa28 commit 3aa993c
Show file tree
Hide file tree
Showing 52 changed files with 1,030 additions and 358 deletions.
32 changes: 32 additions & 0 deletions Speckle.Sdk.Testing/Framework/AggregationExceptionScrubber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
using Speckle.Sdk.Common;

namespace Speckle.Sdk.Testing.Framework;

public class AggregationExceptionScrubber : WriteOnlyJsonConverter<AggregateException>
{
private static readonly ExceptionScrubber _innerScrubber = new();

public override void Write(VerifyJsonWriter writer, AggregateException exception)
{
writer.WriteStartObject();

writer.WriteMember(exception, exception.GetType().FullName, "Type");
if (exception.InnerExceptions.Count == 1)
{
writer.WritePropertyName("InnerException");
_innerScrubber.Write(writer, exception.InnerException.NotNull());
}
else
{
writer.WritePropertyName("InnerExceptions");
writer.WriteStartArray();
foreach (var innerException in exception.InnerExceptions)
{
_innerScrubber.Write(writer, innerException);
}
writer.WriteEndArray();
}

writer.WriteEndObject();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

namespace Speckle.Sdk.Testing.Framework;

public class DummyReceiveServerObjectManager(Dictionary<string, string> objects) : IServerObjectManager
public class DummyReceiveServerObjectManager(IReadOnlyDictionary<string, string> objects) : IServerObjectManager
{
public async IAsyncEnumerable<(string, string)> DownloadObjects(
IReadOnlyCollection<string> objectIds,
Expand Down
3 changes: 2 additions & 1 deletion Speckle.Sdk.Testing/Framework/DummySqLiteReceiveManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,8 @@

namespace Speckle.Sdk.Testing.Framework;

public sealed class DummySqLiteReceiveManager(Dictionary<string, string> savedObjects) : ISqLiteJsonCacheManager
public sealed class DummySqLiteReceiveManager(IReadOnlyDictionary<string, string> savedObjects)
: ISqLiteJsonCacheManager
{
public void Dispose() { }

Expand Down
4 changes: 2 additions & 2 deletions Speckle.Sdk.Testing/Framework/DummySqLiteSendManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@

namespace Speckle.Sdk.Testing.Framework;

public sealed class DummySqLiteSendManager : ISqLiteJsonCacheManager
public class DummySqLiteSendManager : ISqLiteJsonCacheManager
{
public string? GetObject(string id) => throw new NotImplementedException();

public void SaveObject(string id, string json) => throw new NotImplementedException();

public void UpdateObject(string id, string json) => throw new NotImplementedException();

public void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();
public virtual void SaveObjects(IEnumerable<(string id, string json)> items) => throw new NotImplementedException();

public bool HasObject(string objectId) => throw new NotImplementedException();

Expand Down
24 changes: 24 additions & 0 deletions Speckle.Sdk.Testing/Framework/ExceptionScrubber.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using Argon;

namespace Speckle.Sdk.Testing.Framework;

public class ExceptionScrubber : WriteOnlyJsonConverter<Exception>
{
public ExceptionScrubber() { }

public override void Write(VerifyJsonWriter writer, Exception value)
{
if (value.StackTrace != null)
{
var ex = new JObject
{
["Type"] = value.GetType().FullName,
["Message"] = value.Message,
["Source"] = value.Source?.Trim(),
};
writer.WriteRawValue(ex.ToString(Formatting.Indented));
return;
}
base.Write(writer, value.ToString());
}
}
18 changes: 15 additions & 3 deletions Speckle.Sdk.Testing/SpeckleVerify.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,32 @@ namespace Speckle.Sdk.Testing;

public static class SpeckleVerify
{
private static bool _initialized;

[ModuleInitializer]
public static void Initialize()
{
if (_initialized)
{
return;
}

_initialized = true;
VerifierSettings.DontScrubGuids();
VerifierSettings.DontScrubDateTimes();

VerifierSettings.UseStrictJson();
VerifierSettings.DontIgnoreEmptyCollections();
VerifierSettings.SortPropertiesAlphabetically();
VerifierSettings.SortJsonObjects();
if (!VerifyQuibble.Initialized)
VerifierSettings.AddExtraSettings(x =>
{
VerifyQuibble.Initialize();
}
var existing = x.Converters.OfType<WriteOnlyJsonConverter<AggregateException>>().First();
x.Converters.Remove(existing);
x.Converters.Add(new AggregationExceptionScrubber());
x.Converters.Add(new ExceptionScrubber());
});
VerifyQuibble.Initialize();
}

private static readonly JsonSerializer _jsonSerializer = new()
Expand Down
28 changes: 28 additions & 0 deletions build/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
const string PACK_LOCAL = "pack-local";
const string CLEAN_LOCKS = "clean-locks";
const string PERF = "perf";
const string DEEP_CLEAN = "deep-clean";

Target(
CLEAN_LOCKS,
Expand Down Expand Up @@ -124,6 +125,33 @@ void CheckBuildDirectory(string dir, string build)
}
);

Target(
DEEP_CLEAN,
() =>
{
foreach (var f in Glob.Directories(".", "**/bin"))
{
if (f.StartsWith("build"))
{
continue;
}
Console.WriteLine("Found and will delete: " + f);
Directory.Delete(f, true);
}
foreach (var f in Glob.Directories(".", "**/obj"))
{
if (f.StartsWith("Build"))
{
continue;
}
Console.WriteLine("Found and will delete: " + f);
Directory.Delete(f, true);
}
Console.WriteLine("Running restore now.");
Run("dotnet", "restore .\\Speckle.Sdk.sln --no-cache");
}
);

static Task RunPack() => RunAsync("dotnet", "pack Speckle.Sdk.sln -c Release -o output --no-build");

Target(PACK, DependsOn(TEST), RunPack);
Expand Down
4 changes: 3 additions & 1 deletion src/Speckle.Sdk.Dependencies/Collections.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ public static class Collections
{
public static IReadOnlyCollection<T> Freeze<T>(this IEnumerable<T> source) => source.ToFrozenSet();

public static IReadOnlyDictionary<TKey, TValue> Freeze<TKey, TValue>(this IDictionary<TKey, TValue> source)
public static IReadOnlyDictionary<TKey, TValue> Freeze<TKey, TValue>(
this IEnumerable<KeyValuePair<TKey, TValue>> source
)
where TKey : notnull => source.ToFrozenDictionary();
}

Expand Down
76 changes: 68 additions & 8 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelLoader.cs
Original file line number Diff line number Diff line change
@@ -1,30 +1,90 @@
using Open.ChannelExtensions;
using System.Threading.Channels;
using Open.ChannelExtensions;

namespace Speckle.Sdk.Dependencies.Serialization;

public abstract class ChannelLoader<T>
{
private const int RECEIVE_CAPACITY = 5000;

private const int HTTP_GET_CHUNK_SIZE = 500;
private const int MAX_PARALLELISM_HTTP = 4;
private static readonly TimeSpan HTTP_BATCH_TIMEOUT = TimeSpan.FromSeconds(2);
private static readonly int MAX_READ_CACHE_PARALLELISM = Environment.ProcessorCount;
private const int MAX_SAVE_CACHE_BATCH = 500;
private const int MAX_SAVE_CACHE_PARALLELISM = 4;

protected async Task GetAndCache(IEnumerable<string> allChildrenIds, CancellationToken cancellationToken) =>
await allChildrenIds
.ToChannel(cancellationToken: cancellationToken)
.Pipe(MAX_READ_CACHE_PARALLELISM, CheckCache, cancellationToken: cancellationToken)
private readonly List<Exception> _exceptions = new();
private readonly Channel<string> _channel = Channel.CreateBounded<string>(
new BoundedChannelOptions(RECEIVE_CAPACITY)
{
AllowSynchronousContinuations = true,
Capacity = RECEIVE_CAPACITY,
SingleWriter = false,
SingleReader = false,
FullMode = BoundedChannelFullMode.Wait,
},
_ => throw new NotImplementedException("Dropping items not supported.")
);

protected async Task GetAndCache(
IEnumerable<string> allChildrenIds,
CancellationToken cancellationToken,
int? maxParallelism = null
) =>
await _channel
.Source(allChildrenIds, cancellationToken)
.Pipe(maxParallelism ?? Environment.ProcessorCount, CheckCache, cancellationToken: cancellationToken)
.Filter(x => x is not null)
.Batch(HTTP_GET_CHUNK_SIZE)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(MAX_PARALLELISM_HTTP, async x => await Download(x).ConfigureAwait(false), -1, false, cancellationToken)
.PipeAsync(
maxParallelism ?? MAX_PARALLELISM_HTTP,
async x => await Download(x).ConfigureAwait(false),
-1,
false,
cancellationToken
)
.Join()
.Batch(MAX_SAVE_CACHE_BATCH)
.WithTimeout(HTTP_BATCH_TIMEOUT)
.ReadAllConcurrently(MAX_SAVE_CACHE_PARALLELISM, SaveToCache, cancellationToken)
.ReadAllConcurrently(maxParallelism ?? MAX_SAVE_CACHE_PARALLELISM, SaveToCache, cancellationToken)
.ContinueWith(
t =>
{
Exception? ex = t.Exception;
if (ex is null && t.Status is TaskStatus.Canceled && !cancellationToken.IsCancellationRequested)
{
ex = new OperationCanceledException();
}

if (ex is not null)
{
if (ex is AggregateException ae)
{
_exceptions.AddRange(ae.Flatten().InnerExceptions);
}
else
{
_exceptions.Add(ex);
}
}

_channel.Writer.TryComplete(ex);
},
CancellationToken.None,
TaskContinuationOptions.ExecuteSynchronously,
TaskScheduler.Current
)
.ConfigureAwait(false);

public void CheckForExceptions()
{
if (_exceptions.Count > 0)
{
throw new AggregateException(_exceptions);
}
}

public abstract string? CheckCache(string id);

public abstract Task<List<T>> Download(List<string?> ids);
Expand Down
20 changes: 10 additions & 10 deletions src/Speckle.Sdk.Dependencies/Serialization/ChannelSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ public abstract class ChannelSaver<T>
private const int MAX_CACHE_WRITE_PARALLELISM = 4;
private const int MAX_CACHE_BATCH = 500;

private readonly List<Exception> _lists = new();
private readonly List<Exception> _exceptions = new();
private readonly Channel<T> _checkCacheChannel = Channel.CreateBounded<T>(
new BoundedChannelOptions(SEND_CAPACITY)
{
Expand All @@ -35,7 +35,7 @@ public Task Start(CancellationToken cancellationToken) =>
.WithTimeout(HTTP_BATCH_TIMEOUT)
.PipeAsync(
MAX_PARALLELISM_HTTP,
async x => await SendToServer(x, cancellationToken).ConfigureAwait(false),
async x => await SendToServer(x).ConfigureAwait(false),
HTTP_CAPACITY,
false,
cancellationToken
Expand All @@ -55,9 +55,9 @@ public Task Start(CancellationToken cancellationToken) =>

if (ex is not null)
{
lock (_lists)
lock (_exceptions)
{
_lists.Add(ex);
_exceptions.Add(ex);
}
}
_checkCacheChannel.Writer.TryComplete(ex);
Expand All @@ -70,25 +70,25 @@ public Task Start(CancellationToken cancellationToken) =>
public async ValueTask Save(T item, CancellationToken cancellationToken) =>
await _checkCacheChannel.Writer.WriteAsync(item, cancellationToken).ConfigureAwait(true);

public async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch, CancellationToken cancellationToken)
public async Task<IMemoryOwner<T>> SendToServer(IMemoryOwner<T> batch)
{
await SendToServer((Batch<T>)batch, cancellationToken).ConfigureAwait(false);
await SendToServer((Batch<T>)batch).ConfigureAwait(false);
return batch;
}

public abstract Task SendToServer(Batch<T> batch, CancellationToken cancellationToken);
public abstract Task SendToServer(Batch<T> batch);

public void DoneTraversing() => _checkCacheChannel.Writer.TryComplete();

public async Task DoneSaving()
{
await _checkCacheChannel.Reader.Completion.ConfigureAwait(true);
lock (_lists)
lock (_exceptions)
{
if (_lists.Count > 0)
if (_exceptions.Count > 0)
{
var exceptions = new List<Exception>();
foreach (var ex in _lists)
foreach (var ex in _exceptions)
{
if (ex is AggregateException ae)
{
Expand Down
11 changes: 6 additions & 5 deletions src/Speckle.Sdk/Api/Operations/Operations.Receive.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ public async Task<Base> Receive2(
Uri url,
string streamId,
string objectId,
string? authorizationToken = null,
IProgress<ProgressArgs>? onProgressAction = null,
CancellationToken cancellationToken = default
string? authorizationToken,
IProgress<ProgressArgs>? onProgressAction,
CancellationToken cancellationToken
)
{
using var receiveActivity = activityFactory.Start("Operations.Receive");
Expand All @@ -28,9 +28,10 @@ public async Task<Base> Receive2(
url,
streamId,
authorizationToken,
onProgressAction
onProgressAction,
cancellationToken
);
var result = await process.Deserialize(objectId, cancellationToken).ConfigureAwait(false);
var result = await process.Deserialize(objectId).ConfigureAwait(false);
receiveActivity?.SetStatus(SdkActivityStatusCode.Ok);
return result;
}
Expand Down
2 changes: 1 addition & 1 deletion src/Speckle.Sdk/Serialisation/SpeckleObjectDeserializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ public async ValueTask<Base> DeserializeAsync([NotNull] string? rootObjectJson)
if (propName == "__closure")
{
reader.Read(); //goes to prop value
var closures = ClosureParser.GetClosures(reader);
var closures = ClosureParser.GetClosures(reader, CancellationToken);
if (closures.Any())
{
_total = 0;
Expand Down
Loading

0 comments on commit 3aa993c

Please sign in to comment.