diff --git a/Neos.ContentRepository.Core/Classes/ContentRepository.php b/Neos.ContentRepository.Core/Classes/ContentRepository.php index 8f60277680..e1c72e5a82 100644 --- a/Neos.ContentRepository.Core/Classes/ContentRepository.php +++ b/Neos.ContentRepository.Core/Classes/ContentRepository.php @@ -102,7 +102,7 @@ public function handle(CommandInterface $command): void // simple case if ($toPublish instanceof EventsToPublish) { - $this->eventStore->commit($toPublish->streamName, $this->enrichAndNormalizeEvents($toPublish->events, correlationId: null), $toPublish->expectedVersion); + $this->eventStore->commit($toPublish->streamName, Events::fromArray($this->enrichEventsWithInitiatingMetadata($toPublish->events)->map($this->eventNormalizer->normalize(...))), $toPublish->expectedVersion); $fullCatchUpResult = $this->subscriptionEngine->catchUpActive(); // NOTE: we don't batch here, to ensure the catchup is run completely and any errors don't stop it. if ($fullCatchUpResult->hadErrors()) { throw CatchUpHadErrors::createFromErrors($fullCatchUpResult->errors); @@ -111,11 +111,13 @@ public function handle(CommandInterface $command): void } // control-flow aware command handling via generator + $isFirstEvent = true; + $causationCommandClassName = $command::class; $correlationId = CorrelationId::fromString(UuidFactory::create()); try { foreach ($toPublish as $eventsToPublish) { try { - $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId), $eventsToPublish->expectedVersion); + $this->eventStore->commit($eventsToPublish->streamName, $this->enrichAndNormalizeEvents($eventsToPublish->events, $correlationId, $isFirstEvent, $causationCommandClassName), $eventsToPublish->expectedVersion); } catch (ConcurrencyException $concurrencyException) { // we pass the exception into the generator (->throw), so it could be try-caught and reacted upon: // @@ -127,7 +129,7 @@ public function handle(CommandInterface $command): void // } $yieldedErrorStrategy = $toPublish->throw($concurrencyException); if ($yieldedErrorStrategy instanceof EventsToPublish) { - $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId), $yieldedErrorStrategy->expectedVersion); + $this->eventStore->commit($yieldedErrorStrategy->streamName, $this->enrichAndNormalizeEvents($yieldedErrorStrategy->events, $correlationId, $isFirstEvent, $causationCommandClassName), $yieldedErrorStrategy->expectedVersion); } throw $concurrencyException; } @@ -222,19 +224,29 @@ public function getContentDimensionSource(): ContentDimensionSourceInterface return $this->contentDimensionSource; } - private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId): Events + private function enrichEventsWithInitiatingMetadata(DomainEvents $events): DomainEvents { $initiatingUserId = $this->authProvider->getAuthenticatedUserId() ?? UserId::forSystemUser(); $initiatingTimestamp = $this->clock->now(); - $events = InitiatingEventMetadata::enrichEventsWithInitiatingMetadata( + return InitiatingEventMetadata::enrichEventsWithInitiatingMetadata( $events, $initiatingUserId, $initiatingTimestamp ); + } + + private function enrichAndNormalizeEvents(DomainEvents $events, CorrelationId|null $correlationId, bool &$isFirstEvent, string $causationCommandClassName): Events + { + $events = $this->enrichEventsWithInitiatingMetadata($events); - return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId) { - $decoratedEvent = DecoratedEvent::create($event, correlationId: $correlationId); + return Events::fromArray($events->map(function (EventInterface|DecoratedEvent $event) use ($correlationId, $causationCommandClassName, &$isFirstEvent) { + $metadata = $event instanceof DecoratedEvent ? $event->eventMetadata?->value ?? [] : []; + if ($isFirstEvent) { + $metadata['debug_causationCommand'] = substr($causationCommandClassName, strrpos($causationCommandClassName, '\\') + 1); + $isFirstEvent = false; + } + $decoratedEvent = DecoratedEvent::create($event, metadata: $metadata, correlationId: $correlationId); return $this->eventNormalizer->normalize($decoratedEvent); })); } diff --git a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php index 053352f98e..8432cd74c3 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php +++ b/Neos.ContentRepository.Core/Classes/Feature/ContentStreamHandling.php @@ -31,19 +31,15 @@ trait ContentStreamHandling private function closeContentStream( ContentStreamId $contentStreamId, Version $contentStreamVersion, - string $causationCommandClassName ): EventsToPublish { $streamName = ContentStreamEventStreamName::fromContentStreamId($contentStreamId)->getEventStreamName(); return new EventsToPublish( $streamName, Events::with( - DecoratedEvent::create( - new ContentStreamWasClosed( - $contentStreamId, - ), - metadata: array_filter(['debug_causationCommand' => substr($causationCommandClassName, strrpos($causationCommandClassName, '\\') + 1)]) - ) + new ContentStreamWasClosed( + $contentStreamId, + ), ), ExpectedVersion::fromVersion($contentStreamVersion) ); diff --git a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php index b309aa34cf..7a655d09b6 100644 --- a/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php +++ b/Neos.ContentRepository.Core/Classes/Feature/WorkspaceCommandHandler.php @@ -207,8 +207,7 @@ private function handlePublishWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -354,8 +353,7 @@ private function handleRebaseWorkspace( // if we have no changes in the workspace we can fork from the base directly yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); yield from $this->rebaseWorkspaceWithoutChanges( @@ -376,8 +374,7 @@ private function handleRebaseWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -463,8 +460,7 @@ private function handlePublishIndividualNodesFromWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); $commandSimulator = $this->commandSimulatorFactory->createSimulatorForWorkspace($baseWorkspace->workspaceName); @@ -588,8 +584,7 @@ private function handleDiscardIndividualNodesFromWorkspace( yield $this->closeContentStream( $workspace->currentContentStreamId, - $workspaceContentStreamVersion, - $command::class + $workspaceContentStreamVersion ); if ($commandsToKeep->isEmpty()) {