-
Notifications
You must be signed in to change notification settings - Fork 12
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP Snapshot Load Batch Queue #434
base: dev
Are you sure you want to change the base?
Conversation
This doesn't pipeline writes, but it does pipeline reads which...
tl;dr- this is a change on reads will help overall perf in a real-world system most likely.
tl;dr- we can do this pattern or a variant in other places for reads, and it will help with performance under load scenarios.
tl;dr- this PR is a basic implementation and may require some optimization to do the 'best thing' for each database. Or in general |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added my self-comments, just in case I get bussed/whatever.
public static class SubFlowExtensions | ||
{ | ||
public static Source<TOut, TMat> MergeSubStreamsAsSource<TOut, TMat, | ||
TClosed>(this SubFlow<TOut, TMat, TClosed> subFlow) | ||
{ | ||
return (Source<TOut,TMat>)(subFlow.MergeSubstreams()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy for feedback on this method but TBH it would be nice if some form (even if it's not returning Source<TOut,TMat>
) was put into akka streams proper. It's really useful
public readonly record struct SnapshotReadGroup | ||
{ | ||
public SnapshotReadGroup(QueryLatestSnapSet a, List<LongSnapshotRow> b, Exception? err) | ||
{ | ||
this.a = a; | ||
this.b = b; | ||
this.err = err; | ||
} | ||
public QueryLatestSnapSet a { get; } | ||
public List<LongSnapshotRow> b { get; } | ||
public Exception? err { get; } | ||
public void Deconstruct(out QueryLatestSnapSet a, out List<LongSnapshotRow> b, out Exception? err) | ||
{ | ||
a = this.a; | ||
b = this.b; | ||
err = this.err; | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
needs cleanup also not sure if struct is actually good idea or not here :)
public class QueryLatestSnapSet | ||
{ | ||
public readonly Dictionary<string, List<TaskCompletionSource<Option<SelectedSnapshot>>>> Entries = new(); | ||
|
||
public void Add(LatestSnapRequestEntry entry) | ||
{ | ||
if (Entries.TryGetValue(entry.PersistenceId, out var item) == false) | ||
{ | ||
item = Entries[entry.PersistenceId] = new List<TaskCompletionSource<Option<SelectedSnapshot>>>(); | ||
} | ||
item.Add(entry.TCS); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
May or may not be better to return this
on .Add()
for cosmetics/etc.
int maxSubStreamsForReads = 8; // TODO: Configurable | ||
int maxRequestsPerBatch = 50; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should be configurable
.GroupBy(maxSubStreamsForReads, a=> a.PersistenceId.GetHashCode()% maxSubStreamsForReads) | ||
.BatchWeighted( | ||
maxRequestsPerBatch, | ||
a => 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know in ShotGlass I had/have a Batcher that could assign a 'zero weight' for cases where a persistence ID was already queued...
FBOW in this first go here I instead decided a flat weight of 1 would be a lot less 'change' but also has -some- benefits for forward progress. (but really mostly because a custom stream stage felt like overkill for first intro of this.)
{ | ||
if (connection.UseDateTime) | ||
{ | ||
//TODO: Consolidate/fixup different rowtype issues. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am admittedly unsure how we got to having separate categories here; we -should- be able to set all of this up such that either L2DB is doing the 'right thing' for a given db config, and/or we sanely do our queries against whatever mapping is there. (well, actually it's a bit of both... still...)
} | ||
} | ||
} | ||
}).SelectAsync(1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IDK if this should -really- be SelectAsync
, alas I don't have a clean way to do .Async()
on a Subflow ;_; ???
In general the desire here is to ensure that we can fire off results to things without blocking the next read. Probably a cleaner/better way to do this, open to suggestions. <3
foreach (var result in b) | ||
{ | ||
if (a.Entries.TryGetValue(result.PersistenceId, out var toSet)) | ||
{ | ||
try | ||
{ | ||
var res = _longSerializer.Deserialize(result); | ||
if (res.IsSuccess) | ||
{ | ||
foreach (var taskCompletionSource in toSet) | ||
{ | ||
taskCompletionSource.TrySetResult(res.Success); | ||
} | ||
} | ||
else | ||
{ | ||
foreach (var taskCompletionSource in toSet) | ||
{ | ||
taskCompletionSource.TrySetException(res.Failure.Value); | ||
} | ||
} | ||
} | ||
catch (Exception e) | ||
{ | ||
foreach (var taskCompletionSource in toSet) | ||
{ | ||
taskCompletionSource.TrySetException(e); | ||
} | ||
} | ||
} | ||
else | ||
{ | ||
tempSet.Add(result.PersistenceId); | ||
} | ||
|
||
foreach (var se in tempSet) | ||
{ | ||
if (a.Entries.TryGetValue(se, out var setNo)) | ||
{ | ||
foreach (var taskCompletionSource in setNo) | ||
{ | ||
taskCompletionSource.TrySetResult(Option<SelectedSnapshot>.None); | ||
} | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I hate this and would love cleaner suggestions. <3
string persistenceId, | ||
CancellationToken cancellationToken = default) | ||
{ | ||
var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _shutdownCts.Token); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably pass the CTS or whatever into the request, for the sake of allowing the token to do the needful.
Will not fix #432 but will probably help on some level
Changes
This PR changes Getting default snapshot store load operation of getting the 'newest' snapshot (i.e. not caring about SEQ or timestamp) from being a multitude of snapshot requests into a queue, similar to how we handle Write requests on Journals.
Checklist
For significant changes, please ensure that the following have been completed (delete if not relevant):
Latest
dev
BenchmarksWe don't really have benchmarks against snapshot. if someone wants to add I'm happy to rebase/etc and test
This PR's Benchmarks
See above.
Stuff I still need to validate:
The main difference of course is that this is a -read- and more complex than a sequence number [0], and as such I did take the -small- step of splitting out deserialization from the main read logic as a separate stage.
This may have a minor performance impact on cases where only a small number of actors are recovering, however it should greatly improve recovery performance under load. Overall intent is to have the numbers be configurable for a given case (maybe we have a fallback switch for those who want old behavior?)
[0] - Hint hint ;)