From f91793e67dded25059c5026d44794caecb8e0f33 Mon Sep 17 00:00:00 2001 From: n0099 Date: Tue, 14 May 2024 15:13:14 +0800 Subject: [PATCH] + class `SaverLocks` for `AuthorRevisionSaver`, `ReplySignatureSaver` & `UserSaver` * move class `CrawlerLocks` from namespace `tbm.Crawler.Tieba.Crawl` into its child`.Crawler` @ c#/crawler --- c#/crawler/src/EntryPoint.cs | 1 + .../Tieba/Crawl/{ => Crawler}/CrawlerLocks.cs | 2 +- .../Tieba/Crawl/Facade/ThreadCrawlFacade.cs | 3 +- .../Tieba/Crawl/Saver/AuthorRevisionSaver.cs | 41 +++++------- .../Tieba/Crawl/Saver/ReplySignatureSaver.cs | 28 +++----- .../src/Tieba/Crawl/Saver/SaverLocks.cs | 30 +++++++++ c#/crawler/src/Tieba/Crawl/Saver/UserSaver.cs | 64 ++++++++----------- 7 files changed, 85 insertions(+), 84 deletions(-) rename c#/crawler/src/Tieba/Crawl/{ => Crawler}/CrawlerLocks.cs (99%) create mode 100644 c#/crawler/src/Tieba/Crawl/Saver/SaverLocks.cs diff --git a/c#/crawler/src/EntryPoint.cs b/c#/crawler/src/EntryPoint.cs index 447c0a00..a90ecdb1 100644 --- a/c#/crawler/src/EntryPoint.cs +++ b/c#/crawler/src/EntryPoint.cs @@ -59,5 +59,6 @@ protected override void ConfigureContainer(HostBuilderContext context, Container builder.RegisterType(); builder.RegisterType(); builder.RegisterType(); + builder.RegisterGeneric(typeof(SaverLocks<>)); } } diff --git a/c#/crawler/src/Tieba/Crawl/CrawlerLocks.cs b/c#/crawler/src/Tieba/Crawl/Crawler/CrawlerLocks.cs similarity index 99% rename from c#/crawler/src/Tieba/Crawl/CrawlerLocks.cs rename to c#/crawler/src/Tieba/Crawl/Crawler/CrawlerLocks.cs index 6e817acc..08cfad66 100644 --- a/c#/crawler/src/Tieba/Crawl/CrawlerLocks.cs +++ b/c#/crawler/src/Tieba/Crawl/Crawler/CrawlerLocks.cs @@ -1,4 +1,4 @@ -namespace tbm.Crawler.Tieba.Crawl; +namespace tbm.Crawler.Tieba.Crawl.Crawler; public class CrawlerLocks(ILogger logger, IConfiguration config, CrawlerLocks.Type lockType) : WithLogTrace(config, $"CrawlerLocks:{lockType}") diff --git a/c#/crawler/src/Tieba/Crawl/Facade/ThreadCrawlFacade.cs b/c#/crawler/src/Tieba/Crawl/Facade/ThreadCrawlFacade.cs index 327f741b..747d7863 100644 --- a/c#/crawler/src/Tieba/Crawl/Facade/ThreadCrawlFacade.cs +++ b/c#/crawler/src/Tieba/Crawl/Facade/ThreadCrawlFacade.cs @@ -31,7 +31,8 @@ protected override void OnBeforeCommitSave(CrawlerDbContext db, UserSaver userSa .ToList(); if (newLatestRepliers.Count == 0) return; - var newlyLockedLatestRepliers = userSaver.AcquireUidLocksForSave(newLatestRepliers.Select(u => u.Uid)); + var newlyLockedLatestRepliers = userSaver.AcquireUidLocksForSave + (newLatestRepliers.Select(u => u.Uid)); var newLatestRepliersExceptLocked = newLatestRepliers .IntersectBy(newlyLockedLatestRepliers, u => u.Uid) .Select(u => diff --git a/c#/crawler/src/Tieba/Crawl/Saver/AuthorRevisionSaver.cs b/c#/crawler/src/Tieba/Crawl/Saver/AuthorRevisionSaver.cs index c5a6c268..86a58c62 100644 --- a/c#/crawler/src/Tieba/Crawl/Saver/AuthorRevisionSaver.cs +++ b/c#/crawler/src/Tieba/Crawl/Saver/AuthorRevisionSaver.cs @@ -2,20 +2,19 @@ namespace tbm.Crawler.Tieba.Crawl.Saver; -public class AuthorRevisionSaver(PostType triggeredByPostType) +// locks only using AuthorRevision.Fid and Uid, ignoring TriggeredBy +// this prevents inserting multiple entities with similar time and other fields with the same values +public class AuthorRevisionSaver( + SaverLocks<(Fid Fid, Uid Uid)> authorExpGradeLocks, + PostType triggeredByPostType) { - // locks only using AuthorRevision.Fid and Uid, ignoring TriggeredBy - // this prevents inserting multiple entities with similar time and other fields with the same values - private static readonly HashSet<(Fid Fid, Uid Uid)> GlobalLocks = []; - private readonly List<(Fid Fid, Uid Uid)> _localLocks = []; - public delegate AuthorRevisionSaver New(PostType triggeredByPostType); public Action SaveAuthorExpGradeRevisions (CrawlerDbContext db, IReadOnlyCollection posts) where TPostWithAuthorExpGrade : PostWithAuthorExpGrade { - SaveAuthorRevisions(db, posts, GlobalLocks, + SaveAuthorRevisions(db, posts, authorExpGradeLocks, db.AuthorExpGradeRevisions, p => p.AuthorExpGrade, (a, b) => a != b, @@ -34,13 +33,13 @@ public Action SaveAuthorExpGradeRevisions TriggeredBy = triggeredByPostType, AuthorExpGrade = t.Value }); - return () => ReleaseAllLocks(GlobalLocks); + return authorExpGradeLocks.ReleaseLocalLocked; } - private void SaveAuthorRevisions( + private static void SaveAuthorRevisions( CrawlerDbContext db, IReadOnlyCollection posts, - HashSet<(Fid Fid, Uid Uid)> globalLocks, + SaverLocks<(Fid Fid, Uid Uid)> locks, IQueryable dbSet, Func postAuthorFieldValueSelector, Func isValueChangedPredicate, @@ -74,24 +73,14 @@ private void SaveAuthorRevisions( .Where(t => t.Existing.DiscoveredAt != t.NewInPost.DiscoveredAt && isValueChangedPredicate(t.Existing.Value, t.NewInPost.Value)) .Select(t => (t.Uid, t.NewInPost.Value, t.NewInPost.DiscoveredAt)); - lock (globalLocks) - { - var newRevisionsExceptLocked = newRevisionOfNewUsers + locks.AcquireLocksThen(db.Set().AddRange, + alreadyLocked => newRevisionOfNewUsers .Concat(newRevisionOfExistingUsers) .Select(revisionFactory) - .ExceptBy(globalLocks, rev => (rev.Fid, rev.Uid)) - .ToList(); - if (newRevisionsExceptLocked.Count == 0) return; - - _localLocks.AddRange(newRevisionsExceptLocked.Select(rev => (rev.Fid, rev.Uid))); - globalLocks.UnionWith(_localLocks); - db.Set().AddRange(newRevisionsExceptLocked); - } - } - - private void ReleaseAllLocks(HashSet<(Fid Fid, Uid Uid)> globalLocks) - { - lock (globalLocks) globalLocks.ExceptWith(_localLocks); + .ExceptBy(alreadyLocked, rev => (rev.Fid, rev.Uid)) + .ToList(), + newlyLocked => newlyLocked + .Select(rev => (rev.Fid, rev.Uid))); } private sealed class LatestAuthorRevisionProjection diff --git a/c#/crawler/src/Tieba/Crawl/Saver/ReplySignatureSaver.cs b/c#/crawler/src/Tieba/Crawl/Saver/ReplySignatureSaver.cs index 84aa718c..f877d1ff 100644 --- a/c#/crawler/src/Tieba/Crawl/Saver/ReplySignatureSaver.cs +++ b/c#/crawler/src/Tieba/Crawl/Saver/ReplySignatureSaver.cs @@ -2,11 +2,8 @@ namespace tbm.Crawler.Tieba.Crawl.Saver; -public class ReplySignatureSaver +public class ReplySignatureSaver(SaverLocks locks) { - private static readonly HashSet GlobalLocks = []; - private readonly List _localLocks = []; - public Action SaveReplySignatures(CrawlerDbContext db, IEnumerable replies) { SharedHelper.GetNowTimestamp(out var now); @@ -39,26 +36,17 @@ join newInReply in signatures on existing.SignatureId equals newInReply.Signatur select (existing, newInReply)) .ForEach(t => t.existing.LastSeenAt = t.newInReply.LastSeenAt); - lock (GlobalLocks) - { - var newSignaturesExceptLocked = signatures + locks.AcquireLocksThen(db.ReplySignatures.AddRange, + alreadyLocked => signatures .ExceptBy(existingSignatures.Select(s => s.SignatureId), s => s.SignatureId) - .ExceptBy(GlobalLocks, s => new(s.SignatureId, s.XxHash3)) - .ToList(); - if (newSignaturesExceptLocked.Count == 0) return () => { }; - - _localLocks.AddRange(newSignaturesExceptLocked + .ExceptBy(alreadyLocked, s => new(s.SignatureId, s.XxHash3)) + .ToList(), + newlyLocked => newlyLocked .Select(s => new UniqueSignature(s.SignatureId, s.XxHash3))); - GlobalLocks.UnionWith(_localLocks); - db.ReplySignatures.AddRange(newSignaturesExceptLocked); - } - return () => - { - lock (GlobalLocks) GlobalLocks.ExceptWith(_localLocks); - }; + return locks.ReleaseLocalLocked; } - private sealed record UniqueSignature(uint Id, byte[] XxHash3) + public sealed record UniqueSignature(uint Id, byte[] XxHash3) { public bool Equals(UniqueSignature? other) => other != null && Id == other.Id && new ByteArrayEqualityComparer().Equals(XxHash3, other.XxHash3); diff --git a/c#/crawler/src/Tieba/Crawl/Saver/SaverLocks.cs b/c#/crawler/src/Tieba/Crawl/Saver/SaverLocks.cs new file mode 100644 index 00000000..f8670ee3 --- /dev/null +++ b/c#/crawler/src/Tieba/Crawl/Saver/SaverLocks.cs @@ -0,0 +1,30 @@ +namespace tbm.Crawler.Tieba.Crawl.Saver; + +public class SaverLocks +{ + public delegate T NewLocksFactory(IReadOnlySet alreadyLocked); + public delegate IEnumerable LockingKeysSelector(T newlyLocked); + + private static readonly HashSet GlobalLocks = []; + private readonly List _localLocks = []; + + public void AcquireLocksThen( + Action> payload, + NewLocksFactory> newLocksFactory, + LockingKeysSelector> lockingKeysSelector) + { + lock (GlobalLocks) + { + var newLocks = newLocksFactory(GlobalLocks); + if (newLocks.Count == 0) return; + _localLocks.AddRange(lockingKeysSelector(newLocks)); + GlobalLocks.UnionWith(_localLocks); + payload(newLocks); + } + } + + public void ReleaseLocalLocked() + { + lock (GlobalLocks) GlobalLocks.ExceptWith(_localLocks); + } +} diff --git a/c#/crawler/src/Tieba/Crawl/Saver/UserSaver.cs b/c#/crawler/src/Tieba/Crawl/Saver/UserSaver.cs index 24035815..b549736e 100644 --- a/c#/crawler/src/Tieba/Crawl/Saver/UserSaver.cs +++ b/c#/crawler/src/Tieba/Crawl/Saver/UserSaver.cs @@ -31,12 +31,11 @@ protected override Dictionary _ => 0 }; } -public partial class UserSaver(ILogger logger, ConcurrentDictionary users) +public partial class UserSaver( + ILogger logger, SaverLocks locks, + ConcurrentDictionary users) : BaseSaver(logger) { - private static readonly HashSet GlobalLocks = []; - private readonly List _localLocks = []; - public delegate UserSaver New(ConcurrentDictionary users); public void Save( @@ -45,42 +44,35 @@ public void Save( IFieldChangeIgnorance.FieldChangeIgnoranceDelegates userFieldChangeIgnorance) { if (users.IsEmpty) return; - lock (GlobalLocks) - { - var usersExceptLocked = new Dictionary(users.ExceptBy(GlobalLocks, pair => pair.Key)); - if (usersExceptLocked.Count == 0) return; - _localLocks.AddRange(usersExceptLocked.Keys); - GlobalLocks.UnionWith(_localLocks); - - var existingUsersKeyByUid = (from user in db.Users.AsTracking() - where usersExceptLocked.Keys.Contains(user.Uid) - select user).ToDictionary(u => u.Uid); - SavePostsOrUsers(db, userFieldChangeIgnorance, - u => new UserRevision - { - TakenAt = u.UpdatedAt ?? u.CreatedAt, - Uid = u.Uid, - TriggeredBy = postType - }, - usersExceptLocked.Values.ToLookup(u => existingUsersKeyByUid.ContainsKey(u.Uid)), - u => existingUsersKeyByUid[u.Uid]); - } + locks.AcquireLocksThen(newlyLocked => + { + var existingUsersKeyByUid = (from user in db.Users.AsTracking() + where newlyLocked.Select(u => u.Uid).Contains(user.Uid) + select user).ToDictionary(u => u.Uid); + SavePostsOrUsers(db, userFieldChangeIgnorance, + u => new UserRevision + { + TakenAt = u.UpdatedAt ?? u.CreatedAt, + Uid = u.Uid, + TriggeredBy = postType + }, + newlyLocked.ToLookup(u => existingUsersKeyByUid.ContainsKey(u.Uid)), + u => existingUsersKeyByUid[u.Uid]); + }, + alreadyLocked => users + .ExceptBy(alreadyLocked, pair => pair.Key).Select(pair => pair.Value).ToList(), + newlyLocked => newlyLocked.Select(u => u.Uid)); } public IEnumerable AcquireUidLocksForSave(IEnumerable usersId) { - lock (GlobalLocks) - { - var exceptLocked = usersId.Except(GlobalLocks).ToList(); - if (exceptLocked.Count == 0) return exceptLocked; - _localLocks.AddRange(exceptLocked); - GlobalLocks.UnionWith(exceptLocked); - return exceptLocked; - } + var exceptLocked = new List(); + locks.AcquireLocksThen( + newlyLocked => exceptLocked.AddRange(newlyLocked), + alreadyLocked => usersId.Except(alreadyLocked).ToList(), + i => i); + return exceptLocked; } - public void OnPostSave() - { - lock (GlobalLocks) GlobalLocks.ExceptWith(_localLocks); - } + public void OnPostSave() => locks.ReleaseLocalLocked(); }