Skip to content

Commit

Permalink
* now will try locking both (newly|already)LockedImages with ten se…
Browse files Browse the repository at this point in the history
…conds timeout and release locks in the returned hooks that will be invoked after `DbContext.SaveChanges()`

* now will skip fetching new `ImageInReply` entities after locking `alreadyLockedImages` when it's empty
* rename variable `imagesKeyByUrlFilename` to `images`
* rename variable `(newly|already)Locked` to `(newly|already)LockedImages`
@ `Save()`

* rename field `LocksKeyByUrlFilename` to `GlobalLockedImagesInReplyKeyByUrlFilename`
- primary ctor param `locks`
@ ReplyContentImageSaver.cs

* now will quick exit when `newlyLocked` is empty @ `UserSaver.Save()`
@ c#/crawler
  • Loading branch information
n0099 committed Jun 3, 2024
1 parent 51caaf4 commit 25ab42d
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 33 deletions.
2 changes: 1 addition & 1 deletion c#/crawler/src/Tieba/Crawl/Saver/Post/ReplySaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public override SaverChangeSet<ReplyPost> Save(CrawlerDbContext db)
r => new ReplyRevision {TakenAt = r.UpdatedAt ?? r.CreatedAt, Pid = r.Pid},
LinqKit.PredicateBuilder.New<ReplyPost>(r => Posts.Keys.Contains(r.Pid)));

replyContentImageSaver.Save(db, changeSet.NewlyAdded);
PostSaveHandlers += replyContentImageSaver.Save(db, changeSet.NewlyAdded).Invoke;
PostSaveHandlers += AuthorRevisionSaver.SaveAuthorExpGradeRevisions(db, changeSet.AllAfter).Invoke;
PostSaveHandlers += replySignatureSaver.Save(db, changeSet.AllAfter).Invoke;

Expand Down
80 changes: 48 additions & 32 deletions c#/crawler/src/Tieba/Crawl/Saver/ReplyContentImageSaver.cs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
namespace tbm.Crawler.Tieba.Crawl.Saver;

public class ReplyContentImageSaver(SaverLocks<string> locks)
public class ReplyContentImageSaver(ILogger<ReplyContentImageSaver> logger)
{
private static readonly ConcurrentDictionary<string, object> LocksKeyByUrlFilename = new();
private static readonly ConcurrentDictionary<string, ImageInReply>
GlobalLockedImagesInReplyKeyByUrlFilename = new();

public void Save(CrawlerDbContext db, IEnumerable<ReplyPost> replies)
public Action Save(CrawlerDbContext db, IEnumerable<ReplyPost> replies)
{
var pidAndImageList = (
from r in replies
Expand All @@ -19,39 +20,45 @@ from c in r.OriginalContents
}))
.DistinctBy(t => (t.Pid, t.Image.UrlFilename))
.ToList();
if (pidAndImageList.Count == 0) return;
var imagesKeyByUrlFilename = pidAndImageList.Select(t => t.Image)
if (pidAndImageList.Count == 0) return () => { };
var images = pidAndImageList.Select(t => t.Image)
.DistinctBy(image => image.UrlFilename).ToDictionary(image => image.UrlFilename);

var existingImages = (
from e in db.ImageInReplies.AsTracking()
where imagesKeyByUrlFilename.Keys.Contains(e.UrlFilename)
where images.Keys.Contains(e.UrlFilename)
select e)
.ToDictionary(e => e.UrlFilename);
var newImages = imagesKeyByUrlFilename.ExceptByKey(existingImages.Keys).Keys().ToList();
var newlyLocked = locks.AcquireLocks(newImages);
var alreadyLocked = newImages.Except(newlyLocked).ToList();
var newImages = images
.ExceptByKey(existingImages.Keys).ToDictionary();

if (newlyLocked.Any(urlFilename => !LocksKeyByUrlFilename.TryAdd(urlFilename, new())))
throw new InvalidOperationException();
alreadyLocked.ForEach(urlFilename =>
{
lock (LocksKeyByUrlFilename[urlFilename])
#pragma warning disable S108 // Either remove or fill this block of code.
{
}
#pragma warning restore S108 // Either remove or fill this block of code.
});
existingImages = existingImages
.Concat((
from e in db.ImageInReplies.AsTracking()
where alreadyLocked.Contains(e.UrlFilename)
select e).ToDictionary(e => e.UrlFilename))
var newlyLockedImages = newImages
.Where(pair => GlobalLockedImagesInReplyKeyByUrlFilename.TryAdd(pair.Key, pair.Value))
.ToDictionary();
newlyLockedImages.Values()
.Where(reply => !Monitor.TryEnter(reply, TimeSpan.FromSeconds(10)))
.ForEach(image => logger.LogWarning(
"Wait for locking newly locked image {} timed out after 10s", image.UrlFilename));

var alreadyLockedImages = GlobalLockedImagesInReplyKeyByUrlFilename
.IntersectByKey(newImages
.Keys().Except(newlyLockedImages.Keys()))
.ToDictionary();
alreadyLockedImages.Values()
.Where(reply => !Monitor.TryEnter(reply, TimeSpan.FromSeconds(10)))
.ForEach(image => logger.LogWarning(
"Wait for locking already locked image {} timed out after 10s", image.UrlFilename));
if (alreadyLockedImages.Count != 0)
existingImages = existingImages
.Concat((
from e in db.ImageInReplies.AsTracking()
where alreadyLockedImages.Keys().Contains(e.UrlFilename)
select e).ToDictionary(e => e.UrlFilename))
.ToDictionary();

(from existing in existingImages.Values
where existing.ExpectedByteSize == 0 // randomly respond with 0
join newInContent in imagesKeyByUrlFilename.Values
join newInContent in images.Values
on existing.UrlFilename equals newInContent.UrlFilename
select (existing, newInContent))
.ForEach(t => t.existing.ExpectedByteSize = t.newInContent.ExpectedByteSize);
Expand All @@ -62,19 +69,28 @@ on existing.UrlFilename equals newInContent.UrlFilename

// no need to manually invoke DbContext.AddRange(images) since EF Core will do this chore
// https://stackoverflow.com/questions/5212751/how-can-i-retrieve-id-of-inserted-entity-using-entity-framework/41146434#41146434
// reuse the same instance from imagesKeyByUrlFilename
// reuse the same instance from existingImages
// will prevent assigning multiple different instances with the same key
// which will cause EF Core to insert identical entries more than once, leading to a duplicate entry error
// https://github.com/dotnet/efcore/issues/30236
ImageInReply = existingImages.TryGetValue(t.Image.UrlFilename, out var e)
? e
: imagesKeyByUrlFilename[t.Image.UrlFilename]
: images[t.Image.UrlFilename]
}));

if (newlyLocked.Any(urlFilename => !LocksKeyByUrlFilename.TryRemove(urlFilename, out _)))
throw new InvalidOperationException();
#pragma warning disable IDISP007 // Don't dispose injected
locks.Dispose();
#pragma warning restore IDISP007 // Don't dispose injected
return () =>
{
try
{
if (newlyLockedImages.Any(pair =>
!GlobalLockedImagesInReplyKeyByUrlFilename.TryRemove(pair)))
throw new InvalidOperationException();
}
finally
{
newlyLockedImages.Values().ForEach(Monitor.Exit);
alreadyLockedImages.Values().ForEach(Monitor.Exit);
}
};
}
}
1 change: 1 addition & 0 deletions c#/crawler/src/Tieba/Crawl/Saver/UserSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public void Save(
{
if (users.Count == 0) return;
var newlyLocked = _saverLocks.Value.Acquire(users.Keys().ToList());
if (newlyLocked.Count == 0) return;

// existingUsers may have new revisions to insert so excluding already locked users
// to prevent inserting duplicate revision
Expand Down

0 comments on commit 25ab42d

Please sign in to comment.