Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update locks #124

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 59 additions & 40 deletions src/NexusMods.MnemonicDB/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
/// </summary>
public class Connection : IConnection
{
private readonly ScopedAsyncLock _observerLock = new();
private readonly Lock _dbLock = new();
private readonly Lock _observersLock = new();

private readonly DatomStore _store;
private readonly ILogger<Connection> _logger;

Expand Down Expand Up @@ -85,19 +87,21 @@
_logger.LogError(ex, "Failed to analyze with {Analyzer}", analyzer.GetType().Name);
}
}

_dbStream.OnNext(db);

lock (_dbLock)
{
_dbStream.OnNext(db);
}

ProcessObservers(db);
prev = idb;


return idb;
});
}

private void ProcessObservers(Db db)
{
using var dLock = _observerLock.Lock();
ProcessDisposedObservers();
var recentlyAdded = db.RecentlyAdded;
var cache = db.AttributeCache;
Expand Down Expand Up @@ -157,15 +161,19 @@
foreach (var index in IndexTypes)
{
var reindex = datom.WithIndex(index);
foreach (var overlap in _datomObservers.Query(reindex))

lock (_observersLock)
{
ref var changeSet = ref CollectionsMarshal.GetValueRefOrAddDefault(_changeSets, overlap, out _);
changeSet ??= [];
changeSet.AddRange(_localChanges);
foreach (var overlap in _datomObservers.Query(reindex))
{
ref var changeSet = ref CollectionsMarshal.GetValueRefOrAddDefault(_changeSets, overlap, out _);
changeSet ??= [];
changeSet.AddRange(_localChanges);
}
}
}
}

// Release all the sends
foreach (var (subject, changeSet) in _changeSets)
{
Expand Down Expand Up @@ -267,38 +275,44 @@
return Transact(new SimpleMigration(attribute));
}

public IObservable<IChangeSet<Datom, DatomKey>> ObserveDatoms(SliceDescriptor descriptor)

Check warning on line 278 in src/NexusMods.MnemonicDB/Connection.cs

View workflow job for this annotation

GitHub Actions / build-and-test / Build and Test (ubuntu-latest)

Missing XML comment for publicly visible type or member 'Connection.ObserveDatoms(SliceDescriptor)'

Check warning on line 278 in src/NexusMods.MnemonicDB/Connection.cs

View workflow job for this annotation

GitHub Actions / build-and-test / Build and Test (ubuntu-latest)

Missing XML comment for publicly visible type or member 'Connection.ObserveDatoms(SliceDescriptor)'

Check warning on line 278 in src/NexusMods.MnemonicDB/Connection.cs

View workflow job for this annotation

GitHub Actions / build-and-test / Build and Test (windows-latest)

Missing XML comment for publicly visible type or member 'Connection.ObserveDatoms(SliceDescriptor)'

Check warning on line 278 in src/NexusMods.MnemonicDB/Connection.cs

View workflow job for this annotation

GitHub Actions / build-and-test / Build and Test (windows-latest)

Missing XML comment for publicly visible type or member 'Connection.ObserveDatoms(SliceDescriptor)'

Check warning on line 278 in src/NexusMods.MnemonicDB/Connection.cs

View workflow job for this annotation

GitHub Actions / build-and-test / Build and Test (macos-latest)

Missing XML comment for publicly visible type or member 'Connection.ObserveDatoms(SliceDescriptor)'

Check warning on line 278 in src/NexusMods.MnemonicDB/Connection.cs

View workflow job for this annotation

GitHub Actions / build-and-test / Build and Test (macos-latest)

Missing XML comment for publicly visible type or member 'Connection.ObserveDatoms(SliceDescriptor)'

Check warning on line 278 in src/NexusMods.MnemonicDB/Connection.cs

View workflow job for this annotation

GitHub Actions / build-and-test / Build and Test (macos-13)

Missing XML comment for publicly visible type or member 'Connection.ObserveDatoms(SliceDescriptor)'

Check warning on line 278 in src/NexusMods.MnemonicDB/Connection.cs

View workflow job for this annotation

GitHub Actions / build-and-test / Build and Test (macos-13)

Missing XML comment for publicly visible type or member 'Connection.ObserveDatoms(SliceDescriptor)'
{
return Observable.Create<IChangeSet<Datom, DatomKey>>(async (observer, token) =>
return Observable.Create<IChangeSet<Datom, DatomKey>>(observer =>
{
using var _ = await _observerLock.LockAsync();
ProcessDisposedObservers();
var fromDatom = descriptor.From.WithIndex(descriptor.Index);
var toDatom = descriptor.To.WithIndex(descriptor.Index);
_datomObservers.Add(fromDatom, toDatom, observer);

var db = Db;
var datoms = db.Datoms(descriptor);
var cache = db.AttributeCache;
var changes = new ChangeSet<Datom, DatomKey>();

foreach (var datom in datoms)
lock (_dbLock)
{
var isMany = cache.IsCardinalityMany(datom.A);
changes.Add(new Change<Datom, DatomKey>(ChangeReason.Add, CreateKey(datom, datom.A, isMany), datom));
var db = Db;

ProcessDisposedObservers();
var fromDatom = descriptor.From.WithIndex(descriptor.Index);
var toDatom = descriptor.To.WithIndex(descriptor.Index);

var datoms = db.Datoms(descriptor);
var cache = db.AttributeCache;
var changes = new ChangeSet<Datom, DatomKey>();

foreach (var datom in datoms)
{
var isMany = cache.IsCardinalityMany(datom.A);
changes.Add(new Change<Datom, DatomKey>(ChangeReason.Add, CreateKey(datom, datom.A, isMany), datom));
}

if (changes.Count == 0)
observer.OnNext(ChangeSet<Datom, DatomKey>.Empty);
else
observer.OnNext(changes);

lock (_observersLock)
{
_datomObservers.Add(fromDatom, toDatom, observer);
}

return Disposable.Create((_observersPendingDisposal, observer), static state =>
{
var (observersPendingDisposal, observer) = state;
observersPendingDisposal.Enqueue(observer);
});
}

if (changes.Count == 0)
observer.OnNext(ChangeSet<Datom, DatomKey>.Empty);
else
observer.OnNext(changes);


return Disposable.Create((_observersPendingDisposal, observer), static state =>
{
var (observersPendingDisposal, observer) = state;
observersPendingDisposal.Enqueue(observer);
});
});
}

Expand All @@ -307,9 +321,14 @@
// Quick exit so we don't allocate an enumerator
if (_observersPendingDisposal.IsEmpty)
return;

foreach (var itm in _observersPendingDisposal)
_datomObservers.Remove(itm);

lock(_observersLock)
{
foreach (var itm in _observersPendingDisposal)
{
_datomObservers.Remove(itm);
}
}
}


Expand Down
Loading