Working with Event Streams
In the previous chapter we discussed how Event Sourcing Aggregates are built from Event Streams stored in the data store. Yet it's important to understand how those Events get into the Event Stream in the first place.
Let's start by manually appending Events using the Event Store. This will help us better understand the concepts behind Event Streams and Event Partitioning. Once we understand this part, we will introduce Event Sourcing Aggregates, which abstract away most of the logic we will write manually in this chapter.
Working with the Event Stream directly may also be useful when migrating from an existing system that already has an Event Sourcing solution we want to refactor to Ecotone.
After installing Ecotone's Event Sourcing we automatically get access to the Event Store abstraction, which makes it easy to work with Event Streams.
Let's suppose we have a Ticketing System like Jira with two basic Events: "Ticket Was Registered" and "Ticket Was Closed". Of course we need to identify which Ticket a given Event relates to, so each Event will carry some Id.
In our code we can define classes for those:
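A minimal sketch of those classes (property names are illustrative):

```php
final class TicketWasRegistered
{
    public function __construct(
        public readonly string $ticketId,
        public readonly string $ticketType
    ) {
    }
}

final class TicketWasClosed
{
    public function __construct(
        public readonly string $ticketId
    ) {
    }
}
```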
To store those in the Event Stream, let's first declare it using the Event Store abstraction.
The Event Store is automatically available in your Dependency Container after installing the Symfony or Laravel integration. In the case of Ecotone Lite, it can be retrieved directly.
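With Ecotone Lite, for example, the Event Store can be fetched as a Gateway from the bootstrapped messaging system (a sketch; the bootstrap arguments depend on your setup):

```php
use Ecotone\EventSourcing\EventStore;
use Ecotone\Lite\EcotoneLite;

$messagingSystem = EcotoneLite::bootstrap(
    // classes to resolve and available services (e.g. the DBAL connection) go here
);

/** @var EventStore $eventStore */
$eventStore = $messagingSystem->getGateway(EventStore::class);
```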
The Event Store provides a few handy methods:
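In condensed form the abstraction looks roughly like this (signatures abbreviated, not a verbatim copy of the interface):

```php
interface EventStore
{
    // create a new Event Stream, optionally with initial Events and stream metadata
    public function create(string $streamName, array $streamEvents = [], array $streamMetadata = []): void;

    // append Events to an existing Event Stream
    public function appendTo(string $streamName, array $streamEvents): void;

    // check for existence / remove a whole Event Stream
    public function hasStream(string $streamName): bool;
    public function delete(string $streamName): void;

    // read Events back, optionally in batches and without deserialization
    public function load(
        string $streamName,
        int $fromNumber = 1,
        ?int $count = null,
        bool $deserialize = true
    ): iterable;
}
```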
As we want to append some Events, let's first create a new Event Stream:
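For our Ticketing System:

```php
// creates the "ticket" Event Stream (a new table in the database)
$eventStore->create("ticket");
```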
This is basically enough to create a new Event Stream. But it's good to understand what actually happens under the hood.
In short, an Event Stream is just an audit log of a series of Events. From the technical point of view it's a table in the database, so when we create an Event Stream we are actually creating a new table.
The Event Stream table contains:
Event Id - a unique identifier of the Event
Event Name - the name under which the Event is stored, used to know which class it should be deserialized to
Payload - the actual Event class, serialized and stored in the database as JSON
Metadata - additional information stored alongside the Event
To append Events to the Event Stream we will use the "appendTo" method:
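A sketch using Ecotone's Event wrapper (the static constructor shown is assumed from the Event Sourcing module):

```php
use Ecotone\Modelling\Event;

$eventStore->appendTo("ticket", [
    Event::create(new TicketWasRegistered("123", "alert")),
]);
```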
This will store the given Event in the Ticket's Event Stream.
Above we've stored Events for the Ticket with id "123". However, we can store Events for different Tickets in the same Event Stream:
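For example, Events for Ticket "124" can go into the very same "ticket" Stream:

```php
$eventStore->appendTo("ticket", [
    Event::create(new TicketWasRegistered("124", "info")),
    Event::create(new TicketWasClosed("124")),
]);
```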
We can now load those Events from the Event Stream:
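Loading is a single call:

```php
$events = $eventStore->load("ticket");
```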
This will return an iterator of Ecotone's Event objects:
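Each returned item wraps the columns described earlier; the accessors below reflect that shape (treat the exact method names as an assumption):

```php
foreach ($events as $event) {
    $payload = $event->getPayload();   // e.g. a TicketWasRegistered instance
    $metadata = $event->getMetadata(); // additional information stored alongside
}
```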
As we can see, this maps to what we've been storing in the Event Stream table.
The payload will contain the deserialized form of our Event, for example a TicketWasRegistered instance.
We could also fetch the list of Events without deserializing them:

```php
$events = $eventStore->load("ticket", deserialize: false);
```
In that situation the payload will contain an associative array. This may be useful when iterating over huge Event Streams, when there is no need to actually work with objects. Besides that, the ability to load in batches may also come in handy.
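Batching rides on the same `load` call (the parameter names follow the prooph-style signature Ecotone builds on, so verify them against your version):

```php
// read Events 1..1000 of the Stream, then continue from position 1001
$firstBatch = $eventStore->load("ticket", fromNumber: 1, count: 1000);
$secondBatch = $eventStore->load("ticket", fromNumber: 1001, count: 1000);
```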
Let's consider what may actually happen during concurrent access to our System. This may be due to more people working on the same Ticket, or simply because our system allows double-clicking the same action.
In those situations we may end up storing the same Event twice:
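For example, two racing requests both close the same Ticket:

```php
// request A
$eventStore->appendTo("ticket", [Event::create(new TicketWasClosed("123"))]);

// request B, racing with A - the duplicate is accepted as well
$eventStore->appendTo("ticket", [Event::create(new TicketWasClosed("123"))]);
```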
Without any protection we will end up with duplicated Closing Events in the Event Stream. That's not ideal, as our Event Stream will then hold an incorrect history:
This is the place where we need to come back to the persistence strategy.
We've created this Stream with the "simple" persistence strategy. This means we can append any new Events without guards. This is fine in scenarios where no business logic is involved, like collecting metrics or statistics, where all we do is push Events into the Event Stream and duplicates are not really a problem. However, the simple strategy comes with costs:
We lose the linear history of our Event Stream, as we allow storing duplicates. This may lead to an incorrect state of the System, like a Repayment being recorded twice.
Duplicated Events (which hold different Message Ids) will trigger side effects twice. Our Event Handlers would then need to handle this situation themselves, to avoid, for example, calling an external system twice or building a wrong Read Model with Projections.
As we allow concurrent access, we can actually make wrong business decisions. For example, we could give a promotion code to the Customer twice.
The "simple strategy" is often the only strategy that different Event Sourcing Frameworks provide. However after the solution is released to the production, we often start to recognize above problems, yet now as we don't have other way of dealing with those, we are on mercy of fixing the causes, not the root of the problem. Therefore we need more sophisticated solution to this problem, to solve the cause of it not the side effects. And to solve the cause we will be using different persistence strategy called "partition strategy".
An Event Stream can be split into partitions. A partition is just a sub-stream of Events related to a given identifier, in our context a Ticket.
A partition is a linear history for a given identifier, where each Event within the partition is assigned a version. This way we know which Event sits at which position. To partition the Stream we need to know the partition key (in our case the Ticket Id). Knowing the partition key and the last version of a given partition, we can append an Event at the correct position. To create a partitioned Stream, we create the Event Stream with a different strategy:
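A sketch of creating such a Stream. How the strategy is selected depends on your configuration; the stream-metadata key below is an assumption, and in some setups the strategy is instead configured globally via EventSourcingConfiguration:

```php
// assumption: the persistence strategy is passed as stream metadata on creation
$eventStore->create("ticket", streamMetadata: [
    "_persistence" => "partition",
]);
```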
This will create an Event Stream table with constraints, which will require:
Aggregate Id - this will be our partition key
Aggregate Type - this may be used as an additional partition key if we store more Aggregate types within the same Stream (e.g. User)
Aggregate Version - this ensures that we won't apply two Events at the same position within a given partition
We pass those as part of the Event's metadata:
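Following the prooph-style metadata keys Ecotone builds on (the metadata argument on `Event::create` is assumed):

```php
$eventStore->appendTo("ticket", [
    Event::create(
        new TicketWasRegistered("123", "alert"),
        metadata: [
            "_aggregate_id" => "123",      // partition key
            "_aggregate_type" => "ticket", // optional additional partition key
            "_aggregate_version" => 1,     // position within the partition
        ]
    ),
]);
```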
Let's now see how this helps us ensure that our history is always correct. Let's assume that we currently have a single Event in the partition.
Now let's assume two requests happening at the same time:
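Both requests read the partition at version 1, so both try to append at version 2; the database constraint lets only one of them win. A sketch (the exception type shown is the prooph-level one Ecotone builds on, so treat it as an assumption):

```php
use Prooph\EventStore\Exception\ConcurrencyException;

$closingEvent = Event::create(new TicketWasClosed("123"), metadata: [
    "_aggregate_id" => "123",
    "_aggregate_type" => "ticket",
    "_aggregate_version" => 2,
]);

// request A appends version 2 and succeeds
$eventStore->appendTo("ticket", [$closingEvent]);

// request B also tries version 2 and hits the unique constraint
try {
    $eventStore->appendTo("ticket", [$closingEvent]);
} catch (ConcurrencyException $exception) {
    // reload the partition, re-check the business rules, then retry or reject
}
```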
This approach lets us be sure that within a request we are dealing with the latest Event Stream; if that's not true, we will end up with a concurrency exception. This kind of protection is crucial when dealing with business logic that depends on previous Events, as it ensures there is no way to bypass it.