Skip to content

Commit

Permalink
Merge pull request #5016 from neos/feature/extend-event-normalizer
Browse files Browse the repository at this point in the history
TASK: Cleanup `EventNormalizer`
  • Loading branch information
bwaidelich committed May 1, 2024
2 parents 203fbe7 + 5ab793d commit ac2e2f4
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 79 deletions.
70 changes: 44 additions & 26 deletions Neos.ContentRepository.Core/Classes/EventStore/EventNormalizer.php
Expand Up @@ -43,6 +43,7 @@
use Neos\ContentRepository\Core\Feature\WorkspaceRebase\Event\WorkspaceWasRebased;
use Neos\EventStore\Model\Event;
use Neos\EventStore\Model\Event\EventData;
use Neos\EventStore\Model\Event\EventId;
use Neos\EventStore\Model\Event\EventType;

/**
Expand Down Expand Up @@ -117,32 +118,6 @@ public function __construct()
}
}

public function getEventData(EventInterface $event): EventData
{
try {
$eventDataAsJson = json_encode($event, JSON_THROW_ON_ERROR);
} catch (\JsonException $exception) {
throw new \InvalidArgumentException(
sprintf(
'Failed to normalize event of type "%s": %s',
get_debug_type($event),
$exception->getMessage()
),
1651838981
);
}
return EventData::fromString($eventDataAsJson);
}

public function getEventType(EventInterface $event): EventType
{
$className = get_class($event);

return $this->fullClassNameToShortEventType[$className] ?? throw new \RuntimeException(
'Event type ' . get_class($event) . ' not registered'
);
}

/**
* @return class-string<EventInterface>
*/
Expand All @@ -154,6 +129,23 @@ public function getEventClassName(Event $event): string
);
}

public function normalize(EventInterface|DecoratedEvent $event): Event
{
$eventId = $event instanceof DecoratedEvent && $event->eventId !== null ? $event->eventId : EventId::create();
$eventMetadata = $event instanceof DecoratedEvent ? $event->eventMetadata : null;
$causationId = $event instanceof DecoratedEvent ? $event->causationId : null;
$correlationId = $event instanceof DecoratedEvent ? $event->correlationId : null;
$event = $event instanceof DecoratedEvent ? $event->innerEvent : $event;
return new Event(
$eventId,
$this->getEventType($event),
$this->getEventData($event),
$eventMetadata,
$causationId,
$correlationId,
);
}

public function denormalize(Event $event): EventInterface
{
$eventClassName = $this->getEventClassName($event);
Expand All @@ -177,4 +169,30 @@ public function denormalize(Event $event): EventInterface
default => $eventInstance,
};
}

private function getEventData(EventInterface $event): EventData
{
try {
$eventDataAsJson = json_encode($event, JSON_THROW_ON_ERROR);
} catch (\JsonException $exception) {
throw new \InvalidArgumentException(
sprintf(
'Failed to normalize event of type "%s": %s',
get_debug_type($event),
$exception->getMessage()
),
1651838981
);
}
return EventData::fromString($eventDataAsJson);
}

private function getEventType(EventInterface $event): EventType
{
$className = get_class($event);

return $this->fullClassNameToShortEventType[$className] ?? throw new \RuntimeException(
'Event type ' . get_class($event) . ' not registered'
);
}
}
29 changes: 6 additions & 23 deletions Neos.ContentRepository.Core/Classes/EventStore/EventPersister.php
Expand Up @@ -21,13 +21,13 @@
*
* @internal
*/
final class EventPersister
final readonly class EventPersister
{
public function __construct(
private readonly EventStoreInterface $eventStore,
private readonly ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private readonly EventNormalizer $eventNormalizer,
private readonly Projections $projections,
private EventStoreInterface $eventStore,
private ProjectionCatchUpTriggerInterface $projectionCatchUpTrigger,
private EventNormalizer $eventNormalizer,
private Projections $projections,
) {
}

Expand All @@ -44,7 +44,7 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult
// the following logic could also be done in an AppEventStore::commit method (being called
// directly from the individual Command Handlers).
$normalizedEvents = Events::fromArray(
$eventsToPublish->events->map(fn(EventInterface|DecoratedEvent $event) => $this->normalizeEvent($event))
$eventsToPublish->events->map($this->eventNormalizer->normalize(...))
);
$commitResult = $this->eventStore->commit(
$eventsToPublish->streamName,
Expand All @@ -70,21 +70,4 @@ public function publishEvents(EventsToPublish $eventsToPublish): CommandResult
// The CommandResult can be used to block until projections are up to date.
return new CommandResult($pendingProjections, $commitResult);
}

private function normalizeEvent(EventInterface|DecoratedEvent $event): Event
{
$eventId = $event instanceof DecoratedEvent && $event->eventId !== null ? $event->eventId : EventId::create();
$eventMetadata = $event instanceof DecoratedEvent ? $event->eventMetadata : null;
$causationId = $event instanceof DecoratedEvent ? $event->causationId : null;
$correlationId = $event instanceof DecoratedEvent ? $event->correlationId : null;
$event = $event instanceof DecoratedEvent ? $event->innerEvent : $event;
return new Event(
$eventId,
$this->eventNormalizer->getEventType($event),
$this->eventNormalizer->getEventData($event),
$eventMetadata,
$causationId,
$correlationId,
);
}
}
Expand Up @@ -113,14 +113,14 @@ public function run(): ProcessorResult
return ProcessorResult::error(sprintf('Failed to read events. %s is not expected in imported event stream.', $event->type));
}
$domainEvent = DecoratedEvent::create($domainEvent, eventId: EventId::fromString($event->identifier), metadata: $event->metadata);
$domainEvents[] = $this->normalizeEvent($domainEvent);
$domainEvents[] = $this->eventNormalizer->normalize($domainEvent);
}

assert($this->contentStreamId !== null);

$contentStreamStreamName = ContentStreamEventStreamName::fromContentStreamId($this->contentStreamId)->getEventStreamName();
$events = Events::with(
$this->normalizeEvent(
$this->eventNormalizer->normalize(
new ContentStreamWasCreated(
$this->contentStreamId,
)
Expand All @@ -135,7 +135,7 @@ public function run(): ProcessorResult
$workspaceName = WorkspaceName::forLive();
$workspaceStreamName = WorkspaceEventStreamName::fromWorkspaceName($workspaceName)->getEventStreamName();
$events = Events::with(
$this->normalizeEvent(
$this->eventNormalizer->normalize(
new RootWorkspaceWasCreated(
$workspaceName,
WorkspaceTitle::fromString('live workspace'),
Expand All @@ -158,29 +158,6 @@ public function run(): ProcessorResult
return ProcessorResult::success(sprintf('Imported %d event%s into stream "%s"', count($domainEvents), count($domainEvents) === 1 ? '' : 's', $contentStreamStreamName->value));
}

/**
* Copied from {@see EventPersister::normalizeEvent()}
*
* @param EventInterface|DecoratedEvent $event
* @return Event
*/
private function normalizeEvent(EventInterface|DecoratedEvent $event): Event
{
$eventId = $event instanceof DecoratedEvent && $event->eventId !== null ? $event->eventId : EventId::create();
$eventMetadata = $event instanceof DecoratedEvent ? $event->eventMetadata : null;
$causationId = $event instanceof DecoratedEvent ? $event->causationId : null;
$correlationId = $event instanceof DecoratedEvent ? $event->correlationId : null;
$event = $event instanceof DecoratedEvent ? $event->innerEvent : $event;
return new Event(
$eventId,
$this->eventNormalizer->getEventType($event),
$this->eventNormalizer->getEventData($event),
$eventMetadata,
$causationId,
$correlationId,
);
}

/** --------------------------- */

/**
Expand Down
Expand Up @@ -166,11 +166,12 @@ private function resetRuntimeState(): void

private function exportEvent(EventInterface $event): void
{
$normalizedEvent = $this->eventNormalizer->normalize($event);
$exportedEvent = new ExportedEvent(
Uuid::uuid4()->toString(),
$this->eventNormalizer->getEventType($event)->value,
json_decode($this->eventNormalizer->getEventData($event)->value, true),
[]
$normalizedEvent->id->value,
$normalizedEvent->type->value,
json_decode($normalizedEvent->data->value, true),
[],
);
assert($this->eventFileResource !== null);
fwrite($this->eventFileResource, $exportedEvent->toJson() . chr(10));
Expand Down

0 comments on commit ac2e2f4

Please sign in to comment.