Ecotone
SponsorBlogGithubSupport and ContactCommunity Channel
  • About
  • Installation
  • How to use
    • CQRS PHP
    • Event Handling PHP
    • Aggregates & Sagas
    • Scheduling in PHP
    • Asynchronous PHP
    • Event Sourcing PHP
    • Microservices PHP
    • Resiliency and Error Handling
    • Laravel Demos
    • Symfony Demos
      • Doctrine ORM
  • Tutorial
    • Before we start tutorial
    • Lesson 1: Messaging Concepts
    • Lesson 2: Tactical DDD
    • Lesson 3: Converters
    • Lesson 4: Metadata and Method Invocation
    • Lesson 5: Interceptors
    • Lesson 6: Asynchronous Handling
  • Enterprise
  • Modelling
    • Introduction
    • Message Bus and CQRS
      • CQRS Introduction - Commands
        • Query Handling
        • Event Handling
      • Aggregate Introduction
        • Aggregate Command Handlers
        • Aggregate Query Handlers
        • Aggregate Event Handlers
        • Advanced Aggregate creation
      • Repositories Introduction
      • Business Interface
        • Introduction
        • Business Repository
        • Database Business Interface
          • Converting Parameters
          • Converting Results
      • Saga Introduction
      • Identifier Mapping
    • Extending Messaging (Middlewares)
      • Message Headers
      • Interceptors (Middlewares)
        • Additional Scenarios
      • Intercepting Asynchronous Endpoints
      • Extending Message Buses (Gateways)
    • Event Sourcing
      • Installation
      • Event Sourcing Introduction
        • Working with Event Streams
        • Event Sourcing Aggregates
          • Working with Aggregates
          • Applying Events
          • Different ways to Record Events
        • Working with Metadata
        • Event versioning
        • Event Stream Persistence
          • Event Sourcing Repository
          • Making Stream immune to changes
          • Snapshoting
          • Persistence Strategies
          • Event Serialization and PII Data (GDPR)
      • Projection Introduction
        • Configuration
        • Choosing Event Streams for Projection
        • Executing and Managing
          • Running Projections
          • Projection CLI Actions
          • Access Event Store
        • Projections with State
        • Emitting events
    • Recovering, Tracing and Monitoring
      • Resiliency
        • Retries
        • Error Channel and Dead Letter
          • Dbal Dead Letter
        • Idempotent Consumer (Deduplication)
        • Resilient Sending
        • Outbox Pattern
        • Concurrency Handling
      • Message Handling Isolation
      • Ecotone Pulse (Service Dashboard)
    • Asynchronous Handling and Scheduling
      • Asynchronous Message Handlers
      • Asynchronous Message Bus (Gateways)
      • Delaying Messages
      • Time to Live
      • Message Priority
      • Scheduling
      • Dynamic Message Channels
    • Distributed Bus and Microservices
      • Distributed Bus
        • Distributed Bus with Service Map
          • Configuration
          • Custom Features
          • Non-Ecotone Application integration
          • Testing
        • AMQP Distributed Bus (RabbitMQ)
          • Configuration
        • Distributed Bus Interface
      • Message Consumer
      • Message Publisher
    • Business Workflows
      • The Basics - Stateless Workflows
      • Stateful Workflows - Saga
      • Handling Failures
    • Testing Support
      • Testing Messaging
      • Testing Aggregates and Sagas with Message Flows
      • Testing Event Sourcing Applications
      • Testing Asynchronous Messaging
  • Messaging and Ecotone In Depth
    • Overview
    • Multi-Tenancy Support
      • Getting Started
        • Any Framework Configuration
        • Symfony and Doctrine ORM
        • Laravel
      • Different Scenarios
        • Hooking into Tenant Switch
        • Shared and Multi Database Tenants
        • Accessing Current Tenant in Message Handler
        • Events and Tenant Propagation
        • Multi-Tenant aware Dead Letter
      • Advanced Queuing Strategies
    • Document Store
    • Console Commands
    • Messaging concepts
      • Message
      • Message Channel
      • Message Endpoints/Handlers
        • Internal Message Handler
        • Message Router
        • Splitter
      • Consumer
      • Messaging Gateway
      • Inbound/Outbound Channel Adapter
    • Method Invocation And Conversion
      • Method Invocation
      • Conversion
        • Payload Conversion
        • Headers Conversion
    • Service (Application) Configuration
    • Contributing to Ecotone
      • How Ecotone works under the hood
      • Ecotone Phases
      • Registering new Module Package
      • Demo Integration with SQS
        • Preparation
        • Inbound and Outbound Adapters and Message Channel
        • Message Consumer and Publisher
  • Modules
    • Overview
    • Symfony
      • Symfony Configuration
      • Symfony Database Connection (DBAL Module)
      • Doctrine ORM
      • Symfony Messenger Transport
    • Laravel
      • Laravel Configuration
      • Database Connection (DBAL Module)
      • Eloquent
      • Laravel Queues
      • Laravel Octane
    • Ecotone Lite
      • Logging
      • Database Connection (DBAL Module)
    • JMS Converter
    • OpenTelemetry (Tracing and Metrics)
      • Configuration
    • RabbitMQ Support
    • Kafka Support
      • Configuration
      • Message partitioning
      • Usage
    • DBAL Support
    • Amazon SQS Support
    • Redis Support
  • Other
    • Contact, Workshops and Support
Powered by GitBook
On this page
  • Working with Event Stream directly
  • Creating new Event Stream
  • What is the Event Stream actually
  • Appending Events to Event Stream
  • Concurrent Access
  • Partitioning Events

Was this helpful?

Export as PDF
  1. Modelling
  2. Event Sourcing
  3. Event Sourcing Introduction

Working with Event Streams

PreviousEvent Sourcing IntroductionNextEvent Sourcing Aggregates

Last updated 7 months ago

Was this helpful?

In we discussed that Event Sourcing Aggregates are built from Event Streams stored in the data store. Yet it's important to understand how those Events gets to the Event Stream in the first place.

Working with Event Stream directly

Let's start by manually appending Events using Event Store. This will help us understand better the concepts behind the Event Stream and Event Partitioning. After we will understand this part, we will introduce Event Sourcing Aggregates, which will abstract away most of the logic that we will need to do in this chapter.

Working with Event Stream directly may be useful when migrating from existing system where we already had an Event Sourcing solution, which we want to refactor to Ecotone.

Creating new Event Stream

After installing Ecotone's Event Sourcing we automatically get access to Event Store abstraction. This abstraction provides an easy to work with Event Streams.

Let's suppose that we do have Ticketing System like Jira with two basic Events "Ticket Was Registered" and "Ticket Was Closed". Of course we need to identify to which Ticket given event is related, therefore will have some Id.

In our code we can define classes for those:

final readonly class TicketWasRegistered
{
    public function __construct(
        public string $id, 
        public string $type
    ) {}
}

final readonly class TicketWasClosed
{
    public function __construct(
        public string $id, 
    ) {}
}

To store those in the Event Stream, let's first declare it using - Event Store abstraction.

Event Store is automatically available in your Dependency Container after installing Symfony or Laravel integration. In case of Ecotone Lite, it can be retrievied directly.

Event Store provides few handy methods:

interface EventStore
{
    /**
     * Creates new Stream with Metadata and appends events to it
     *
     * @param Event[]|object[] $streamEvents
     */
    public function create(string $streamName, array $streamEvents = [], array $streamMetadata = []): void;
    /**
     * Appends events to existing Stream, or creates one and then appends events if it does not exists
     *
     * @param Event[]|object[] $streamEvents
     */
    public function appendTo(string $streamName, array $streamEvents): void;

    /**
     * @return Event[]
     */
    public function load(
        string $streamName,
        int $fromNumber = 1,
        int $count = null,
        MetadataMatcher $metadataMatcher = null,
        bool $deserialize = true
    ): iterable;
}

As we want to append some Events, let's first create an new Event Stream

$eventStore->create("ticket", streamMetadata: [
    "_persistence" => 'simple', // we will get back to that in later part of the section
]);

This is basically enough to create new Event Stream. But it's good to understand what actually happens under the hood.

What is the Event Stream actually

In short Event Stream is just audit of series of Events. From the technical point it's a table in the Database. Therefore when we create an Event Stream we are actually creating new table.

Event Stream table contains:

  • Event Id - which is unique identifier for Event

  • Event Name - Is the named of stored Event, which is to know to which Class it should be deserialized to

  • Payload - is actual Event Class, which is serialized and stored in the database as JSON

  • Metadata - Is additional information stored alongside the Event

Appending Events to Event Stream

To append Events to the Event Stream we will use "appendTo" method

$eventStore->appendTo(
    "ticket",
    [
        new TicketWasRegistered('123', 'critical'),
        new TicketWasClosed('123')
    ]
);

This will store given Event in Ticket's Event Stream

Above we've stored Events for Ticket with id "123". However we can store Events from different Tickets in the same Event Stream.

$eventStore->appendTo(
    "ticket",
    [
        new TicketWasRegistered('124', 'critical'),
    ]
);

We now can load those Events from the Event Stream

$events = $eventStore->load("ticket");

This will return iterator of Ecotone's Events

class Event
{
    private function __construct(
        private string $eventName,
        private object|array $payload,
        private array $metadata
    )
    
    (...)

As we can see this maps to what we've been storing in the Event Stream table. Payload will contains our deserialized form of our event, so for example TicketWasRegistered.

We could also fetch list of Events without deserializing them. $events = $eventStore->load("ticket", deserialize: false);

In that situations payload will contains an associative array. This may be useful when iterating over huge Event Streams, when there is no need to actually work with Objects. Besides that ability to load in batches may also be handy.

Concurrent Access

Let's consider what may actually happen during concurrent access to our System. This may be due more people working on same Ticket or simply because our system did allow for double clicking of the same action.

In those situations we may end up storing the same Event twice

// concurrent request 1

$eventStore->appendTo(
    "ticket",
    [
        new TicketWasClosed('123'),
    ]
);

// concurrent request 2
$eventStore->appendTo(
    "ticket",
    [
        new TicketWasClosed('123'),
    ]
);

Without any protection we will end up with Closing Events in the Event Stream. That's not really ideal, as we will end up with Event Stream having incorrect history:

This is the place where we need to get back to persistence strategy:

$eventStore->create("ticket", streamMetadata: [
    "_persistence" => 'simple'
]);

We've created this Stream with "simple" persistence strategy. This means we can apply any new Events without guards. This is fine in scenarios where we are dealing with no business logic involved like collecting metrics, statistics. where all we to do is to push push Events into the Event Stream, and duplicates are not really a problem. However simple strategy (which is often the only strategy in different Event Sourcing Frameworks), comes with cost:

  • We lose linear history of our Event Stream, as we allow for storing duplicates. This may lead to situations which may lead to incorrect state of the System, like Repayments being recorded twice.

  • As we do allow for concurrent access, we can actually make wrong business decisions. For example we could give to the Customer promotion code twice.

The "simple strategy" is often the only strategy that different Event Sourcing Frameworks provide. However after the solution is released to the production, we often start to recognize above problems, yet now as we don't have other way of dealing with those, we are on mercy of fixing the causes, not the root of the problem. Therefore we need more sophisticated solution to this problem, to solve the cause of it not the side effects. And to solve the cause we will be using different persistence strategy called "partition strategy".

Partitioning Events

Event Stream can be split in partitions. Partition is just an sub-stream of Events related to given Identifier, in our context related to Ticket.

Partition is linear history for given identifier, where each Event is within partition is assigned with version. This way we now, which event is at which position. Therefore in order to partition the Stream, we need to know the partition key (in our case Ticket Id). By knowing the partition key and last version of given partition, we can apply an Event at the correct position. To create partitioned stream, we would create Event Stream with different strategy:

$eventStore->create("ticket", streamMetadata: [
    "_persistence" => 'partition',
]);

This will create Event Stream table with constraints, which will require:

  • Aggregate Id - This will be our partition key

  • Aggregate Type - This may be used if we would store more Aggregate types within same Stream (e.g. User), as additional partition key

  • Aggregate Version - This will ensure that we won't apply two Events at the same time to given partition

We append those as part of Event's metadata:

$eventStore->appendTo(
    $streamName,
    [
        Event::create(
            new TicketWasRegistered('123', 'Johnny', 'alert'),
            metadata: [
                '_aggregate_id' => 1,
                '_aggregate_version' => 1,
                '_aggregate_type' => 'ticket',
            ]
        )
    ]
);

Let's now see, how does it help us ensuring that our history is always correct. Let's assume that currently we do have single Event in the partition

Now let's assume two requests happening at the same time:

This way allows us to be sure that within request we are dealing with latest Event Stream, because if that's not true we will end up in concurent exception. This kind of protection is crucial when dealing with business logic that depends on the previous events, as it ensures that there is no way to bypass it.

As a result of duplicated Events (Which hold different Message Id) we will trigger side effects twice. Therefore our Event Handlers will need to handle this situation to avoid for example trigger requests to external system twice, or building wrong Read Model using .

Projections
previous chapter
Ticket Events
Event Stream table
Two Events stored in the Event Stream
Ticket was closed is duplicated in the Event Stream
Ticket Event Stream partioned for each Ticket
First request will succeed as will be quicker to store at position 2
Second request will fail due to database constraint, as position two is already taken