Skip to content

Commit

Permalink
TASK: Base correlation id on command class name
Browse files Browse the repository at this point in the history
... instead of encoding the command into `debug_causationCommand`
  • Loading branch information
mhsdesign committed Jan 27, 2025
1 parent 5b75ccb commit c522ffe
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 22 deletions.
28 changes: 8 additions & 20 deletions Neos.ContentRepository.Core/Classes/ContentRepository.php
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,11 @@ public function handle(CommandInterface $command): void
}

$toPublish = $this->commandBus->handle($command);
$correlationId = CorrelationId::fromString(sprintf('%s_%s', substr($command::class, strrpos($command::class, '\\') + 1, 20), bin2hex(random_bytes(9))));

// simple case
if ($toPublish instanceof EventsToPublish) {
$this->eventStore->commit($toPublish->streamName, Events::fromArray($this->enrichEventsWithInitiatingMetadata($toPublish->events)->map($this->eventNormalizer->normalize(...))), $toPublish->expectedVersion);
$this->eventStore->commit($toPublish->streamName, $this->enrichAndNormalizeEvents($toPublish->events, $correlationId), $toPublish->expectedVersion);
$fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it.
if ($fullCatchUpResult->hadErrors()) {
throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors);
Expand All @@ -109,13 +110,10 @@ public function handle(CommandInterface $command): void
}

// control-flow aware command handling via generator
$isFirstEvent = true;
$causationCommandClassName = $command::class;
$correlationId = CorrelationId::fromString(UuidFactory::create());
try {
foreach ($toPublish as $eventsToPublish) {
try {
$this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId, $isFirstEvent, $causationCommandClassName), $eventsToPublish->expectedVersion);
$this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId), $eventsToPublish->expectedVersion);
} catch (ConcurrencyException $concurrencyException) {
// we pass the exception into the generator (->throw), so it could be try-caught and reacted upon:
//
Expand All @@ -127,7 +125,7 @@ public function handle(CommandInterface $command): void
// }
$yieldedErrorStrategy = $toPublish->throw($concurrencyException);
if ($yieldedErrorStrategy instanceof EventsToPublish) {
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId, $isFirstEvent, $causationCommandClassName), $yieldedErrorStrategy->expectedVersion);
$this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId), $yieldedErrorStrategy->expectedVersion);
}
throw $concurrencyException;
}
Expand Down Expand Up @@ -212,29 +210,19 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface
return $this->contentDimensionSource;
}

private function enrichEventsWithInitiatingMetadata(DomainEvents $events): DomainEvents
private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId $correlationId): Events
{
$initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser();
$initiatingTimestamp = $this->clock->now();

return InitiatingEventMetadata::enrichEventsWithInitiatingMetadata(
$eventsWithMetaData = InitiatingEventMetadata::enrichEventsWithInitiatingMetadata(
$events,
$initiatingUserId,
$initiatingTimestamp
);
}

private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId, bool &$isFirstEvent, string $causationCommandClassName): Events
{
$events = $this->enrichEventsWithInitiatingMetadata($events);

return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId, $causationCommandClassName, &$isFirstEvent) {
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : [];
if ($isFirstEvent) {
$metadata['debug_causationCommand'] = substr($causationCommandClassName, strrpos($causationCommandClassName, '\\') + 1);
$isFirstEvent = false;
}
$decoratedEvent = DecoratedEvent::create($event, metadata: $metadata, correlationId: $correlationId);
return Events::fromArray($eventsWithMetaData->map(function (EventInterface|DecoratedEvent $event) use ($correlationId) {
$decoratedEvent = DecoratedEvent::create($event, correlationId: $correlationId);
return $this->eventNormalizer->normalize($decoratedEvent);
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,14 +115,14 @@ public function fixError(StructureAdjustment $adjustment): void
assert($eventsToPublish instanceof EventsToPublish);

// set correlation id and add debug metadata
$correlationId = CorrelationId::fromString(UuidFactory::create());
$correlationId = CorrelationId::fromString(sprintf('StructureAdjustment_%s', bin2hex(random_bytes(9))));
$isFirstEvent = true;
$normalizedEvents = Events::fromArray($eventsToPublish->events->map(function (EventInterface|DecoratedEvent $event) use (
&$isFirstEvent, $correlationId, $adjustment
) {
$metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : [];
if ($isFirstEvent) {
$metadata['debug_structureAdjustment'] = mb_strimwidth($adjustment->render() , 0, 250, '');
$metadata['debug_reason'] = mb_strimwidth($adjustment->render() , 0, 250, '');
$isFirstEvent = false;
}
$decoratedEvent = DecoratedEvent::create(
Expand Down

0 comments on commit c522ffe

Please sign in to comment.