Skip to content

Commit

Permalink
Fine grained corrections on identity map behavior with FetchForWriting
Browse files Browse the repository at this point in the history
  • Loading branch information
jeremydmiller committed Sep 5, 2024
1 parent 80d10be commit 57a1ae0
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 11 deletions.
5 changes: 4 additions & 1 deletion src/EventSourcingTests/Bugs/Bug_fetch_for_writing_cache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using Marten.Testing.Harness;
using System;
using System.Threading.Tasks;
using Shouldly;
using Xunit;

namespace EventSourcingTests.Bugs;
Expand All @@ -29,12 +30,14 @@ public async Task Test()
await theSession.SaveChangesAsync();

var test = await theSession.Events.FetchForWriting<TestAggregate>(streamKey);
test.Aggregate.Name.ShouldBe("foo");

test.AppendOne(new NamedEvent2("bar"));
//await theSession.Events.AppendOptimistic(streamKey, new NamedEvent2("bar")); If I commented the two lines above and uncommented this one it works fine
await theSession.SaveChangesAsync();

test = await theSession.Events.FetchForWriting<TestAggregate>(streamKey);
Assert.Equal("bar", test.Aggregate.Name);
test.Aggregate.Name.ShouldBe("bar");
}

}
Expand Down
34 changes: 25 additions & 9 deletions src/Marten/Events/Fetching/FetchInlinedPlan.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,31 @@ internal class FetchInlinedPlan<TDoc, TId>: IAggregateFetchPlan<TDoc, TId> where
{
private readonly EventGraph _events;
private readonly IEventIdentityStrategy<TId> _identityStrategy;
private readonly IDocumentStorage<TDoc, TId> _storage;

internal FetchInlinedPlan(EventGraph events, IEventIdentityStrategy<TId> identityStrategy,
IDocumentStorage<TDoc, TId> storage)
internal FetchInlinedPlan(EventGraph events, IEventIdentityStrategy<TId> identityStrategy)
{
_events = events;
_identityStrategy = identityStrategy;
_storage = storage;
}

public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase session, TId id, bool forUpdate,
CancellationToken cancellation = default)
{
await _identityStrategy.EnsureEventStorageExists<TDoc>(session, cancellation).ConfigureAwait(false);
await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false);

IDocumentStorage<TDoc, TId> storage = null;
if (session.Options.Events.UseIdentityMapForInlineAggregates)
{
storage = (IDocumentStorage<TDoc, TId>)session.Options.Providers.StorageFor<TDoc>().IdentityMap;
// Opt into the identity map mechanics for this aggregate type just in case
// you're using a lightweight session
session.UseIdentityMapFor<TDoc>();
}
else
{
storage = session.StorageFor<TDoc, TId>();
}

await _identityStrategy.EnsureEventStorageExists<TDoc>(session, cancellation).ConfigureAwait(false);
await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false);

if (forUpdate)
{
Expand All @@ -47,7 +50,7 @@ public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase sessio

builder.StartNewCommand();

var handler = new LoadByIdHandler<TDoc, TId>(_storage, id);
var handler = new LoadByIdHandler<TDoc, TId>(storage, id);
handler.ConfigureCommand(builder, session);

long version = 0;
Expand Down Expand Up @@ -86,6 +89,19 @@ public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase sessio
public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase session, TId id,
long expectedStartingVersion, CancellationToken cancellation = default)
{
IDocumentStorage<TDoc, TId> storage = null;
if (session.Options.Events.UseIdentityMapForInlineAggregates)
{
storage = (IDocumentStorage<TDoc, TId>)session.Options.Providers.StorageFor<TDoc>();
// Opt into the identity map mechanics for this aggregate type just in case
// you're using a lightweight session
session.UseIdentityMapFor<TDoc>();
}
else
{
storage = session.StorageFor<TDoc, TId>();
}

await _identityStrategy.EnsureEventStorageExists<TDoc>(session, cancellation).ConfigureAwait(false);
await session.Database.EnsureStorageExistsAsync(typeof(TDoc), cancellation).ConfigureAwait(false);

Expand All @@ -95,7 +111,7 @@ public async Task<IEventStream<TDoc>> FetchForWriting(DocumentSessionBase sessio

builder.StartNewCommand();

var handler = new LoadByIdHandler<TDoc, TId>(_storage, id);
var handler = new LoadByIdHandler<TDoc, TId>(storage, id);
handler.ConfigureCommand(builder, session);

long version = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/Marten/Events/Fetching/InlineFetchPlanner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ public bool TryMatch<TDoc, TId>(IDocumentStorage<TDoc, TId> storage, IEventIdent
{
if (projection.Lifecycle == ProjectionLifecycle.Inline)
{
plan = new FetchInlinedPlan<TDoc, TId>(options.EventGraph, identity, storage);
plan = new FetchInlinedPlan<TDoc, TId>(options.EventGraph, identity);
return true;
}
}
Expand Down

0 comments on commit 57a1ae0

Please sign in to comment.