WIP Snapshot Load Batch Queue #434

Draft
wants to merge 10 commits into dev
307 changes: 275 additions & 32 deletions src/Akka.Persistence.Sql/Snapshot/ByteArraySnapshotDao.cs
@@ -5,20 +5,77 @@
// -----------------------------------------------------------------------

using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Akka.Event;
using Akka.Persistence.Sql.Config;
using Akka.Persistence.Sql.Db;
using Akka.Persistence.Sql.Extensions;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Util;
using LinqToDB;
using LinqToDB.Tools;

namespace Akka.Persistence.Sql.Snapshot
{
public static class SubFlowExtensions
{
public static Source<TOut, TMat> MergeSubStreamsAsSource<TOut, TMat,
TClosed>(this SubFlow<TOut, TMat, TClosed> subFlow)
{
return (Source<TOut,TMat>)(subFlow.MergeSubstreams());
}
}
Comment on lines +26 to +33
Member Author
Happy for feedback on this method, but TBH it would be nice if some form of this (even if it's not returning Source<TOut,TMat>) were put into Akka.Streams proper. It's really useful.
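For what it's worth, a minimal usage sketch of the helper outside this DAO (illustrative only; the group count and grouping key here are made up, not from this PR):

```csharp
// Illustrative sketch: fan a Source out into substreams, process each one,
// then collapse the SubFlow back into a single Source so it can be run with a Sink.
using System;
using System.Linq;
using Akka.Actor;
using Akka.Streams;
using Akka.Streams.Dsl;
using Akka.Persistence.Sql.Snapshot; // MergeSubStreamsAsSource from this PR

var system = ActorSystem.Create("demo");
var materializer = system.Materializer();

await Source.From(Enumerable.Range(1, 100))
    .GroupBy(4, i => i % 4)          // split into at most 4 substreams by key
    .Select(i => i * 2)              // per-substream processing
    .MergeSubStreamsAsSource()       // collapse the SubFlow back into a single Source
    .RunForeach(Console.WriteLine, materializer);

await system.Terminate();
```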


public class LatestSnapRequestEntry
{
public LatestSnapRequestEntry(string persistenceId)
{
PersistenceId = persistenceId;
TCS = new TaskCompletionSource<Option<SelectedSnapshot>>(TaskCreationOptions.RunContinuationsAsynchronously);
}

public readonly string PersistenceId;
public readonly TaskCompletionSource<Option<SelectedSnapshot>> TCS;
}

public readonly record struct SnapshotReadGroup
{
public SnapshotReadGroup(QueryLatestSnapSet a, List<LongSnapshotRow> b, Exception? err)
{
this.a = a;
this.b = b;
this.err = err;
}
public QueryLatestSnapSet a { get; }
public List<LongSnapshotRow> b { get; }
public Exception? err { get; }
public void Deconstruct(out QueryLatestSnapSet a, out List<LongSnapshotRow> b, out Exception? err)
{
a = this.a;
b = this.b;
err = this.err;
}
}
Comment on lines +49 to +66
Member Author
Needs cleanup; also not sure whether a struct is actually a good idea here or not :)


public class QueryLatestSnapSet
{
public readonly Dictionary<string, List<TaskCompletionSource<Option<SelectedSnapshot>>>> Entries = new();

public void Add(LatestSnapRequestEntry entry)
{
if (Entries.TryGetValue(entry.PersistenceId, out var item) == false)
{
item = Entries[entry.PersistenceId] = new List<TaskCompletionSource<Option<SelectedSnapshot>>>();
}
item.Add(entry.TCS);
}
}
Member Author
May or may not be better to return this on .Add() for cosmetics/etc.
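A sketch of the fluent variant that comment hints at; same body as the existing Add, only the return type changes (hypothetical, not part of the diff):

```csharp
// Sketch only: returning the set from Add lets callers chain it, so the BatchWeighted
// seed/aggregate lambdas further down could shrink to
//   e => new QueryLatestSnapSet().Add(e)   and   (a, e) => a.Add(e)
public QueryLatestSnapSet Add(LatestSnapRequestEntry entry)
{
    if (!Entries.TryGetValue(entry.PersistenceId, out var item))
        item = Entries[entry.PersistenceId] = new List<TaskCompletionSource<Option<SelectedSnapshot>>>();

    item.Add(entry.TCS);
    return this;
}
```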

public class ByteArraySnapshotDao : ISnapshotDao
{
private readonly AkkaPersistenceDataConnectionFactory _connectionFactory;
@@ -29,6 +86,8 @@ public class ByteArraySnapshotDao : ISnapshotDao
private readonly CancellationTokenSource _shutdownCts;
private readonly SnapshotConfig _snapshotConfig;
private readonly IsolationLevel _writeIsolationLevel;
private readonly Channel<LatestSnapRequestEntry> _pendingLatestChannel;
private readonly Task<Done> _latestSnapStream;

public ByteArraySnapshotDao(
AkkaPersistenceDataConnectionFactory connectionFactory,
@@ -48,6 +107,181 @@ public ByteArraySnapshotDao(
_readIsolationLevel = snapshotConfig.ReadIsolationLevel;

_shutdownCts = new CancellationTokenSource();
_pendingLatestChannel = Channel.CreateUnbounded<LatestSnapRequestEntry>();
int maxSubStreamsForReads = 8; // TODO: Configurable
int maxRequestsPerBatch = 50;
Member Author
These should be configurable
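One possible shape for that, assuming SnapshotConfig grows two new properties (names are purely illustrative; nothing like them exists yet):

```csharp
// Hypothetical sketch: read the knobs from SnapshotConfig instead of hard-coding them.
// LatestSnapshotMaxSubStreams / LatestSnapshotMaxRequestsPerBatch do not exist today.
var maxSubStreamsForReads = _snapshotConfig.LatestSnapshotMaxSubStreams;        // default 8
var maxRequestsPerBatch = _snapshotConfig.LatestSnapshotMaxRequestsPerBatch;    // default 50
```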

_latestSnapStream = Source.ChannelReader(_pendingLatestChannel.Reader)
.GroupBy(maxSubStreamsForReads, a => Math.Abs(a.PersistenceId.GetHashCode() % maxSubStreamsForReads)) // abs: a negative hash code would otherwise produce more distinct keys than maxSubStreamsForReads allows
.BatchWeighted(
maxRequestsPerBatch,
a => 1,
Member Author
I know in ShotGlass I had/have a Batcher that could assign a 'zero weight' for cases where a persistence ID was already queued...

FBOW in this first go here I instead decided a flat weight of 1 would be a lot less 'change' but also has -some- benefits for forward progress. (but really mostly because a custom stream stage felt like overkill for first intro of this.)

e =>
{
var a = new QueryLatestSnapSet();
a.Add(e);
return a;
},
(a, e) =>
{
a.Add(e);
return a;
})
.SelectAsync(1,
async a =>
{

using (var connection = _connectionFactory.GetConnection())
{
if (connection.UseDateTime)
{
//TODO: Consolidate/fixup different rowtype issues.
Member Author
I am admittedly unsure how we got to having separate categories here; we -should- be able to set all of this up such that either L2DB is doing the 'right thing' for a given db config, and/or we sanely do our queries against whatever mapping is there. (well, actually it's a bit of both... still...)

try
{
var set = await connection.GetTable<DateTimeSnapshotRow>()
.Where(r => r.PersistenceId.In(a.Entries.Keys))
.Select(
r => new
{
Created = r.Created,
PersistenceId = r.PersistenceId,
SequenceNumber = r.SequenceNumber,
Manifest = r.Manifest,
Payload = r.Payload,
SerializerId = r.SerializerId,
RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue()
})
.Where(r => r.RowNum == 1)
.Select(
r => new LongSnapshotRow()
{
Created = r.Created.Ticks,
PersistenceId = r.PersistenceId,
SequenceNumber = r.SequenceNumber,
Manifest = r.Manifest,
Payload = r.Payload,
SerializerId = r.SerializerId,
}).ToListAsync();
return new SnapshotReadGroup(a, set, err: (Exception?)null);
}
catch (Exception ex)
{
return new (a, null, err: ex);
}
}
else
{
try
{
var set = await connection.GetTable<LongSnapshotRow>()
.Where(r => r.PersistenceId.In(a.Entries.Keys))
.Select(
r => new
{
Created = r.Created,
PersistenceId = r.PersistenceId,
SequenceNumber = r.SequenceNumber,
Manifest = r.Manifest,
Payload = r.Payload,
SerializerId = r.SerializerId,
RowNum = LinqToDB.Sql.Ext.Rank().Over().PartitionBy(r.PersistenceId).OrderByDesc(r.SequenceNumber).ToValue()
})
.Where(r => r.RowNum == 1)
.Select(
r => new LongSnapshotRow()
{
Created = r.Created,
PersistenceId = r.PersistenceId,
SequenceNumber = r.SequenceNumber,
Manifest = r.Manifest,
Payload = r.Payload,
SerializerId = r.SerializerId,
}).ToListAsync();
return new (a, set, err: (Exception?)null);
}
catch (Exception ex)
{
return new (a, null, err: ex);
}
}
}
}).SelectAsync(1,
Member Author
IDK if this should -really- be SelectAsync, alas I don't have a clean way to do .Async() on a Subflow ;_; ???

In general the desire here is to ensure that we can fire off results to things without blocking the next read. Probably a cleaner/better way to do this, open to suggestions. <3
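One data point that may help: the TCSs are created with TaskCreationOptions.RunContinuationsAsynchronously, so completing them never runs an awaiter's continuation inline, and a plain Select might already be enough here. A fragment-level sketch (CompleteWaiters is a hypothetical helper wrapping the fan-out logic below):

```csharp
// Sketch only: TrySetResult/TrySetException on a RunContinuationsAsynchronously TCS queues
// the awaiting continuations to the scheduler rather than running them on the stream's thread,
// so the completion fan-out should not stall the next read even without SelectAsync.
.Select(ab =>
{
    CompleteWaiters(ab);     // hypothetical helper containing the completion logic below
    return Done.Instance;
})
```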

async (ab) =>
{
var (a, b, c) = ab;
if (c != null)
{
foreach (var taskCompletionSources in a!.Entries.Values.ToList())
{
foreach (var taskCompletionSource in taskCompletionSources)
{
taskCompletionSource.TrySetException(c);
}
}
}
else
{
//TODO: Pool this set:
var tempSet = new List<string>();
if (b.Count == 0)
{
foreach (var keyValuePair in a.Entries)
{
foreach (var taskCompletionSource in keyValuePair.Value)
{
taskCompletionSource.TrySetResult(Option<SelectedSnapshot>.None);
}
}
}
foreach (var result in b)
{
if (a.Entries.TryGetValue(result.PersistenceId, out var toSet))
{
try
{
var res = _longSerializer.Deserialize(result);
if (res.IsSuccess)
{
foreach (var taskCompletionSource in toSet)
{
taskCompletionSource.TrySetResult(res.Success);
}
}
else
{
foreach (var taskCompletionSource in toSet)
{
taskCompletionSource.TrySetException(res.Failure.Value);
}
}
}
catch (Exception e)
{
foreach (var taskCompletionSource in toSet)
{
taskCompletionSource.TrySetException(e);
}
}
}
else
{
tempSet.Add(result.PersistenceId);
}

foreach (var se in tempSet)
{
if (a.Entries.TryGetValue(se, out var setNo))
{
foreach (var taskCompletionSource in setNo)
{
taskCompletionSource.TrySetResult(Option<SelectedSnapshot>.None);
}
}
}
}
Member Author
I hate this and would love cleaner suggestions. <3
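A sketch of one way to flatten it (hypothetical, untested): index the rows by persistence id and walk the requests instead of the rows, so every waiter is completed exactly once and ids that came back with no row fall through to None rather than being left pending:

```csharp
// Sketch only: at most one row per persistence id survives the Rank() == 1 filter,
// so the rows can be indexed and the requests driven directly.
var rowsById = b.ToDictionary(r => r.PersistenceId);
foreach (var kvp in a.Entries)
{
    if (!rowsById.TryGetValue(kvp.Key, out var row))
    {
        foreach (var waiter in kvp.Value)
            waiter.TrySetResult(Option<SelectedSnapshot>.None);
        continue;
    }

    try
    {
        var res = _longSerializer.Deserialize(row);
        foreach (var waiter in kvp.Value)
        {
            if (res.IsSuccess)
                waiter.TrySetResult(res.Success);
            else
                waiter.TrySetException(res.Failure.Value);
        }
    }
    catch (Exception e)
    {
        foreach (var waiter in kvp.Value)
            waiter.TrySetException(e);
    }
}
```

That shape would also make the b.Count == 0 special case and the tempSet pass unnecessary.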

}

return Done.Instance;
}).MergeSubStreamsAsSource().RunWith(Sink.Ignore<Done>(), materializer);
}

public async Task DeleteAllSnapshotsAsync(
@@ -182,41 +416,50 @@ await connection
});
}

public async Task<Option<SelectedSnapshot>> LatestSnapshotAsync(
public Task<Option<SelectedSnapshot>> LatestSnapshotAsync(
string persistenceId,
CancellationToken cancellationToken = default)
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
Member Author
This should probably pass the CTS or whatever into the request, for the sake of allowing the token to do the needful.
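A sketch of what threading the token through the request could look like (hypothetical; the extra constructor parameter and field are placeholders, and disposing the registration once the task completes is elided here):

```csharp
// Hypothetical sketch: carry the caller's token on the request so the batched read can
// observe it, and cancel the pending TCS if the token fires before a batch picks it up.
public class LatestSnapRequestEntry
{
    public LatestSnapRequestEntry(string persistenceId, CancellationToken cancellationToken)
    {
        PersistenceId = persistenceId;
        CancellationToken = cancellationToken;
        TCS = new TaskCompletionSource<Option<SelectedSnapshot>>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (cancellationToken.CanBeCanceled)
            cancellationToken.Register(() => TCS.TrySetCanceled(cancellationToken));
    }

    public readonly string PersistenceId;
    public readonly CancellationToken CancellationToken;
    public readonly TaskCompletionSource<Option<SelectedSnapshot>> TCS;
}
```

LatestSnapshotAsync would then pass cts.Token into the entry and dispose the linked cts when the returned task finishes.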

return await _connectionFactory.ExecuteWithTransactionAsync(
_readIsolationLevel,
cts.Token,
async (connection, token) =>
{
if (connection.UseDateTime)
{
var row = await connection
.GetTable<DateTimeSnapshotRow>()
.Where(r => r.PersistenceId == persistenceId)
.OrderByDescending(t => t.SequenceNumber)
.FirstOrDefaultAsync(token);

return row != null
? _dateTimeSerializer.Deserialize(row).Get()
: Option<SelectedSnapshot>.None;
}
else
{
var row = await connection
.GetTable<LongSnapshotRow>()
.Where(r => r.PersistenceId == persistenceId)
.OrderByDescending(t => t.SequenceNumber)
.FirstOrDefaultAsync(token);

return row != null
? _longSerializer.Deserialize(row).Get()
: Option<SelectedSnapshot>.None;
}
});
var req = new LatestSnapRequestEntry(persistenceId);
if (_pendingLatestChannel.Writer.TryWrite(req))
{
return req.TCS.Task;
}
else
{
return Task.FromException<Option<SelectedSnapshot>>(new Exception("Queue is closed, System may be shutting down!"));
}
//var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token);
//return await _connectionFactory.ExecuteWithTransactionAsync(
// _readIsolationLevel,
// cts.Token,
// async (connection, token) =>
// {
// if (connection.UseDateTime)
// {
// var row = await connection
// .GetTable<DateTimeSnapshotRow>()
// .Where(r => r.PersistenceId == persistenceId)
// .OrderByDescending(t => t.SequenceNumber)
// .FirstOrDefaultAsync(token);
//
// return row != null
// ? _dateTimeSerializer.Deserialize(row).Get()
// : Option<SelectedSnapshot>.None;
// }
// else
// {
// var row = await connection
// .GetTable<LongSnapshotRow>()
// .Where(r => r.PersistenceId == persistenceId)
// .OrderByDescending(t => t.SequenceNumber)
// .FirstOrDefaultAsync(token);
//
// return row != null
// ? _longSerializer.Deserialize(row).Get()
// : Option<SelectedSnapshot>.None;
// }
// });
}

public async Task<Option<SelectedSnapshot>> SnapshotForMaxTimestampAsync(