Event Sourcing

Event Sourcing PHP

Ecotone comes with Prooph Event Store integration. This is a well-known and stable solution that provides event storage over databases such as Postgres, MySQL, or MariaDB. Ecotone provides Event Sourced Aggregates, which are stored as a series of events. Thanks to that, we have the whole history of what happened to a specific Aggregate. We can also build projections that target a specific view, which keeps our code simple even with really complicated queries. This separates Commands from Queries (CQRS).

Do you want to find out how to apply Event Sourcing in your business? Try out InspectIO!

Installation

composer require ecotone/pdo-event-sourcing

Then go to DBAL Support to configure the connection.

Ecotone Event Sourcing is based on Prooph's well-known and stable Event Store. It supports three databases:

  - PostgreSQL
  - MySQL
  - MariaDB

Event Sourced Aggregate

There are two ways to use Event Sourced Aggregates.

Pure Event Sourced Aggregate

This way of handling events allows command handlers to be pure functions: they return events and do not change the Aggregate's state, which is modified only in event sourcing handlers.

#[EventSourcingAggregate] // 1
class Ticket
{
    use WithAggregateVersioning; // 2

    #[AggregateIdentifier] // 1
    private string $ticketId;
    private string $ticketType;

    #[CommandHandler] // 3
    public static function register(RegisterTicket $command) : array
    {
        return [new TicketWasRegistered($command->getTicketId(), $command->getTicketType())];
    }

    #[CommandHandler] // 3
    public function close(CloseTicket $command) : array
    {
        return [new TicketWasClosed($this->ticketId)];
    }

    #[EventSourcingHandler] // 4
    public function applyTicketWasRegistered(TicketWasRegistered $event) : void
    {
        $this->ticketId = $event->getTicketId();
        $this->ticketType = $event->getTicketType();
    }
}
  1. EventSourcingAggregate and AggregateIdentifier work exactly the same as in a State-Stored Aggregate.

  2. An Event Sourced Aggregate must provide a version. You may leave it to Ecotone by using WithAggregateVersioning, or you can implement it yourself.

  3. A CommandHandler for event sourcing returns the events generated by the specific method. These are passed to the Repository to be stored.

  4. EventSourcingHandler is the method responsible for reconstructing the Aggregate from previously created events. At least one event needs to be handled in order to provide the AggregateIdentifier.
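
To make the reconstruction step more concrete, the following is a minimal plain-PHP sketch of how an aggregate's state could be rebuilt by replaying its events. It does not use Ecotone and is not how the library is implemented; the TicketState class, its fromHistory method, and the public event properties are assumptions made for illustration only.

```php
<?php
declare(strict_types=1);

// Hypothetical event classes, mirroring the names used above.
final class TicketWasRegistered
{
    public function __construct(
        public string $ticketId,
        public string $ticketType
    ) {}
}

final class TicketWasClosed
{
    public function __construct(public string $ticketId) {}
}

// Plain-PHP stand-in for an event sourced aggregate (no Ecotone attributes).
final class TicketState
{
    public string $ticketId = '';
    public string $ticketType = '';
    public bool $closed = false;
    public int $version = 0;

    // Replays the stored history, applying one event at a time.
    public static function fromHistory(array $events): self
    {
        $state = new self();
        foreach ($events as $event) {
            $state->apply($event);
            $state->version++;
        }
        return $state;
    }

    // Plays the role of the event sourcing handlers: state changes only here.
    private function apply(object $event): void
    {
        if ($event instanceof TicketWasRegistered) {
            $this->ticketId = $event->ticketId;
            $this->ticketType = $event->ticketType;
        } elseif ($event instanceof TicketWasClosed) {
            $this->closed = true;
        }
    }
}

$ticket = TicketState::fromHistory([
    new TicketWasRegistered('123', 'alert'),
    new TicketWasClosed('123'),
]);
```

In this sketch the version is simply the number of applied events, which is one common way to detect concurrent modifications.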

Internal Recorder Aggregate

This way of handling events resembles State Stored Aggregates.

#[EventSourcingAggregate(true)] // 1
class Basket
{
    use WithAggregateEvents;
    use WithAggregateVersioning;

    #[AggregateIdentifier]
    private string $id;

    #[CommandHandler] // 2
    public static function create(CreateBasket $command) : static
    {
        $basket = new static();
        $basket->recordThat(new BasketWasCreated($command->getId()));

        return $basket;
    }

    #[CommandHandler] // 2
    public function addProduct(AddProduct $command) : void
    {
        $this->recordThat(new ProductWasAddedToBasket($this->id, $command->getProductName()));
    }

    #[EventSourcingHandler]
    public function applyBasketWasCreated(BasketWasCreated $basketWasCreated) : void
    {
        $this->id = $basketWasCreated->getId();
    }
}
  1. In order to use this alternative way of handling events, we need to set the attribute parameter to true: EventSourcingAggregate(true).

  2. Command Handlers do not return events; they behave the same as in State Stored Aggregates. All events published using recordThat will be passed to the Repository to be stored.
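
The recording idea can be sketched in plain PHP as follows. This is only an illustration of the pattern: the RecordsEvents trait, its releaseEvents method, and the SketchBasket class are hypothetical, and Ecotone's WithAggregateEvents may work differently internally.

```php
<?php
declare(strict_types=1);

// Hypothetical trait; not Ecotone's WithAggregateEvents.
trait RecordsEvents
{
    /** @var object[] */
    private array $recordedEvents = [];

    // Buffers an event until a repository asks for it.
    protected function recordThat(object $event): void
    {
        $this->recordedEvents[] = $event;
    }

    // Returns the pending events and clears the buffer.
    public function releaseEvents(): array
    {
        $events = $this->recordedEvents;
        $this->recordedEvents = [];

        return $events;
    }
}

final class BasketWasCreated
{
    public function __construct(public string $id) {}
}

final class SketchBasket
{
    use RecordsEvents;

    public static function create(string $id): self
    {
        $basket = new self();
        $basket->recordThat(new BasketWasCreated($id));

        return $basket;
    }
}

$basket = SketchBasket::create('b-1');
$pending = $basket->releaseEvents();
```

A repository built on this idea would call something like releaseEvents after each command and append the result to the event stream.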

Projections

Projections derive current state from a stream of events. A projection can be added at any moment of the application's lifetime and built up from the existing history up to the current time. This is a powerful concept, as it allows us to build views quickly and discard them without pain when they are no longer needed.

#[Projection("inProgressTicketList", Ticket::class)] // 1
class InProgressTicketList
{
    private Connection $connection;

    public function __construct(Connection $connection)
    {
        $this->connection = $connection;
    }

    #[EventHandler] // 2
    public function addTicket(TicketWasRegistered $event, array $metadata) : void
    {
        $this->connection->executeStatement(<<<SQL
            INSERT INTO in_progress_tickets VALUES (?,?)
        SQL, [$event->getTicketId(), $event->getTicketType()]);
    }

    #[QueryHandler("getInProgressTickets")] // 3
    public function getTickets() : array
    {
        return $this->connection->executeQuery(<<<SQL
            SELECT * FROM in_progress_tickets
        SQL)->fetchAllAssociative();
    }
}
  1. This tells Ecotone that the class is a Projection. The first parameter is the name of the projection, and the second is the name of the stream that this projection subscribes to (by default, a stream is named after its Aggregate).

  2. Events that this projection subscribes to.

  3. Optional Query Handlers for this projection. They can be placed in different classes, depending on preference.
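
Conceptually, a projection folds a history of events into a read model. The sketch below shows that idea with an in-memory array instead of a database table. It is plain PHP without Ecotone; the InMemoryTicketList class, the event names, and the array shape of each event are assumptions for illustration.

```php
<?php
declare(strict_types=1);

// In-memory stand-in for the in_progress_tickets table.
final class InMemoryTicketList
{
    /** @var array<string, string> ticket id => ticket type */
    private array $rows = [];

    // Each event is an array with a 'name' and a 'payload' (an assumed shape).
    public function handle(array $event): void
    {
        $payload = $event['payload'];

        switch ($event['name']) {
            case 'ticket.was_registered':
                $this->rows[$payload['ticketId']] = $payload['ticketType'];
                break;
            case 'ticket.was_closed':
                // A closed ticket is no longer "in progress".
                unset($this->rows[$payload['ticketId']]);
                break;
        }
    }

    public function getTickets(): array
    {
        return $this->rows;
    }
}

$history = [
    ['name' => 'ticket.was_registered', 'payload' => ['ticketId' => 't-1', 'ticketType' => 'alert']],
    ['name' => 'ticket.was_registered', 'payload' => ['ticketId' => 't-2', 'ticketType' => 'info']],
    ['name' => 'ticket.was_closed', 'payload' => ['ticketId' => 't-1']],
];

$projection = new InMemoryTicketList();
foreach ($history as $event) {
    $projection->handle($event);
}

$inProgress = $projection->getTickets();
```

Because the read model is derived purely from the history, it can be thrown away and rebuilt at any time by replaying the same events.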

Running Projection

Synchronously Event Driven Projection

By default, Ecotone runs projections synchronously. No additional configuration is needed. This mode can be used to avoid eventual consistency or for testing purposes. However, if you expect concurrent access to your Aggregates, consider a different approach to keep your database transactions as short as possible.

Polling Projection

You may run your projection in the background. It will query the database at constant time intervals to check whether new events have been registered. Each projection runs as a separate process. To register a Polling Projection, make use of ServiceContext.

#[ServiceContext]
public function basketList()
{
    return ProjectionRunningConfiguration::createPolling("basket_list");
}

After registering the Polling Projection, we can run the endpoint:

Symfony
bin/console ecotone:run basket_list -vvv
Laravel
artisan ecotone:run basket_list -vvv
Lite
$messagingSystem->run("basket_list");
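
The polling behaviour can be pictured as a loop that remembers the last processed position and only handles newer events on each run. The following is a minimal sketch under that assumption, in plain PHP; PollingSketch and its tick method are hypothetical names and do not reflect Ecotone's internals.

```php
<?php
declare(strict_types=1);

// Hypothetical poller: remembers the last processed position and, on each
// tick, hands only newer events to the handler.
final class PollingSketch
{
    private int $position = 0;

    public function __construct(private \Closure $handler) {}

    // $eventStore is a plain list here; a real store would be queried for
    // events stored after the remembered position instead.
    public function tick(array $eventStore): int
    {
        $newEvents = array_slice($eventStore, $this->position);

        foreach ($newEvents as $event) {
            ($this->handler)($event);
            $this->position++;
        }

        return count($newEvents);
    }
}

$seen = [];
$poller = new PollingSketch(function (array $event) use (&$seen): void {
    $seen[] = $event['name'];
});

$store = [['name' => 'basket.was_created']];
$first = $poller->tick($store);   // handles the one existing event

$store[] = ['name' => 'product.was_added'];
$second = $poller->tick($store);  // handles only the newly appended event
$third = $poller->tick($store);   // nothing new to handle
```

A background process would call tick repeatedly with a sleep in between, which is why polling keeps touching the database even when no events arrive.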

Asynchronously Event Driven Projection

You may run your projections in an event-driven manner using asynchronous channels.

#[Asynchronous("asynchronous_projections")]
#[Projection("basket_list")]
class BasketList

The difference between a Polling and an Event Driven projection is the way they are triggered. An Event Driven projection is only triggered when a new event comes into the system. This avoids the pitfall of continuous database access that comes with a Polling Projection. The second strength of an Asynchronously Event Driven Projection is the possibility of registering multiple projections under the same channel (which means the same consumer).

Consider using Dbal Message Channels to avoid two-phase commit (2PC) problems. With them, Ecotone stores events and publishes them within the same transaction. The solution is based on the Outbox Pattern.

With Event Driven projections, your projections will not be created on startup, because they are only called when an event happens. If this is a problem, consider starting with a Polling Projection and then switching to an Event Driven Projection.

Projection Actions

Projection initialization

A projection can be restarted, deleted, and recreated, so it is easier to maintain when the projection knows how to set itself up instead of depending on migrations. A method with the #[ProjectionInitialization] attribute will be called on startup of the projection.

#[ProjectionInitialization]
public function initialization() : void
{
    $this->connection->executeStatement(<<<SQL
        CREATE TABLE IF NOT EXISTS in_progress_tickets (
            ticket_id VARCHAR(36) PRIMARY KEY,
            ticket_type VARCHAR(25)
        )
    SQL);
}

Resetting the projection

To rebuild a projection, for example after introducing an incompatible change, we can reset it and it will be built up again from the beginning.

Symfony
bin/console ecotone:es:reset-projection {projectionName}
Laravel
artisan ecotone:es:reset-projection {projectionName}
Lite
$messagingSystem->runConsoleCommand("ecotone:es:reset-projection", ["name" => $projectionName]);

And inside the projection we need to implement ProjectionReset to tell Ecotone what to do:

#[ProjectionReset]
public function reset() : void
{
    $this->connection->executeStatement(<<<SQL
        DELETE FROM in_progress_tickets
    SQL);
}
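
The effect of a reset can be sketched as clearing both the read model and the remembered stream position, so that the next run replays the whole history. The class below is a hypothetical plain-PHP illustration of that idea and does not mirror how Ecotone tracks projection state.

```php
<?php
declare(strict_types=1);

// Hypothetical read model that tracks how far it has read the stream.
final class ResettableTicketList
{
    /** @var string[] */
    private array $rows = [];
    private int $position = 0;

    // Processes only the events that have not been handled yet.
    public function catchUp(array $stream): void
    {
        foreach (array_slice($stream, $this->position) as $ticketId) {
            $this->rows[] = $ticketId;
            $this->position++;
        }
    }

    // Clears the data and rewinds to the beginning of the stream.
    public function reset(): void
    {
        $this->rows = [];
        $this->position = 0;
    }

    public function rows(): array
    {
        return $this->rows;
    }
}

$stream = ['t-1', 't-2'];

$list = new ResettableTicketList();
$list->catchUp($stream);
$beforeReset = $list->rows();

$list->reset();
$afterReset = $list->rows();

$list->catchUp($stream);
$rebuilt = $list->rows();
```

After the reset and a second catch-up, the read model ends up in the same state as before, which is what makes resetting safe.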

Deleting the projection

To delete the projection, run:

Symfony
bin/console ecotone:es:delete-projection {projectionName}
Laravel
artisan ecotone:es:delete-projection {projectionName}
Lite
$messagingSystem->runConsoleCommand("ecotone:es:delete-projection", ["name" => $projectionName]);

And inside the projection we need to implement ProjectionDelete to tell Ecotone what to do:

#[ProjectionDelete]
public function delete() : void
{
    $this->connection->executeStatement(<<<SQL
        DROP TABLE in_progress_tickets
    SQL);
}

Choosing Event Streams

A Projection derives from an Event Stream. However, there may be situations when we want to derive the projection from multiple streams. Let's see what options we have:

From Single Stream

If we are interested in a single stream, we can listen directly to a specific aggregate:

#[Projection("basketList", Basket::class)]
class BasketList
{
    #[EventHandler]
    public function addBasket(BasketWasCreated $event) : void
    {
        // do something
    }
}

Here we are handling events from the single Basket Aggregate stream. It contains all the events related to this aggregate.

From Multiple Streams

There may be situations when we want to handle several streams together.

#[Projection("log_projection", [Ticket::class, Basket::class])]
class Logger

From Category

When using the Stream Per Aggregate Persistence Strategy, we need to use categories to target the streams. If we listened on the Domain\Ticket stream while using Stream Per Aggregate, we would not receive any events, because the created streams are suffixed with the aggregate identifier, for example Domain\Ticket-123.

In that case we can make use of categories in order to target Domain\Ticket-*.

#[Projection("category_projection", fromCategories: Ticket::class)]
class FromCategoryUsingAggregatePerStreamProjection
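
The matching rule behind categories can be sketched as a simple prefix check. The helper below is hypothetical and only assumes the "{category}-{aggregateId}" naming described above; it is not the event store's actual lookup.

```php
<?php
declare(strict_types=1);

// Returns the stream names belonging to a category, assuming the
// "{category}-{aggregateId}" naming convention.
function streamsInCategory(string $category, array $streamNames): array
{
    $prefix = $category . '-';

    return array_values(array_filter(
        $streamNames,
        static fn (string $name): bool => str_starts_with($name, $prefix)
    ));
}

$streams = [
    'Domain\Ticket-123',
    'Domain\Ticket-456',
    'Domain\Basket-1',
    'Domain\Ticket', // a single-stream name does not belong to the category
];

$matched = streamsInCategory('Domain\Ticket', $streams);
```

Note that the bare Domain\Ticket name is not matched, which is the reason a plain stream subscription finds nothing under the Stream Per Aggregate strategy.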

Storing And Handling Events By Names

If you want to avoid storing the class names of your events in the Event Store, you may give them a name.

#[NamedEvent("basket.was_created")]
class BasketWasCreated
{
    public const EVENT_NAME = "basket.was_created";

    private string $id;

    public function __construct(string $id)
    {
        $this->id = $id;
    }

    public function getId(): string
    {
        return $this->id;
    }
}

And tell the projection to make use of it:

#[Projection("basket_list")]
class BasketList
{
    #[EventHandler("basket.was_created")]
    public function addBasket(BasketWasCreated $event) : void
    {
        // do something with $event
    }
}
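
The benefit of named events is that the stored name stays stable even if the class is renamed or moved. A minimal sketch of such a name-to-class mapping is shown below; EventNameMap and BasketCreatedSketch are hypothetical and only illustrate the idea, not Ecotone's own resolution mechanism.

```php
<?php
declare(strict_types=1);

final class BasketCreatedSketch
{
    public function __construct(public string $id) {}
}

// Hypothetical registry translating between stored names and classes.
final class EventNameMap
{
    /** @param array<string, class-string> $nameToClass */
    public function __construct(private array $nameToClass) {}

    // Name written to the event store when the event is saved.
    public function nameFor(object $event): string
    {
        $name = array_search(get_class($event), $this->nameToClass, true);
        if ($name === false) {
            throw new InvalidArgumentException('No name registered for ' . get_class($event));
        }

        return $name;
    }

    // Class to hydrate when an event with the given name is loaded.
    public function classFor(string $name): string
    {
        return $this->nameToClass[$name]
            ?? throw new InvalidArgumentException("Unknown event name: $name");
    }
}

$map = new EventNameMap(['basket.was_created' => BasketCreatedSketch::class]);

$storedName = $map->nameFor(new BasketCreatedSketch('b-1'));
$resolvedClass = $map->classFor('basket.was_created');
```

With this kind of mapping, renaming BasketCreatedSketch only requires updating the map, while previously stored events remain readable.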

Speeding Up Projection Restore

If projections handle events by name, there is no need to deserialize the event into a class; a simple array can be used instead. When resetting a projection with thousands of events, this speeds up the process.

#[EventHandler("basket.was_created")]
public function addBasket(array $event) : void
{
    // do something with $event
}
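
As a rough illustration of why this is cheaper, the handler can read values straight from the decoded payload without constructing an event object. The JSON payload and its "id" key below are assumptions; the actual stored format depends on how serialization is configured.

```php
<?php
declare(strict_types=1);

// Assumed stored payload; the real serialization format may differ.
$storedPayload = '{"id":"b-1"}';

$basketIds = [];

// Array-based handler: reads the value directly, no event class involved.
$addBasket = function (array $event) use (&$basketIds): void {
    $basketIds[] = $event['id'];
};

$addBasket(json_decode($storedPayload, true, 512, JSON_THROW_ON_ERROR));
```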

Event Sourcing Configuration

To change Event Sourcing configuration we will use Service Context.

Persistence Strategy

Single Stream Strategy

The default persistence strategy is the Single Stream Strategy. It stores all instances of a specific aggregate within the same stream.

#[ServiceContext]
public function persistenceStrategy()
{
    return \Ecotone\EventSourcing\EventSourcingConfiguration::createWithDefaults()
        ->withSingleStreamPersistenceStrategy();
}

namespace Domain\Ticket;

#[EventSourcingAggregate]
class Ticket

All instances of Ticket will be stored within the Domain\Ticket\Ticket stream.

Stream Per Aggregate Strategy

This persistence strategy creates a stream per aggregate instance.

#[ServiceContext]
public function persistenceStrategy()
{
    return \Ecotone\EventSourcing\EventSourcingConfiguration::createWithDefaults()
        ->withStreamPerAggregatePersistenceStrategy();
}

namespace Domain\Ticket;

#[EventSourcingAggregate]
class Ticket

Instances of Ticket will be stored within the Domain\Ticket\Ticket-{ticketId} stream, where ticketId is the identifier of the specific aggregate.
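
The naming difference between the two strategies can be summarised with a small helper. This is a hypothetical plain-PHP sketch of the naming rule described in this section, not the library's code.

```php
<?php
declare(strict_types=1);

// Computes the stream name for an aggregate instance under either strategy.
function streamNameFor(string $aggregateClass, string $aggregateId, bool $streamPerAggregate): string
{
    return $streamPerAggregate
        ? $aggregateClass . '-' . $aggregateId  // one stream per instance
        : $aggregateClass;                      // one stream for all instances
}

$single = streamNameFor('Domain\Ticket\Ticket', '123', false);
$perAggregate = streamNameFor('Domain\Ticket\Ticket', '123', true);
```

With the single stream strategy every Ticket shares one stream, while the per-aggregate strategy yields a separate stream for each identifier.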

Custom Stream Name

If you want to use a custom stream name (the default is the Aggregate class name), you can apply the Stream attribute to your aggregate.

#[Stream("basket_stream")]
class Basket

Then tell the projection to make use of it:

#[Projection(self::PROJECTION_NAME, "basket_stream")]
class BasketList