Ecotone
comes with Prooph Event Store integration. This is well known and stable solution providing event storage over databases like Postgres
, MySQL
or MariaDB
.
Ecotone
provides Event Sourced Aggregates, which are stored as series of events.
Thanks to that we will be able to have the whole history of what happened with specific Aggregate.
And build up projections, that targets specific view, in order to keep our code simple, even with really complicated queries. This divide Commands from Queries (CQRS).
Do you want to find out, how to apply Event Sourcing in your business? Try out InspectIO!
This module is in experimental stage. Gathering feedback about improvements and waiting for pdo event store becomes integrated with PHP 8. Right now fork of pdo-event-store is used.
composer require ecotone/pdo-event-sourcing
And go to DBAL Support in order to configure the connection.
There are two ways, we can make use of Event Sourced Aggregates.
This way of handling events does allow for pure functions. Writes are clearly separated from writes.
#[EventSourcedAggregate] // 1class Ticket{use WithAggregateVersioning; // 2​#[AggregateIdentifier] // 1private string $ticketId;private string $ticketType;​private function __construct() {}​#[CommandHandler] // 2public static function register(RegisterTicket $command) : array{return [new TicketWasRegistered($command->getTicketId(), $command->getTicketType())];}​#[CommandHandler] // 2public function close(CloseTicket $command) : array{return [new TicketWasClosed($this->ticketId)];}​#[AggregateFactory] // 3public static function restoreFrom(array $events) : self{$ticket = new Ticket();​foreach ($events as $event) {match (get_class($event)) {TicketWasRegistered::class => $ticket->applyTicketWasRegistered($event)};}​return $ticket;}​private function applyTicketWasRegistered(TicketWasRegistered $event) : void{$this->ticketId = $event->getTicketId();$this->ticketType = $event->getTicketType();}}
EventSourcedAggregate
and AggregateIdentifier
works exactly the same as State-Stored Aggregate.
Event Sourced Aggregate must provide version. You may leave it to Ecotone
using WithAggregateVersioning
or you can implement it yourself.
CommandHandler
for event sourcing returns events generated by specific method. This will be passed to the Repository
to be stored.
AggregateFactory
is static method responsible for reconstructing Aggregate
from previously created events. At least one event need to be handled in order to provide AggregateIdentifier
.
This way of handling events allow for similarity with State Stored Aggregates.
#[EventSourcedAggregate(true)] // 1class Basket{use WithAggregateEvents;use WithAggregateVersioning;​#[AggregateIdentifier]private string $id;​private function __construct() {}​#[CommandHandler] // 2public static function create(CreateBasket $command) : static{$basket = new static();$basket->recordThat(new BasketWasCreated($command->getId()));​return $basket;}​#[CommandHandler] // 2public function addProduct(AddProduct $command) : void{$this->recordThat(new ProductWasAddedToBasket($this->id, $command->getProductName()));}​#[AggregateFactory]public static function restoreFrom(array $events) : self{$basket = new Basket();​foreach ($events as $event) {match (get_class($event)) {BasketWasCreated::class => $basket->applyBasketWasCreated($event)};}​return $basket;}​private function applyBasketWasCreated(BasketWasCreated $basketWasCreated){$this->id = $basketWasCreated->getId();}}
In order to make use of alternative way of handling events, we need to set attribute to trueEventSourcedAggregate(true)
Command Handlers instead of returning events are acting the same as State Stored Aggregates.
All events which will be published using recordThat
will be passed to the Repository
to be stored.
​
Projections are way to build specific view from stored events. Projections can be added in any moment of the application lifetime and be built up from existing history, till the current time. This is powerful concept as it allow for building views quickly and discarding them without pain, when they are not longer needed.
#[Projection("inProgressTicketList", Ticket::class] // 1class InProgressTicketList{private Connection $connection;​public function __construct(Connection $connection){$this->connection = $connection;}​#[EventHandler] // 2public function addTicket(TicketWasRegistered $event, array $metadata) : void{$result = $this->connection->executeStatement(<<<SQLINSERT INTO in_progress_tickets VALUES (?,?)SQL, [$event->getTicketId(), $event->getTicketType()]);}​#[QueryHandler("getInProgressTickets")] // 3public function getTickets() : array{return $this->connection->executeQuery(<<<SQLSELECT * FROM in_progress_ticketsSQL)->fetchAllAssociative();}}
This tells Ecotone
that specific class is Projection. The first parameter is the name of the projection
and the second is name of the stream (the default is the name of the Aggregate
) that this projection subscribes to.
Events that this projection subscribes to
Optional Query Handlers for this projection. They can be placed in different classes depending on preferences.
bin/console ecotone:run {projectionName}
artisan ecotone:run {projectionName}
$messagingSystem->run($projectionName);
As projection can be restarted, deleted and created differently. It's easier to maintain, when the projection knows how to setup it itself, instead of depending on migrations.
Method with attribute #[ProjectionInitialization]
will be called on startup of the projection.
#[ProjectionInitialization]public function initialization() : void{$this->connection->executeStatement(<<<SQLCREATE TABLE IF NOT EXISTS in_progress_tickets (ticket_id VARCHAR(36) PRIMARY KEY,ticket_type VARCHAR(25))SQL);}
In order to restart the projection in case we want to provide incompatible change, we can simply reset the projection and it will build up from the beginning.
bin/console ecotone:es:reset-projection {projectionName}
artisan ecotone:es:reset-projection {projectionName}
$messagingSystem->runConsoleCommand("ecotone:es:reset-projection", ["name" => $projectionName]);
If the projection stores the state inside the Event Store, we can tell Ecotone
what to do with the projection:
#[ProjectionReset]public function reset() : void{$this->connection->executeStatement(<<<SQLDELETE FROM in_progress_ticketsSQL);}
If we want to delete the projection
bin/console ecotone:es:delete-projection {projectionName}
artisan ecotone:es:delete-projection {projectionName}
$messagingSystem->runConsoleCommand("ecotone:es:delete-projection", ["name" => $projectionName]);
If the projection stores the state inside the Event Store, we can tell Ecotone
what to do with the projection:
#[ProjectionDelete]public function delete() : void{$this->connection->executeStatement(<<<SQLDROP TABLE in_progress_ticketsSQL);}
Aggregate Repository
under the hood make use of Event Store.
As Event Store keeps the events in can also keep the state of the projection as simple array. This is useful if our events depend on each other in order to provide state between handlers or in case the result of the projection is singular record.
To be described...
To be described...
​