Skip to content

Commit

Permalink
TASK: Add CorrelationId to removed events from cs pruner
Browse files Browse the repository at this point in the history
  • Loading branch information
mhsdesign committed Jan 27, 2025
1 parent 2e0c7b7 commit 2647207
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

namespace Neos\ContentRepository\Core\Service;

use Neos\ContentRepository\Core\EventStore\DecoratedEvent;
use Neos\ContentRepository\Core\EventStore\EventNormalizer;
use Neos\ContentRepository\Core\Factory\ContentRepositoryServiceInterface;
use Neos\ContentRepository\Core\Feature\ContentStreamCreation\Event\ContentStreamWasCreated;
Expand All @@ -22,6 +23,7 @@
use Neos\ContentRepository\Core\SharedModel\Workspace\ContentStreamId;
use Neos\ContentRepository\Core\Subscription\Engine\SubscriptionEngine;
use Neos\EventStore\EventStoreInterface;
use Neos\EventStore\Model\Event\CorrelationId;
use Neos\EventStore\Model\Event\EventType;
use Neos\EventStore\Model\Event\EventTypes;
use Neos\EventStore\Model\EventStream\EventStreamFilter;
Expand Down Expand Up @@ -129,6 +131,7 @@ public function removeDanglingContentStreams(\Closure $outputFn, \DateTimeImmuta
{
$allContentStreams = $this->findAllContentStreams();

$correlationId = CorrelationId::fromString(sprintf('ContentStreamPruner_%s', bin2hex(random_bytes(9))));
$danglingContentStreamsPresent = false;
foreach ($allContentStreams as $contentStream) {
if (!$contentStream->isDangling()) {
Expand All @@ -145,8 +148,12 @@ public function removeDanglingContentStreams(\Closure $outputFn, \DateTimeImmuta
$this->eventStore->commit(
ContentStreamEventStreamName::fromContentStreamId($contentStream->id)->getEventStreamName(),
$this->eventNormalizer->normalize(
new ContentStreamWasRemoved(
$contentStream->id
DecoratedEvent::create(
new ContentStreamWasRemoved(
$contentStream->id
),
metadata: ['debug_reason' => sprintf('Removed dangling content stream with status %s', $contentStream->status->value)],
correlationId: $correlationId
)
),
ExpectedVersion::STREAM_EXISTS()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ public static function create(
ContentStreamId $id,
ContentStreamStatus $status,
?ContentStreamId $sourceContentStreamId,
\DateTimeImmutable $create,
\DateTimeImmutable $created,
): self {
return new self(
$id,
$status,
$sourceContentStreamId,
$create,
$created,
false
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,11 +134,7 @@ public function fixError(StructureAdjustment $adjustment): void
return $this->eventNormalizer->normalize($decoratedEvent);
}));

$this->eventStore->commit(
$eventsToPublish->streamName,
$normalizedEvents,
$eventsToPublish->expectedVersion
);
$this->eventStore->commit($eventsToPublish->streamName, $normalizedEvents, $eventsToPublish->expectedVersion);
$this->subscriptionEngine->catchUpActive();
}
}

0 comments on commit 2647207

Please sign in to comment.