Event Sourcing

Event Sourcing PHP

Ecotone comes with Prooph Event Store integration. This is well known and stable solution providing event storage over databases like Postgres, MySQL or MariaDB. Ecotone provides Event Sourced Aggregates, which are stored as series of events. Thanks to that we will be able to have the whole history of what happened with specific Aggregate. And build up projections, that targets specific view, in order to keep our code simple, even with really complicated queries. This divide Commands from Queries (CQRS).

Do you want to find out, how to apply Event Sourcing in your business? Try out InspectIO!

This module is in experimental stage. Gathering feedback about improvements and waiting for pdo event store becomes integrated with PHP 8. Right now fork of pdo-event-store is used.

Installation

composer require ecotone/pdo-event-sourcing

And go to DBAL Support in order to configure the connection.

Event Sourced Aggregate

There are two ways, we can make use of Event Sourced Aggregates.

Pure Event Sourced Aggregate

This way of handling events does allow for pure functions. Writes are clearly separated from writes.

#[EventSourcedAggregate] // 1
class Ticket
{
use WithAggregateVersioning; // 2
#[AggregateIdentifier] // 1
private string $ticketId;
private string $ticketType;
private function __construct() {}
#[CommandHandler] // 2
public static function register(RegisterTicket $command) : array
{
return [new TicketWasRegistered($command->getTicketId(), $command->getTicketType())];
}
#[CommandHandler] // 2
public function close(CloseTicket $command) : array
{
return [new TicketWasClosed($this->ticketId)];
}
#[AggregateFactory] // 3
public static function restoreFrom(array $events) : self
{
$ticket = new Ticket();
foreach ($events as $event) {
match (get_class($event)) {
TicketWasRegistered::class => $ticket->applyTicketWasRegistered($event)
};
}
return $ticket;
}
private function applyTicketWasRegistered(TicketWasRegistered $event) : void
{
$this->ticketId = $event->getTicketId();
$this->ticketType = $event->getTicketType();
}
}
  1. EventSourcedAggregate and AggregateIdentifier works exactly the same as State-Stored Aggregate.

  2. Event Sourced Aggregate must provide version. You may leave it to Ecotone using WithAggregateVersioning or you can implement it yourself.

  3. CommandHandlerfor event sourcing returns events generated by specific method. This will be passed to the Repository to be stored.

  4. AggregateFactory is static method responsible for reconstructing Aggregate from previously created events. At least one event need to be handled in order to provide AggregateIdentifier.

Internal Recorder Aggregate

This way of handling events allow for similarity with State Stored Aggregates.

#[EventSourcedAggregate(true)] // 1
class Basket
{
use WithAggregateEvents;
use WithAggregateVersioning;
#[AggregateIdentifier]
private string $id;
private function __construct() {}
#[CommandHandler] // 2
public static function create(CreateBasket $command) : static
{
$basket = new static();
$basket->recordThat(new BasketWasCreated($command->getId()));
return $basket;
}
#[CommandHandler] // 2
public function addProduct(AddProduct $command) : void
{
$this->recordThat(new ProductWasAddedToBasket($this->id, $command->getProductName()));
}
#[AggregateFactory]
public static function restoreFrom(array $events) : self
{
$basket = new Basket();
foreach ($events as $event) {
match (get_class($event)) {
BasketWasCreated::class => $basket->applyBasketWasCreated($event)
};
}
return $basket;
}
private function applyBasketWasCreated(BasketWasCreated $basketWasCreated)
{
$this->id = $basketWasCreated->getId();
}
}
  1. In order to make use of alternative way of handling events, we need to set attribute to trueEventSourcedAggregate(true)

  2. Command Handlers instead of returning events are acting the same as State Stored Aggregates. All events which will be published using recordThatwill be passed to the Repository to be stored.

Projections

Projections are way to build specific view from stored events. Projections can be added in any moment of the application lifetime and be built up from existing history, till the current time. This is powerful concept as it allow for building views quickly and discarding them without pain, when they are not longer needed.

#[Projection("inProgressTicketList", Ticket::class] // 1
class InProgressTicketList
{
private Connection $connection;
public function __construct(Connection $connection)
{
$this->connection = $connection;
}
#[EventHandler] // 2
public function addTicket(TicketWasRegistered $event, array $metadata) : void
{
$result = $this->connection->executeStatement(<<<SQL
INSERT INTO in_progress_tickets VALUES (?,?)
SQL, [$event->getTicketId(), $event->getTicketType()]);
}
#[QueryHandler("getInProgressTickets")] // 3
public function getTickets() : array
{
return $this->connection->executeQuery(<<<SQL
SELECT * FROM in_progress_tickets
SQL)->fetchAllAssociative();
}
}
  1. This tells Ecotone that specific class is Projection. The first parameter is the name of the projection and the second is name of the stream (the default is the name of the Aggregate) that this projection subscribes to.

  2. Events that this projection subscribes to

  3. Optional Query Handlers for this projection. They can be placed in different classes depending on preferences.

Projection Actions

Running the projection

Symfony
Laravel
Lite
Symfony
bin/console ecotone:run {projectionName}
Laravel
artisan ecotone:run {projectionName}
Lite
$messagingSystem->run($projectionName);

Projection initialization

As projection can be restarted, deleted and created differently. It's easier to maintain, when the projection knows how to setup it itself, instead of depending on migrations. Method with attribute #[ProjectionInitialization] will be called on startup of the projection.

#[ProjectionInitialization]
public function initialization() : void
{
$this->connection->executeStatement(<<<SQL
CREATE TABLE IF NOT EXISTS in_progress_tickets (
ticket_id VARCHAR(36) PRIMARY KEY,
ticket_type VARCHAR(25)
)
SQL);
}

Resetting the projection

In order to restart the projection in case we want to provide incompatible change, we can simply reset the projection and it will build up from the beginning.

Symfony
Laravel
Lite
Symfony
bin/console ecotone:es:reset-projection {projectionName}
Laravel
artisan ecotone:es:reset-projection {projectionName}
Lite
$messagingSystem->runConsoleCommand("ecotone:es:reset-projection", ["name" => $projectionName]);

If the projection stores the state inside the Event Store, we can tell Ecotone what to do with the projection:

#[ProjectionReset]
public function reset() : void
{
$this->connection->executeStatement(<<<SQL
DELETE FROM in_progress_tickets
SQL);
}

Deleting the projection

If we want to delete the projection

Symfony
Laravel
Lite
Symfony
bin/console ecotone:es:delete-projection {projectionName}
Laravel
artisan ecotone:es:delete-projection {projectionName}
Lite
$messagingSystem->runConsoleCommand("ecotone:es:delete-projection", ["name" => $projectionName]);

If the projection stores the state inside the Event Store, we can tell Ecotone what to do with the projection:

#[ProjectionDelete]
public function delete() : void
{
$this->connection->executeStatement(<<<SQL
DROP TABLE in_progress_tickets
SQL);
}

Keep The State In Event Store

Aggregate Repository under the hood make use of Event Store. As Event Store keeps the events in can also keep the state of the projection as simple array. This is useful if our events depend on each other in order to provide state between handlers or in case the result of the projection is singular record.

Custom Stream Name

To be described...

Storing and handling events by names

To be described...