Skip to content

Commit

Permalink
+ class SaverLocks for AuthorRevisionSaver, ReplySignatureSaver
Browse files Browse the repository at this point in the history
… & `UserSaver`

* move class `CrawlerLocks` from namespace `tbm.Crawler.Tieba.Crawl` into its child namespace `.Crawler`
@ c#/crawler
  • Loading branch information
n0099 committed May 14, 2024
1 parent eb7aa9a commit f91793e
Show file tree
Hide file tree
Showing 7 changed files with 85 additions and 84 deletions.
1 change: 1 addition & 0 deletions c#/crawler/src/EntryPoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -59,5 +59,6 @@ protected override void ConfigureContainer(HostBuilderContext context, Container
builder.RegisterType<ThreadLateCrawlFacade>();
builder.RegisterType<SonicPusher>();
builder.RegisterType<CrawlPost>();
builder.RegisterGeneric(typeof(SaverLocks<>));
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
namespace tbm.Crawler.Tieba.Crawl;
namespace tbm.Crawler.Tieba.Crawl.Crawler;

public class CrawlerLocks(ILogger<CrawlerLocks> logger, IConfiguration config, CrawlerLocks.Type lockType)
: WithLogTrace(config, $"CrawlerLocks:{lockType}")
Expand Down
3 changes: 2 additions & 1 deletion c#/crawler/src/Tieba/Crawl/Facade/ThreadCrawlFacade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ protected override void OnBeforeCommitSave(CrawlerDbContext db, UserSaver userSa
.ToList();
if (newLatestRepliers.Count == 0) return;

var newlyLockedLatestRepliers = userSaver.AcquireUidLocksForSave(newLatestRepliers.Select(u => u.Uid));
var newlyLockedLatestRepliers = userSaver.AcquireUidLocksForSave
(newLatestRepliers.Select(u => u.Uid));
var newLatestRepliersExceptLocked = newLatestRepliers
.IntersectBy(newlyLockedLatestRepliers, u => u.Uid)
.Select(u =>
Expand Down
41 changes: 15 additions & 26 deletions c#/crawler/src/Tieba/Crawl/Saver/AuthorRevisionSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,19 @@

namespace tbm.Crawler.Tieba.Crawl.Saver;

public class AuthorRevisionSaver(PostType triggeredByPostType)
// locks only using AuthorRevision.Fid and Uid, ignoring TriggeredBy
// this prevents inserting multiple entities with similar time and other fields with the same values
public class AuthorRevisionSaver(
SaverLocks<(Fid Fid, Uid Uid)> authorExpGradeLocks,
PostType triggeredByPostType)
{
// locks only using AuthorRevision.Fid and Uid, ignoring TriggeredBy
// this prevents inserting multiple entities with similar time and other fields with the same values
private static readonly HashSet<(Fid Fid, Uid Uid)> GlobalLocks = [];
private readonly List<(Fid Fid, Uid Uid)> _localLocks = [];

public delegate AuthorRevisionSaver New(PostType triggeredByPostType);

public Action SaveAuthorExpGradeRevisions<TPostWithAuthorExpGrade>
(CrawlerDbContext db, IReadOnlyCollection<TPostWithAuthorExpGrade> posts)
where TPostWithAuthorExpGrade : PostWithAuthorExpGrade
{
SaveAuthorRevisions(db, posts, GlobalLocks,
SaveAuthorRevisions(db, posts, authorExpGradeLocks,
db.AuthorExpGradeRevisions,
p => p.AuthorExpGrade,
(a, b) => a != b,
Expand All @@ -34,13 +33,13 @@ public Action SaveAuthorExpGradeRevisions<TPostWithAuthorExpGrade>
TriggeredBy = triggeredByPostType,
AuthorExpGrade = t.Value
});
return () => ReleaseAllLocks(GlobalLocks);
return authorExpGradeLocks.ReleaseLocalLocked;
}

private void SaveAuthorRevisions<TPost, TRevision, TValue>(
private static void SaveAuthorRevisions<TPost, TRevision, TValue>(
CrawlerDbContext db,
IReadOnlyCollection<TPost> posts,
HashSet<(Fid Fid, Uid Uid)> globalLocks,
SaverLocks<(Fid Fid, Uid Uid)> locks,
IQueryable<TRevision> dbSet,
Func<TPost, TValue?> postAuthorFieldValueSelector,
Func<TValue?, TValue?, bool> isValueChangedPredicate,
Expand Down Expand Up @@ -74,24 +73,14 @@ private void SaveAuthorRevisions<TPost, TRevision, TValue>(
.Where(t => t.Existing.DiscoveredAt != t.NewInPost.DiscoveredAt
&& isValueChangedPredicate(t.Existing.Value, t.NewInPost.Value))
.Select(t => (t.Uid, t.NewInPost.Value, t.NewInPost.DiscoveredAt));
lock (globalLocks)
{
var newRevisionsExceptLocked = newRevisionOfNewUsers
locks.AcquireLocksThen(db.Set<TRevision>().AddRange,
alreadyLocked => newRevisionOfNewUsers
.Concat(newRevisionOfExistingUsers)
.Select(revisionFactory)
.ExceptBy(globalLocks, rev => (rev.Fid, rev.Uid))
.ToList();
if (newRevisionsExceptLocked.Count == 0) return;

_localLocks.AddRange(newRevisionsExceptLocked.Select(rev => (rev.Fid, rev.Uid)));
globalLocks.UnionWith(_localLocks);
db.Set<TRevision>().AddRange(newRevisionsExceptLocked);
}
}

private void ReleaseAllLocks(HashSet<(Fid Fid, Uid Uid)> globalLocks)
{
lock (globalLocks) globalLocks.ExceptWith(_localLocks);
.ExceptBy(alreadyLocked, rev => (rev.Fid, rev.Uid))
.ToList(),
newlyLocked => newlyLocked
.Select(rev => (rev.Fid, rev.Uid)));
}

private sealed class LatestAuthorRevisionProjection<TValue>
Expand Down
28 changes: 8 additions & 20 deletions c#/crawler/src/Tieba/Crawl/Saver/ReplySignatureSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@

namespace tbm.Crawler.Tieba.Crawl.Saver;

public class ReplySignatureSaver
public class ReplySignatureSaver(SaverLocks<ReplySignatureSaver.UniqueSignature> locks)
{
private static readonly HashSet<UniqueSignature> GlobalLocks = [];
private readonly List<UniqueSignature> _localLocks = [];

public Action SaveReplySignatures(CrawlerDbContext db, IEnumerable<ReplyPost> replies)
{
SharedHelper.GetNowTimestamp(out var now);
Expand Down Expand Up @@ -39,26 +36,17 @@ join newInReply in signatures on existing.SignatureId equals newInReply.Signatur
select (existing, newInReply))
.ForEach(t => t.existing.LastSeenAt = t.newInReply.LastSeenAt);

lock (GlobalLocks)
{
var newSignaturesExceptLocked = signatures
locks.AcquireLocksThen(db.ReplySignatures.AddRange,
alreadyLocked => signatures
.ExceptBy(existingSignatures.Select(s => s.SignatureId), s => s.SignatureId)
.ExceptBy(GlobalLocks, s => new(s.SignatureId, s.XxHash3))
.ToList();
if (newSignaturesExceptLocked.Count == 0) return () => { };

_localLocks.AddRange(newSignaturesExceptLocked
.ExceptBy(alreadyLocked, s => new(s.SignatureId, s.XxHash3))
.ToList(),
newlyLocked => newlyLocked
.Select(s => new UniqueSignature(s.SignatureId, s.XxHash3)));
GlobalLocks.UnionWith(_localLocks);
db.ReplySignatures.AddRange(newSignaturesExceptLocked);
}
return () =>
{
lock (GlobalLocks) GlobalLocks.ExceptWith(_localLocks);
};
return locks.ReleaseLocalLocked;
}

private sealed record UniqueSignature(uint Id, byte[] XxHash3)
public sealed record UniqueSignature(uint Id, byte[] XxHash3)
{
public bool Equals(UniqueSignature? other) =>
other != null && Id == other.Id && new ByteArrayEqualityComparer().Equals(XxHash3, other.XxHash3);
Expand Down
30 changes: 30 additions & 0 deletions c#/crawler/src/Tieba/Crawl/Saver/SaverLocks.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
namespace tbm.Crawler.Tieba.Crawl.Saver;

/// <summary>
/// Keeps a shared set of keys that are currently locked for saving,
/// plus a per-instance record of the keys this instance added to that set.
/// </summary>
/// <remarks>
/// <see cref="GlobalLocks"/> is a static field of a generic class, so one set exists
/// per closed <typeparamref name="TKey"/> and is shared by every instance with that key type.
/// All reads and writes of both collections happen while holding <c>lock (GlobalLocks)</c>.
/// </remarks>
/// <typeparam name="TKey">Type of the key that identifies a locked item.</typeparam>
public class SaverLocks<TKey>
{
    /// <summary>Builds the items to lock, given the keys that are already locked.</summary>
    /// <param name="alreadyLocked">Read-only view of the shared set of locked keys.</param>
    public delegate T NewLocksFactory<out T>(IReadOnlySet<TKey> alreadyLocked);

    /// <summary>Extracts the keys to lock from the items returned by the factory.</summary>
    /// <param name="newlyLocked">The items returned by the <see cref="NewLocksFactory{T}"/>.</param>
    public delegate IEnumerable<TKey> LockingKeysSelector<in T>(T newlyLocked);

    private static readonly HashSet<TKey> GlobalLocks = [];
    private readonly List<TKey> _localLocks = [];

    /// <summary>
    /// While holding the shared lock: asks <paramref name="newLocksFactory"/> for the items to lock,
    /// records their keys as locked by this instance, then invokes <paramref name="payload"/> with them.
    /// </summary>
    /// <remarks>
    /// Nothing is locked and <paramref name="payload"/> is not invoked when the factory returns an empty collection.
    /// <paramref name="payload"/> runs inside the lock, so it blocks every other instance with the same
    /// <typeparamref name="TKey"/> until it returns.
    /// The keys stay locked until <see cref="ReleaseLocalLocked"/> is called.
    /// </remarks>
    /// <typeparam name="TNewLock">Type of the items being locked.</typeparam>
    /// <param name="payload">Action to run with the newly locked items.</param>
    /// <param name="newLocksFactory">Receives the already locked keys and returns the items to lock.</param>
    /// <param name="lockingKeysSelector">Maps the items to lock onto their keys.</param>
    public void AcquireLocksThen<TNewLock>(
        Action<IReadOnlyCollection<TNewLock>> payload,
        NewLocksFactory<IReadOnlyCollection<TNewLock>> newLocksFactory,
        LockingKeysSelector<IReadOnlyCollection<TNewLock>> lockingKeysSelector)
    {
        lock (GlobalLocks)
        {
            var newLocks = newLocksFactory(GlobalLocks);
            if (newLocks.Count == 0) return;

            // _localLocks only ever holds keys that are still locked by this instance
            // (ReleaseLocalLocked empties it), so merging the whole list cannot re-lock released keys.
            _localLocks.AddRange(lockingKeysSelector(newLocks));
            GlobalLocks.UnionWith(_localLocks);
            payload(newLocks);
        }
    }

    /// <summary>
    /// Removes every key locked by this instance from the shared set and forgets them.
    /// Calling it again without acquiring new locks is a no-op.
    /// </summary>
    public void ReleaseLocalLocked()
    {
        lock (GlobalLocks)
        {
            GlobalLocks.ExceptWith(_localLocks);

            // Forget the released keys. Otherwise a reused instance would re-lock them on its next
            // acquisition and later release keys that another instance has locked in the meantime.
            _localLocks.Clear();
        }
    }
}
64 changes: 28 additions & 36 deletions c#/crawler/src/Tieba/Crawl/Saver/UserSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@ protected override Dictionary<Type, AddRevisionDelegate>
_ => 0
};
}
public partial class UserSaver(ILogger<UserSaver> logger, ConcurrentDictionary<Uid, User> users)
public partial class UserSaver(
ILogger<UserSaver> logger, SaverLocks<Uid> locks,
ConcurrentDictionary<Uid, User> users)
: BaseSaver<BaseUserRevision>(logger)
{
private static readonly HashSet<Uid> GlobalLocks = [];
private readonly List<Uid> _localLocks = [];

public delegate UserSaver New(ConcurrentDictionary<Uid, User> users);

public void Save(
Expand All @@ -45,42 +44,35 @@ public void Save(
IFieldChangeIgnorance.FieldChangeIgnoranceDelegates userFieldChangeIgnorance)
{
if (users.IsEmpty) return;
lock (GlobalLocks)
{
var usersExceptLocked = new Dictionary<Uid, User>(users.ExceptBy(GlobalLocks, pair => pair.Key));
if (usersExceptLocked.Count == 0) return;
_localLocks.AddRange(usersExceptLocked.Keys);
GlobalLocks.UnionWith(_localLocks);

var existingUsersKeyByUid = (from user in db.Users.AsTracking()
where usersExceptLocked.Keys.Contains(user.Uid)
select user).ToDictionary(u => u.Uid);
SavePostsOrUsers(db, userFieldChangeIgnorance,
u => new UserRevision
{
TakenAt = u.UpdatedAt ?? u.CreatedAt,
Uid = u.Uid,
TriggeredBy = postType
},
usersExceptLocked.Values.ToLookup(u => existingUsersKeyByUid.ContainsKey(u.Uid)),
u => existingUsersKeyByUid[u.Uid]);
}
locks.AcquireLocksThen(newlyLocked =>
{
var existingUsersKeyByUid = (from user in db.Users.AsTracking()
where newlyLocked.Select(u => u.Uid).Contains(user.Uid)
select user).ToDictionary(u => u.Uid);
SavePostsOrUsers(db, userFieldChangeIgnorance,
u => new UserRevision
{
TakenAt = u.UpdatedAt ?? u.CreatedAt,
Uid = u.Uid,
TriggeredBy = postType
},
newlyLocked.ToLookup(u => existingUsersKeyByUid.ContainsKey(u.Uid)),
u => existingUsersKeyByUid[u.Uid]);
},
alreadyLocked => users
.ExceptBy(alreadyLocked, pair => pair.Key).Select(pair => pair.Value).ToList(),
newlyLocked => newlyLocked.Select(u => u.Uid));
}

public IEnumerable<Uid> AcquireUidLocksForSave(IEnumerable<Uid> usersId)
{
lock (GlobalLocks)
{
var exceptLocked = usersId.Except(GlobalLocks).ToList();
if (exceptLocked.Count == 0) return exceptLocked;
_localLocks.AddRange(exceptLocked);
GlobalLocks.UnionWith(exceptLocked);
return exceptLocked;
}
var exceptLocked = new List<Uid>();
locks.AcquireLocksThen(
newlyLocked => exceptLocked.AddRange(newlyLocked),
alreadyLocked => usersId.Except(alreadyLocked).ToList(),
i => i);
return exceptLocked;
}

public void OnPostSave()
{
lock (GlobalLocks) GlobalLocks.ExceptWith(_localLocks);
}
public void OnPostSave() => locks.ReleaseLocalLocked();
}

0 comments on commit f91793e

Please sign in to comment.