Skip to content

Commit

Permalink
+ class ReplySignatureSaver which split from partial class `ReplySa…
Browse files Browse the repository at this point in the history
…ver` @ c#/crawler
  • Loading branch information
n0099 committed May 13, 2024
1 parent c1d6890 commit eb7aa9a
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 73 deletions.
1 change: 1 addition & 0 deletions c#/crawler/src/EntryPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ protected override void ConfigureContainer(HostBuilderContext context, Container
// eagerly initialize all keyed CrawlerLocks singleton instances
// in order to synchronize their WithLogTrace timers
.AutoActivate());
builder.RegisterType<ReplySignatureSaver>();
builder.RegisterType<AuthorRevisionSaver>();
builder.RegisterType<UserParser>();
builder.RegisterType<ThreadLateCrawler>();
Expand Down
76 changes: 3 additions & 73 deletions c#/crawler/src/Tieba/Crawl/Saver/Post/ReplySaver.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
using System.IO.Hashing;
using PredicateBuilder = LinqKit.PredicateBuilder;

namespace tbm.Crawler.Tieba.Crawl.Saver;

public partial class ReplySaver(
public class ReplySaver(
ILogger<ReplySaver> logger,
ConcurrentDictionary<PostId, ReplyPost> posts,
ReplySignatureSaver replySignatureSaver,
AuthorRevisionSaver.New authorRevisionSaverFactory)
: PostSaver<ReplyPost, BaseReplyRevision>(
logger, posts, authorRevisionSaverFactory, PostType.Reply)
Expand Down Expand Up @@ -55,7 +55,7 @@ public override SaverChangeSet<ReplyPost> Save(CrawlerDbContext db)
.Select(r => new ReplyContent {Pid = r.Pid, ProtoBufBytes = r.Content}));
SaveReplyContentImages(db, changeSet.NewlyAdded);
PostSaveHandlers += AuthorRevisionSaver.SaveAuthorExpGradeRevisions(db, changeSet.AllAfter).Invoke;
PostSaveHandlers += SaveReplySignatures(db, changeSet.AllAfter).Invoke;
PostSaveHandlers += replySignatureSaver.SaveReplySignatures(db, changeSet.AllAfter).Invoke;

return changeSet;
}
Expand Down Expand Up @@ -115,73 +115,3 @@ on existing.UrlFilename equals newInContent.UrlFilename
}));
}
}
public partial class ReplySaver
{
    /// <summary>
    /// Signatures (id + content hash) that some saver has queued for insertion and not yet released.
    /// The set also serves as the monitor object guarding its own mutations.
    /// </summary>
    private static readonly HashSet<UniqueSignature> GlobalLocks = [];

    /// <summary>
    /// Signatures this instance has put into <see cref="GlobalLocks"/>.
    /// NOTE(review): nothing in this class ever clears the list, so it grows across calls on the same instance.
    /// </summary>
    private readonly List<UniqueSignature> _localLocks = [];

    /// <summary>
    /// Queues new reply signatures for insertion and refreshes <c>LastSeenAt</c> on the ones already stored.
    /// </summary>
    /// <param name="db">Context used to look up existing signatures and to track the new ones.</param>
    /// <param name="replies">Replies whose signatures should be recorded; replies without a signature are ignored.</param>
    /// <returns>
    /// An action that removes this instance's entries from <see cref="GlobalLocks"/>;
    /// a no-op action when nothing was queued for insertion.
    /// </returns>
    private Action SaveReplySignatures(CrawlerDbContext db, IEnumerable<ReplyPost> replies)
    {
        SharedHelper.GetNowTimestamp(out var now);
        var signatures = replies
            .Where(r => r is {SignatureId: not null, Signature: not null})
            .DistinctBy(r => r.SignatureId)
            .Select(r => new ReplySignature
            {
                UserId = r.AuthorUid,
                SignatureId = (uint)r.SignatureId!,
                XxHash3 = XxHash3.Hash(r.Signature!),
                ProtoBufBytes = r.Signature!,
                FirstSeenAt = now,
                LastSeenAt = now
            }).ToList();
        if (signatures.Count == 0) return () => { };

        var uniqueSignatures = signatures
            .ConvertAll(s => new UniqueSignature(s.SignatureId, s.XxHash3));
        var existingSignatures = (
            from s in db.ReplySignatures.AsTracking()
            where uniqueSignatures.Select(us => us.Id).Contains(s.SignatureId)

                  // server side eval doesn't need ByteArrayEqualityComparer
                  && uniqueSignatures.Select(us => us.XxHash3).Contains(s.XxHash3)
            select s
        ).ToList();

        // The query filters ids and hashes independently, so it can return a row
        // whose id belongs to one incoming signature and whose hash belongs to another.
        // Only rows whose exact (id, hash) pair is incoming count as "already stored";
        // matching by id alone would bump the wrong row and drop the new pair.
        var incomingKeys = uniqueSignatures.ToHashSet();
        var existingKeys = new HashSet<UniqueSignature>();
        foreach (var existing in existingSignatures)
        {
            var key = new UniqueSignature(existing.SignatureId, existing.XxHash3);
            if (!incomingKeys.Contains(key)) continue;

            // every incoming signature carries LastSeenAt == now
            existing.LastSeenAt = now;
            existingKeys.Add(key);
        }

        lock (GlobalLocks)
        {
            var newSignaturesExceptLocked = signatures
                .ExceptBy(existingKeys, s => new UniqueSignature(s.SignatureId, s.XxHash3))

                // skip pairs another saver has already queued and not yet released
                .ExceptBy(GlobalLocks, s => new UniqueSignature(s.SignatureId, s.XxHash3))
                .ToList();
            if (newSignaturesExceptLocked.Count == 0) return () => { };

            _localLocks.AddRange(newSignaturesExceptLocked
                .Select(s => new UniqueSignature(s.SignatureId, s.XxHash3)));
            GlobalLocks.UnionWith(_localLocks);
            db.ReplySignatures.AddRange(newSignaturesExceptLocked);
        }
        return () =>
        {
            lock (GlobalLocks) GlobalLocks.ExceptWith(_localLocks);
        };
    }

    /// <summary>Identity of a signature: its id together with the hash of its content.</summary>
    private sealed record UniqueSignature(uint Id, byte[] XxHash3)
    {
        /// <summary>Compares the id and the hash bytes through <c>ByteArrayEqualityComparer</c>.</summary>
        public bool Equals(UniqueSignature? other) =>
            other is not null
            && Id == other.Id
            && new ByteArrayEqualityComparer().Equals(XxHash3, other.XxHash3);

        /// <summary>Hashes the id and the hash bytes, keeping it consistent with <see cref="Equals(UniqueSignature?)"/>.</summary>
        public override int GetHashCode()
        {
            var hash = default(HashCode);
            hash.Add(Id);
            hash.AddBytes(XxHash3);
            return hash.ToHashCode();
        }
    }
}
74 changes: 74 additions & 0 deletions c#/crawler/src/Tieba/Crawl/Saver/ReplySignatureSaver.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
using System.IO.Hashing;

namespace tbm.Crawler.Tieba.Crawl.Saver;

/// <summary>
/// Records the signatures attached to replies: refreshes <c>LastSeenAt</c> on signatures already stored
/// and queues unseen ones for insertion, using a static set so that two instances
/// do not queue the same signature at the same time.
/// </summary>
public class ReplySignatureSaver
{
    /// <summary>
    /// Signatures (id + content hash) that some saver has queued for insertion and not yet released.
    /// The set also serves as the monitor object guarding its own mutations.
    /// </summary>
    private static readonly HashSet<UniqueSignature> GlobalLocks = [];

    /// <summary>
    /// Signatures this instance has put into <see cref="GlobalLocks"/>.
    /// NOTE(review): nothing in this class ever clears the list, so it grows across calls on the same instance.
    /// </summary>
    private readonly List<UniqueSignature> _localLocks = [];

    /// <summary>
    /// Queues new reply signatures for insertion and refreshes <c>LastSeenAt</c> on the ones already stored.
    /// </summary>
    /// <param name="db">Context used to look up existing signatures and to track the new ones.</param>
    /// <param name="replies">Replies whose signatures should be recorded; replies without a signature are ignored.</param>
    /// <returns>
    /// An action that removes this instance's entries from <see cref="GlobalLocks"/>;
    /// a no-op action when nothing was queued for insertion.
    /// </returns>
    public Action SaveReplySignatures(CrawlerDbContext db, IEnumerable<ReplyPost> replies)
    {
        SharedHelper.GetNowTimestamp(out var now);
        var signatures = replies
            .Where(r => r is {SignatureId: not null, Signature: not null})
            .DistinctBy(r => r.SignatureId)
            .Select(r => new ReplySignature
            {
                UserId = r.AuthorUid,
                SignatureId = (uint)r.SignatureId!,
                XxHash3 = XxHash3.Hash(r.Signature!),
                ProtoBufBytes = r.Signature!,
                FirstSeenAt = now,
                LastSeenAt = now
            }).ToList();
        if (signatures.Count == 0) return () => { };

        var uniqueSignatures = signatures
            .ConvertAll(s => new UniqueSignature(s.SignatureId, s.XxHash3));
        var existingSignatures = (
            from s in db.ReplySignatures.AsTracking()
            where uniqueSignatures.Select(us => us.Id).Contains(s.SignatureId)

                  // server side eval doesn't need ByteArrayEqualityComparer
                  && uniqueSignatures.Select(us => us.XxHash3).Contains(s.XxHash3)
            select s
        ).ToList();

        // The query filters ids and hashes independently, so it can return a row
        // whose id belongs to one incoming signature and whose hash belongs to another.
        // Only rows whose exact (id, hash) pair is incoming count as "already stored";
        // matching by id alone would bump the wrong row and drop the new pair.
        var incomingKeys = uniqueSignatures.ToHashSet();
        var existingKeys = new HashSet<UniqueSignature>();
        foreach (var existing in existingSignatures)
        {
            var key = new UniqueSignature(existing.SignatureId, existing.XxHash3);
            if (!incomingKeys.Contains(key)) continue;

            // every incoming signature carries LastSeenAt == now
            existing.LastSeenAt = now;
            existingKeys.Add(key);
        }

        lock (GlobalLocks)
        {
            var newSignaturesExceptLocked = signatures
                .ExceptBy(existingKeys, s => new UniqueSignature(s.SignatureId, s.XxHash3))

                // skip pairs another saver has already queued and not yet released
                .ExceptBy(GlobalLocks, s => new UniqueSignature(s.SignatureId, s.XxHash3))
                .ToList();
            if (newSignaturesExceptLocked.Count == 0) return () => { };

            _localLocks.AddRange(newSignaturesExceptLocked
                .Select(s => new UniqueSignature(s.SignatureId, s.XxHash3)));
            GlobalLocks.UnionWith(_localLocks);
            db.ReplySignatures.AddRange(newSignaturesExceptLocked);
        }
        return () =>
        {
            lock (GlobalLocks) GlobalLocks.ExceptWith(_localLocks);
        };
    }

    /// <summary>Identity of a signature: its id together with the hash of its content.</summary>
    private sealed record UniqueSignature(uint Id, byte[] XxHash3)
    {
        /// <summary>Compares the id and the hash bytes through <c>ByteArrayEqualityComparer</c>.</summary>
        public bool Equals(UniqueSignature? other) =>
            other is not null
            && Id == other.Id
            && new ByteArrayEqualityComparer().Equals(XxHash3, other.XxHash3);

        /// <summary>Hashes the id and the hash bytes, keeping it consistent with <see cref="Equals(UniqueSignature?)"/>.</summary>
        public override int GetHashCode()
        {
            var hash = default(HashCode);
            hash.Add(Id);
            hash.AddBytes(XxHash3);
            return hash.ToHashCode();
        }
    }
}

0 comments on commit eb7aa9a

Please sign in to comment.