Extract reply-signature saving from ReplySaver into a standalone ReplySignatureSaver.

  * EntryPoint.cs: adds one more builder.RegisterType() call in ConfigureContainer.
  * ReplySaver.cs: drops the `partial` modifier and the System.IO.Hashing using,
    takes a ReplySignatureSaver constructor parameter, and calls
    SaveReplySignatures on it instead of on its own (removed) private method.
  * ReplySignatureSaver.cs: new file holding the moved SaveReplySignatures method
    (now public), the static GlobalLocks set, the per-instance _localLocks list
    and the UniqueSignature record.

NOTE(review): line breaks and indentation were reconstructed from a
whitespace-collapsed copy using the hunk line counts; generic type arguments
look stripped (e.g. `RegisterType()`, `HashSet GlobalLocks`) and are left as
found. Confirm both against the repository before applying.

diff --git a/c#/crawler/src/EntryPoint.cs b/c#/crawler/src/EntryPoint.cs
index 799a93ab..447c0a00 100644
--- a/c#/crawler/src/EntryPoint.cs
+++ b/c#/crawler/src/EntryPoint.cs
@@ -52,6 +52,7 @@ protected override void ConfigureContainer(HostBuilderContext context, Container
             // eager initial all keyed CrawlerLocks singleton instances
             // in order to sync their timer of WithLogTrace
             .AutoActivate());
+        builder.RegisterType();
         builder.RegisterType();
         builder.RegisterType();
         builder.RegisterType();
diff --git a/c#/crawler/src/Tieba/Crawl/Saver/Post/ReplySaver.cs b/c#/crawler/src/Tieba/Crawl/Saver/Post/ReplySaver.cs
index e418bfc0..12935dee 100644
--- a/c#/crawler/src/Tieba/Crawl/Saver/Post/ReplySaver.cs
+++ b/c#/crawler/src/Tieba/Crawl/Saver/Post/ReplySaver.cs
@@ -1,11 +1,11 @@
-using System.IO.Hashing;
 using PredicateBuilder = LinqKit.PredicateBuilder;
 
 namespace tbm.Crawler.Tieba.Crawl.Saver;
 
-public partial class ReplySaver(
+public class ReplySaver(
     ILogger logger,
     ConcurrentDictionary posts,
+    ReplySignatureSaver replySignatureSaver,
     AuthorRevisionSaver.New authorRevisionSaverFactory)
     : PostSaver(
         logger, posts, authorRevisionSaverFactory, PostType.Reply)
@@ -55,7 +55,7 @@ public override SaverChangeSet Save(CrawlerDbContext db)
                 .Select(r => new ReplyContent {Pid = r.Pid, ProtoBufBytes = r.Content}));
         SaveReplyContentImages(db, changeSet.NewlyAdded);
         PostSaveHandlers += AuthorRevisionSaver.SaveAuthorExpGradeRevisions(db, changeSet.AllAfter).Invoke;
-        PostSaveHandlers += SaveReplySignatures(db, changeSet.AllAfter).Invoke;
+        PostSaveHandlers += replySignatureSaver.SaveReplySignatures(db, changeSet.AllAfter).Invoke;
 
         return changeSet;
     }
@@ -115,73 +115,3 @@ on existing.UrlFilename equals newInContent.UrlFilename
             }));
     }
 }
-public partial class ReplySaver
-{
-    private static readonly HashSet GlobalLocks = [];
-    private readonly List _localLocks = [];
-
-    private Action SaveReplySignatures(CrawlerDbContext db, IEnumerable replies)
-    {
-        SharedHelper.GetNowTimestamp(out var now);
-        var signatures = replies
-            .Where(r => r is {SignatureId: not null, Signature: not null})
-            .DistinctBy(r => r.SignatureId)
-            .Select(r => new ReplySignature
-            {
-                UserId = r.AuthorUid,
-                SignatureId = (uint)r.SignatureId!,
-                XxHash3 = XxHash3.Hash(r.Signature!),
-                ProtoBufBytes = r.Signature!,
-                FirstSeenAt = now,
-                LastSeenAt = now
-            }).ToList();
-        if (signatures.Count == 0) return () => { };
-
-        var uniqueSignatures = signatures
-            .ConvertAll(s => new UniqueSignature(s.SignatureId, s.XxHash3));
-        var existingSignatures = (
-            from s in db.ReplySignatures.AsTracking()
-            where uniqueSignatures.Select(us => us.Id).Contains(s.SignatureId)
-
-                  // server side eval doesn't need ByteArrayEqualityComparer
-                  && uniqueSignatures.Select(us => us.XxHash3).Contains(s.XxHash3)
-            select s
-        ).ToList();
-        (from existing in existingSignatures
-                join newInReply in signatures on existing.SignatureId equals newInReply.SignatureId
-                select (existing, newInReply))
-            .ForEach(t => t.existing.LastSeenAt = t.newInReply.LastSeenAt);
-
-        lock (GlobalLocks)
-        {
-            var newSignaturesExceptLocked = signatures
-                .ExceptBy(existingSignatures.Select(s => s.SignatureId), s => s.SignatureId)
-                .ExceptBy(GlobalLocks, s => new(s.SignatureId, s.XxHash3))
-                .ToList();
-            if (newSignaturesExceptLocked.Count == 0) return () => { };
-
-            _localLocks.AddRange(newSignaturesExceptLocked
-                .Select(s => new UniqueSignature(s.SignatureId, s.XxHash3)));
-            GlobalLocks.UnionWith(_localLocks);
-            db.ReplySignatures.AddRange(newSignaturesExceptLocked);
-        }
-        return () =>
-        {
-            lock (GlobalLocks) GlobalLocks.ExceptWith(_localLocks);
-        };
-    }
-
-    private sealed record UniqueSignature(uint Id, byte[] XxHash3)
-    {
-        public bool Equals(UniqueSignature? other) =>
-            other != null && Id == other.Id && new ByteArrayEqualityComparer().Equals(XxHash3, other.XxHash3);
-
-        public override int GetHashCode()
-        {
-            var hash = default(HashCode);
-            hash.Add(Id);
-            hash.AddBytes(XxHash3);
-            return hash.ToHashCode();
-        }
-    }
-}
diff --git a/c#/crawler/src/Tieba/Crawl/Saver/ReplySignatureSaver.cs b/c#/crawler/src/Tieba/Crawl/Saver/ReplySignatureSaver.cs
new file mode 100644
index 00000000..84aa718c
--- /dev/null
+++ b/c#/crawler/src/Tieba/Crawl/Saver/ReplySignatureSaver.cs
@@ -0,0 +1,74 @@
+using System.IO.Hashing;
+
+namespace tbm.Crawler.Tieba.Crawl.Saver;
+
+public class ReplySignatureSaver
+{
+    private static readonly HashSet GlobalLocks = [];
+    private readonly List _localLocks = [];
+
+    public Action SaveReplySignatures(CrawlerDbContext db, IEnumerable replies)
+    {
+        SharedHelper.GetNowTimestamp(out var now);
+        var signatures = replies
+            .Where(r => r is {SignatureId: not null, Signature: not null})
+            .DistinctBy(r => r.SignatureId)
+            .Select(r => new ReplySignature
+            {
+                UserId = r.AuthorUid,
+                SignatureId = (uint)r.SignatureId!,
+                XxHash3 = XxHash3.Hash(r.Signature!),
+                ProtoBufBytes = r.Signature!,
+                FirstSeenAt = now,
+                LastSeenAt = now
+            }).ToList();
+        if (signatures.Count == 0) return () => { };
+
+        var uniqueSignatures = signatures
+            .ConvertAll(s => new UniqueSignature(s.SignatureId, s.XxHash3));
+        var existingSignatures = (
+            from s in db.ReplySignatures.AsTracking()
+            where uniqueSignatures.Select(us => us.Id).Contains(s.SignatureId)
+
+                  // server side eval doesn't need ByteArrayEqualityComparer
+                  && uniqueSignatures.Select(us => us.XxHash3).Contains(s.XxHash3)
+            select s
+        ).ToList();
+        (from existing in existingSignatures
+                join newInReply in signatures on existing.SignatureId equals newInReply.SignatureId
+                select (existing, newInReply))
+            .ForEach(t => t.existing.LastSeenAt = t.newInReply.LastSeenAt);
+
+        lock (GlobalLocks)
+        {
+            var newSignaturesExceptLocked = signatures
+                .ExceptBy(existingSignatures.Select(s => s.SignatureId), s => s.SignatureId)
+                .ExceptBy(GlobalLocks, s => new(s.SignatureId, s.XxHash3))
+                .ToList();
+            if (newSignaturesExceptLocked.Count == 0) return () => { };
+
+            _localLocks.AddRange(newSignaturesExceptLocked
+                .Select(s => new UniqueSignature(s.SignatureId, s.XxHash3)));
+            GlobalLocks.UnionWith(_localLocks);
+            db.ReplySignatures.AddRange(newSignaturesExceptLocked);
+        }
+        return () =>
+        {
+            lock (GlobalLocks) GlobalLocks.ExceptWith(_localLocks);
+        };
+    }
+
+    private sealed record UniqueSignature(uint Id, byte[] XxHash3)
+    {
+        public bool Equals(UniqueSignature? other) =>
+            other != null && Id == other.Id && new ByteArrayEqualityComparer().Equals(XxHash3, other.XxHash3);
+
+        public override int GetHashCode()
+        {
+            var hash = default(HashCode);
+            hash.Add(Id);
+            hash.AddBytes(XxHash3);
+            return hash.ToHashCode();
+        }
+    }
+}