Skip to content

Commit

Permalink
* rename field DiscoveredAt to LastSeenAt @ ThreadMissingFirstRep…
Browse files Browse the repository at this point in the history
…ly.cs

* now will always replace existing entity fields with new values even if it's null @ `CrawlPost.SaveThreadMissingFirstReply()`
* now will retry all savers, this is similar to `TbmDbContext.SaveChangesForUpdate()`
* fix commit transaction before emitting `BaseSaver.OnPostSaveEvent()` & `PostCommitSaveHook()`
@ `BaseCrawlFacade.SaveCrawled()`
* using `SaveChangesForUpdate()` to retry saving the title of the parent thread even if the thread is not saved before @ `ReplyCrawlFacade.SaveParentThreadTitle()`
* remove `IQueryable<>.ForUpdate()` in favor of `TbmDbContext.SaveChangesForUpdate()` @ `BasePostSaver.Save()`, `ReplySaver.SaveReply(Signatures|ContentImages)()`, `UserSaver.Save()` & `ForumModeratorRevisionCrawlWorker.Save()`
@ crawler

+ methods `SaveChangesForUpdate(Async)()` which similar to https://learn.microsoft.com/en-us/ef/core/saving/concurrency#resolving-concurrency-conflicts
- class `SelectForUpdateCommandInterceptor` and its static prop
@ TbmDbContext
- method `IQueryable<>.ForUpdate()` @ ExtensionMethods.cs
@ shared
@ c#
  • Loading branch information
n0099 committed May 10, 2024
1 parent d201f4c commit d48e6f0
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 99 deletions.
2 changes: 1 addition & 1 deletion c#/crawler/src/Db/ThreadMissingFirstReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ public class ThreadMissingFirstReply : RowVersionedEntity
[Key] public ulong Tid { get; set; }
public ulong? Pid { get; set; }
public byte[]? Excerpt { get; set; }
public uint? DiscoveredAt { get; set; }
public uint? LastSeenAt { get; set; }
}
33 changes: 16 additions & 17 deletions c#/crawler/src/Tieba/Crawl/CrawlPost.cs
Original file line number Diff line number Diff line change
Expand Up @@ -134,44 +134,43 @@ private Action<Exception> SaveThreadMissingFirstReply
(Fid fid, Tid tid, SavedThreadsList savedThreads) => ex =>
{
if (ex is not EmptyPostListException) return;
var parentThread = savedThreads
var thread = savedThreads
.SelectMany(c => c.AllAfter.Where(th => th.Tid == tid))
.FirstOrDefault();
if (parentThread == null) return;
if (thread == null) return;

var newEntity = new ThreadMissingFirstReply
{
Tid = tid,
Pid = parentThread.FirstReplyPid,
Excerpt = Helper.SerializedProtoBufWrapperOrNullIfEmpty(parentThread.FirstReplyExcerpt,
() => new ThreadAbstractWrapper {Value = {parentThread.FirstReplyExcerpt}}),
DiscoveredAt = Helper.GetNowTimestamp()
Pid = thread.FirstReplyPid,
Excerpt = Helper.SerializedProtoBufWrapperOrNullIfEmpty(thread.FirstReplyExcerpt,
() => new ThreadAbstractWrapper {Value = {thread.FirstReplyExcerpt}}),
LastSeenAt = Helper.GetNowTimestamp()
};
if (newEntity.Pid == null && newEntity.Excerpt == null) return; // skip if all fields are empty

using var dbFactory = dbContextFactory();
var db = dbFactory.Value(fid);
using var transaction = db.Database.BeginTransaction(IsolationLevel.ReadCommitted);
var firstReply =
from r in db.Replies.AsNoTracking()
where r.Pid == parentThread.FirstReplyPid
select r.Pid;
if (firstReply.Any()) return; // skip if the first reply of parent thread had already saved
if ((from r in db.Replies.AsNoTracking()
where r.Pid == thread.FirstReplyPid
select r.Pid)
.Any()) return; // skip if the first reply of parent thread had already saved

var existingEntity = db.ThreadMissingFirstReplies.AsTracking().ForUpdate()
var existingEntity = db.ThreadMissingFirstReplies.AsTracking()
.SingleOrDefault(e => e.Tid == tid);
if (existingEntity == null)
{
_ = db.ThreadMissingFirstReplies.Add(newEntity);
}
else
{
if (newEntity.Pid != null) existingEntity.Pid = newEntity.Pid;
if (newEntity.Excerpt != null) existingEntity.Excerpt = newEntity.Excerpt;
existingEntity.DiscoveredAt = newEntity.DiscoveredAt;
existingEntity.Pid = newEntity.Pid;
existingEntity.Excerpt = newEntity.Excerpt;
existingEntity.LastSeenAt = newEntity.LastSeenAt;
}

_ = db.SaveChanges();
_ = db.SaveChangesForUpdate();
transaction.Commit();
};
}
37 changes: 20 additions & 17 deletions c#/crawler/src/Tieba/Crawl/Facade/BaseCrawlFacade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,26 +46,29 @@ public virtual void Dispose()
var db = DbContextFactory(Fid);
using var transaction = db.Database.BeginTransaction(IsolationLevel.ReadCommitted);

var postSaver = postSaverFactory(Posts);
var savedPosts = Posts.IsEmpty ? null : postSaver.Save(db);
while (true)
{
var postSaver = postSaverFactory(Posts);
var savedPosts = Posts.IsEmpty ? null : postSaver.Save(db);

var userSaver = userSaverFactory(_users);
userSaver.Save(db, postSaver.CurrentPostType, postSaver.UserFieldChangeIgnorance);
var userSaver = userSaverFactory(_users);
userSaver.Save(db, postSaver.CurrentPostType, postSaver.UserFieldChangeIgnorance);

BeforeCommitSaveHook(db, userSaver);
try
{
db.TimestampingEntities();
_ = db.SaveChanges();
transaction.Commit();
if (savedPosts != null) PostCommitSaveHook(savedPosts, stoppingToken);
}
finally
{
postSaver.OnPostSaveEvent();
userSaver.PostSaveHook();
BeforeCommitSaveHook(db, userSaver);
try
{
db.TimestampingEntities();
_ = db.SaveChanges();
if (savedPosts != null) PostCommitSaveHook(savedPosts, stoppingToken);
postSaver.OnPostSaveEvent();
transaction.Commit();
}
finally
{
userSaver.PostSaveHook();
}
return savedPosts;
}
return savedPosts;
}

public async Task<BaseCrawlFacade<TPost, TBaseRevision, TResponse, TPostProtoBuf>>
Expand Down
31 changes: 17 additions & 14 deletions c#/crawler/src/Tieba/Crawl/Facade/ReplyCrawlFacade.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,25 +45,28 @@ join user in users on reply.AuthorUid equals user.Uid

private void SaveParentThreadTitle(IEnumerable<Reply> replies)
{
var newTitle = replies.FirstOrDefault(r => r.Floor == 1)?.Title;
if (newTitle == null) return;

// update the parent thread of reply with the new title extracted from the first-floor reply in the first page
var db = dbContextFactory(Fid);
using var transaction = db.Database.BeginTransaction(IsolationLevel.ReadCommitted);

var parentThreadTitle = (
from t in db.Threads.AsNoTracking().ForUpdate()
where t.Tid == tid
select t.Title).SingleOrDefault();

// thread title will be empty string as a fallback when the thread author haven't written title for this thread
if (parentThreadTitle != "") return;
var newTitle = replies.FirstOrDefault(r => r.Floor == 1)?.Title;
if (newTitle == null) return;
var thread = db.Threads.AsTracking().SingleOrDefault(t => t.Tid == tid);
if (thread?.Title == newTitle) return;
switch (thread)
{ // thread title will be empty string as a fallback when the thread author haven't written title for this thread
case {Title: not ""}:
return; // != null && Title != ""
case null:
_ = db.Add(new ThreadPost {Tid = tid, Title = newTitle});
break;
default:
thread.Title = newTitle;
break;
}

db.Attach(new ThreadPost {Tid = tid, Title = newTitle})
.Property(th => th.Title).IsModified = true;
if (db.SaveChanges() != 1) // do not touch UpdateAt field for the accuracy of time field in thread revisions
throw new DbUpdateException(
$"Parent thread title \"{newTitle}\" completion for tid {tid} has failed.");
_ = db.SaveChangesForUpdate();
transaction.Commit();
}
}
5 changes: 2 additions & 3 deletions c#/crawler/src/Tieba/Crawl/Saver/Post/BasePostSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,8 @@ protected SaverChangeSet<TPost> Save<TRevision>(
ExpressionStarter<TPost> existingPostPredicate)
where TRevision : class, IRevision
{
var dbSet = db.Set<TPost>().ForUpdate();

var existingPostsKeyById = dbSet.Where(existingPostPredicate).ToDictionary(postIdSelector);
var existingPostsKeyById = db.Set<TPost>()
.Where(existingPostPredicate).ToDictionary(postIdSelector);

// deep copy before entities get mutated by BaseSaver.SavePostsOrUsers()
var existingBeforeMerge = existingPostsKeyById.Select(pair => (TPost)pair.Value.Clone()).ToList();
Expand Down
6 changes: 3 additions & 3 deletions c#/crawler/src/Tieba/Crawl/Saver/Post/ReplySaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,10 @@ from c in r.OriginalContents
var imagesKeyByUrlFilename = pidAndImageList.Select(t => t.Image)
.DistinctBy(image => image.UrlFilename).ToDictionary(image => image.UrlFilename);
var existingImages = (
from e in db.ImageInReplies
from e in db.ImageInReplies.AsTracking()
where imagesKeyByUrlFilename.Keys.Contains(e.UrlFilename)
select e)
.ForUpdate().ToDictionary(e => e.UrlFilename);
.ToDictionary(e => e.UrlFilename);
(from existing in existingImages.Values
where existing.ExpectedByteSize == 0 // randomly respond with 0
join newInContent in imagesKeyByUrlFilename.Values
Expand Down Expand Up @@ -140,7 +140,7 @@ private Action SaveReplySignatures(CrawlerDbContext db, IEnumerable<ReplyPost> r
var uniqueSignatures = signatures
.ConvertAll(s => new UniqueSignature(s.SignatureId, s.XxHash3));
var existingSignatures = (
from s in db.ReplySignatures.AsTracking().ForUpdate()
from s in db.ReplySignatures.AsTracking()
where uniqueSignatures.Select(us => us.Id).Contains(s.SignatureId)

// server side eval doesn't need ByteArrayEqualityComparer
Expand Down
2 changes: 1 addition & 1 deletion c#/crawler/src/Tieba/Crawl/Saver/UserSaver.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void Save(
_savedUsersId.AddRange(usersExceptLocked.Keys);
UserIdLocks.UnionWith(_savedUsersId);

var existingUsersKeyByUid = (from user in db.Users.AsTracking().ForUpdate()
var existingUsersKeyByUid = (from user in db.Users.AsTracking()
where usersExceptLocked.Keys.Contains(user.Uid)
select user).ToDictionary(u => u.Uid);
SavePostsOrUsers(db, userFieldChangeIgnorance,
Expand Down
2 changes: 1 addition & 1 deletion c#/crawler/src/Worker/ForumModeratorRevisionCrawlWorker.cs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ await db.ForumModeratorRevisions.AddRangeAsync(existingLatestRevisions
ModeratorTypes = "" // moderator only exists in DB means the user is no longer a moderator
}), stoppingToken);

_ = await db.SaveChangesAsync(stoppingToken);
_ = await db.SaveChangesForUpdateAsync(stoppingToken);
await transaction.CommitAsync(stoppingToken);
}
}
1 change: 0 additions & 1 deletion c#/shared/src/ExtensionMethods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ public static partial class ExtensionMethods
public static short RoundToShort(this float number) => (short)Math.Round(number);
public static ushort RoundToUshort(this float number) => (ushort)Math.Round(number);
public static ushort RoundToUshort(this double number) => (ushort)Math.Round(number);
public static IQueryable<T> ForUpdate<T>(this IQueryable<T> query) => query.TagWith("ForUpdate");

/// <see>https://stackoverflow.com/questions/13158121/how-to-add-a-range-of-items-to-an-ilist/33104162#33104162</see>
[System.Diagnostics.CodeAnalysis.SuppressMessage("StyleCop.CSharp.DocumentationRules", "SA1618:Generic type parameters should be documented")]
Expand Down
74 changes: 33 additions & 41 deletions c#/shared/src/TbmDbContext.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
using System.Data;
using System.Data.Common;
using System.Diagnostics.CodeAnalysis;
using Microsoft.EntityFrameworkCore.Diagnostics;
Expand All @@ -12,49 +11,43 @@ namespace tbm.Shared;

public abstract class TbmDbContext : DbContext
{
protected static readonly SelectForUpdateCommandInterceptor SelectForUpdateCommandInterceptorSingleton = new();

[SuppressMessage("Style", "CC0072:Remove Async termination when method is not asynchronous.", Justification = "https://github.com/code-cracker/code-cracker/issues/1086")]
protected sealed class SelectForUpdateCommandInterceptor : DbCommandInterceptor
{ // https://stackoverflow.com/questions/37984312/how-to-implement-select-for-update-in-ef-core/75086260#75086260
public override InterceptionResult<object> ScalarExecuting
(DbCommand command, CommandEventData eventData, InterceptionResult<object> result)
{
ManipulateCommand(command);
return result;
}

public override ValueTask<InterceptionResult<object>> ScalarExecutingAsync(
DbCommand command,
CommandEventData eventData,
InterceptionResult<object> result,
CancellationToken cancellationToken = default)
{
ManipulateCommand(command);
return new(result);
}

public override InterceptionResult<DbDataReader> ReaderExecuting
(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result)
{
ManipulateCommand(command);
return result;
}

public override ValueTask<InterceptionResult<DbDataReader>> ReaderExecutingAsync(
DbCommand command,
CommandEventData eventData,
InterceptionResult<DbDataReader> result,
CancellationToken cancellationToken = default)
public int SaveChangesForUpdate()
{
while (true)
{
ManipulateCommand(command);
return new(result);
try
{
return SaveChanges();
}
catch (DbUpdateConcurrencyException e)
{
foreach (var entry in e.Entries)
{
var existing = entry.GetDatabaseValues();
if (existing == null) entry.State = EntityState.Added; // already deleted
else entry.OriginalValues.SetValues(existing);
}
}
}
}

private static void ManipulateCommand(IDbCommand command)
public async Task<int> SaveChangesForUpdateAsync(CancellationToken stoppingToken = default)
{
while (true)
{
if (command.CommandText.StartsWith("-- ForUpdate", StringComparison.Ordinal))
command.CommandText += " FOR NO KEY UPDATE";
try
{
return await SaveChangesAsync(stoppingToken);
}
catch (DbUpdateConcurrencyException e)
{
foreach (var entry in e.Entries)
{
var existing = await entry.GetDatabaseValuesAsync(stoppingToken);
if (existing == null) entry.State = EntityState.Added; // already deleted
else entry.OriginalValues.SetValues(existing);
}
}
}
}

Expand Down Expand Up @@ -107,7 +100,6 @@ protected override void OnConfiguring(DbContextOptionsBuilder options)
options.UseNpgsql(GetNpgsqlDataSource(Config.GetConnectionString("Main")).Value, OnConfiguringNpgsql)
.ReplaceService<IModelCacheKeyFactory, TModelCacheKeyFactory>()
.ReplaceService<IRelationalTransactionFactory, NoSavePointTransactionFactory>()
.AddInterceptors(SelectForUpdateCommandInterceptorSingleton)
.UseCamelCaseNamingConvention();

var dbSettings = Config.GetSection("DbSettings");
Expand Down

0 comments on commit d48e6f0

Please sign in to comment.