Ecotone
Last updated 2 months ago

Ecotone comes with an inbuilt Event Sourcing repository once the Event Sourcing package is installed. However, you may want to roll out your own storage for Events, or you may already use an event-sourcing framework and want to integrate with it. In that case you can take over control by introducing your own Event Sourcing Repository.

Using a Custom Event Sourcing Repository will not allow you to make use of the inbuilt projection system. Therefore consider configuring your own Event Sourcing Repository only if you want to build your own projecting system.

Custom Event Sourcing Repository

We start by implementing the EventSourcedRepository interface, which has three methods:

  1. canHandle - Tells whether the given Aggregate is handled by this Repository.

  2. findBy - Returns the previously stored events for the given Aggregate, which Ecotone will use to reconstruct it.

  3. save - Stores the events recorded by the Event Sourced Aggregate.

Then we need to mark the class which implements this interface with the Repository attribute.
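
To make the three responsibilities concrete, here is a minimal in-memory sketch. It is plain PHP and deliberately does not reference Ecotone classes: `InMemoryEventStore`, the empty `Basket` class and the use of plain arrays instead of an `EventStream` return value are assumptions made for illustration. A real repository would implement the interface and return the stream type Ecotone expects.

```php
<?php

declare(strict_types=1);

// Hypothetical aggregate class, used only as a type name in this sketch.
final class Basket
{
}

// Hypothetical stand-in that mirrors the three methods of the repository interface.
final class InMemoryEventStore
{
    /** @var array<string, list<object>> events grouped by stream key */
    private array $streams = [];

    public function canHandle(string $aggregateClassName): bool
    {
        // This store only accepts the Basket aggregate.
        return $aggregateClassName === Basket::class;
    }

    /** @return list<object> */
    public function findBy(string $aggregateClassName, array $identifiers, int $fromAggregateVersion = 1): array
    {
        $events = $this->streams[$this->streamKey($aggregateClassName, $identifiers)] ?? [];

        // Versions start at 1, so the event with version N sits at index N - 1.
        return array_slice($events, $fromAggregateVersion - 1);
    }

    public function save(array $identifiers, string $aggregateClassName, array $events, array $metadata, int $versionBeforeHandling): void
    {
        $key = $this->streamKey($aggregateClassName, $identifiers);

        // Append the new events to whatever is already stored for this aggregate.
        $this->streams[$key] = array_merge($this->streams[$key] ?? [], array_values($events));
    }

    private function streamKey(string $aggregateClassName, array $identifiers): string
    {
        // Sort identifiers so that the key does not depend on their order.
        ksort($identifiers);

        return $aggregateClassName . ':' . json_encode($identifiers);
    }
}
```

The store keeps one list of events per aggregate instance. A database-backed version would replace the array with a table or stream, but the division of work between the three methods would stay the same.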

Storing Events

The save method receives enough information to decide how to store the provided events.

Identifiers hold an array of identifiers related to the given Aggregate (e.g. ["orderId" => 123]). Events is a list of Ecotone's Event objects, each consisting of a payload and metadata, where the payload is your Event class instance and the metadata is specific to that event. Metadata, as a parameter, is the generic metadata available at the moment of Aggregate execution. Version before handling, on the other hand, is the version of the Aggregate before any action was triggered on it. It can be used to protect against concurrency issues.
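
One way to use the version before handling is an optimistic concurrency check: the write is rejected when the stored stream has moved on since the Aggregate was loaded. The sketch below is plain PHP; `VersionedStream` and `ConcurrentWriteDetected` are hypothetical names, and it assumes the stream version equals the number of stored events.

```php
<?php

declare(strict_types=1);

// Hypothetical exception raised when another writer got there first.
final class ConcurrentWriteDetected extends RuntimeException
{
}

// Hypothetical single-aggregate stream that enforces the expected version on append.
final class VersionedStream
{
    /** @var list<object> */
    private array $events = [];

    public function currentVersion(): int
    {
        // Assumption: every stored event increases the version by one.
        return count($this->events);
    }

    public function append(array $events, int $versionBeforeHandling): void
    {
        if ($this->currentVersion() !== $versionBeforeHandling) {
            throw new ConcurrentWriteDetected(sprintf(
                'Expected version %d, but stream is at version %d',
                $versionBeforeHandling,
                $this->currentVersion()
            ));
        }

        foreach ($events as $event) {
            $this->events[] = $event;
        }
    }
}
```

If two processes both load the Aggregate at version 0, the first append succeeds and the second one throws, because the stream is already at version 1. In a database the same idea is often expressed as a unique constraint on aggregate identifier and version.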

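Each Event consists of an event name (either the class name or the name of the event), the payload (the event object instance) and the related metadata.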

Core metadata

It is worth mentioning Ecotone's Events, and especially the metadata part of the Event. The metadata of each Event contains three core attributes:

"_aggregate_id" - The identifier of the related Aggregate

"_aggregate_version" - The version of the related Event (e.g. 1/2/3/4)

"_aggregate_type" - The type of the Aggregate being stored, which can be customized

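When persisting an event, these three attributes are natural candidates for dedicated columns. The following sketch pulls them out of an event's metadata array and fails early when one is missing. The function name and the column names are hypothetical; only the three metadata keys come from the description above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: extracts the core attributes from event metadata.
 *
 * @return array{aggregate_id: string, aggregate_version: int, aggregate_type: string}
 */
function coreColumns(array $metadata): array
{
    foreach (['_aggregate_id', '_aggregate_version', '_aggregate_type'] as $key) {
        if (!array_key_exists($key, $metadata)) {
            throw new InvalidArgumentException("Missing core metadata: {$key}");
        }
    }

    return [
        'aggregate_id' => (string) $metadata['_aggregate_id'],
        'aggregate_version' => (int) $metadata['_aggregate_version'],
        'aggregate_type' => (string) $metadata['_aggregate_type'],
    ];
}
```
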
Aggregate Type

If our repository stores multiple Aggregates, it is useful to know the type of Aggregate we are storing. However, keeping the class name is not the best idea, as a simple refactor would break our Event Stream. Therefore Ecotone provides a way to mark our Aggregate type using the AggregateType attribute.

This type will now be passed together with the Events under the _aggregate_type metadata.
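
A custom repository could, for example, derive the name of the stream from this metadata instead of from the class name, so that renaming the class does not change where events are stored. This is only a sketch: `streamNameFor` and the `type-id` naming scheme are assumptions, not something Ecotone prescribes.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: builds a stream name from the core metadata of an event.
function streamNameFor(array $metadata): string
{
    return $metadata['_aggregate_type'] . '-' . $metadata['_aggregate_id'];
}

// The metadata below is made up for illustration.
echo streamNameFor(['_aggregate_type' => 'basket', '_aggregate_id' => '123']), PHP_EOL;
// prints "basket-123"
```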

Named Events

In Ecotone we can name events to avoid storing class names in the Event Stream. To do so, we use the NamedEvent attribute.

Then, when events are passed to the save method, they will automatically provide this name under the eventName property.
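
When events are read back, a repository that stored names rather than class names needs some way to find the class again. One possible approach is a small lookup table, sketched below in plain PHP. `EventNameMap` is a hypothetical helper, and the `OrderWasPlaced` class here is a simplified stand-in without the attribute.

```php
<?php

declare(strict_types=1);

// Simplified stand-in; in the application this class would carry #[NamedEvent("order_was_placed")].
final class OrderWasPlaced
{
    public function __construct(public readonly string $orderId)
    {
    }
}

// Hypothetical lookup table from stored event names to current class names.
final class EventNameMap
{
    /** @var array<string, string> */
    private array $classByName = [];

    public function register(string $eventName, string $className): void
    {
        $this->classByName[$eventName] = $className;
    }

    public function classFor(string $eventName): string
    {
        return $this->classByName[$eventName]
            ?? throw new InvalidArgumentException("Unknown event name: {$eventName}");
    }
}

$map = new EventNameMap();
$map->register('order_was_placed', OrderWasPlaced::class);
```

If `OrderWasPlaced` is later renamed or moved to another namespace, only the registration changes; the name stored in the Event Stream stays `order_was_placed`.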

Snapshoting

When a snapshot exists, Ecotone fetches it first and then loads only the events from that point onwards, passing the starting version to findBy as `fromAggregateVersion`.
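
Inside a custom repository this means findBy has to skip everything older than the requested version. The sketch below filters an in-memory list by the `_aggregate_version` metadata. It assumes that `fromAggregateVersion` is inclusive, which the default value of 1 suggests, and it represents stored events as plain arrays; `eventsFromVersion` is a hypothetical helper.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper: keeps only events at or above the requested version.
 *
 * @param list<array{payload: mixed, metadata: array<string, mixed>}> $storedEvents
 * @return list<array{payload: mixed, metadata: array<string, mixed>}>
 */
function eventsFromVersion(array $storedEvents, int $fromAggregateVersion = 1): array
{
    return array_values(array_filter(
        $storedEvents,
        static fn (array $event): bool => $event['metadata']['_aggregate_version'] >= $fromAggregateVersion
    ));
}
```

With a SQL-based store, the same condition would typically become part of the query, so that older events are never read from the database at all.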

The inbuilt Snapshoting mechanism can still be used with a custom repository. To enable it, we configure BaseEventSourcingConfiguration.

Testing

If you want to test your flow and the storing of events with your custom Event Sourced Repository, you should disable the default in-memory repository.

The save method of the repository:

```php
public function save(
    array $identifiers,
    string $aggregateClassName,
    array $events,
    array $metadata,
    int $versionBeforeHandling
): void;
```

Ecotone's Event class:

```php
class Event
{
    private function __construct(
        private string $eventName, // either class name or name of the event
        private object $payload, // event object instance
        private array $metadata // related metadata
    ) {
    }
}
```

Marking the Aggregate type:

```php
#[EventSourcingAggregate]
#[AggregateType("basket")]
class Basket
```

Naming an event:

```php
#[NamedEvent("order_was_placed")]
class OrderWasPlaced
```

Enabling snapshots for an Aggregate:

```php
#[ServiceContext]
public function configuration()
{
    // take a snapshot of Basket every 100 events
    return BaseEventSourcingConfiguration::withDefaults()
            ->withSnapshotsFor(Basket::class, thresholdTrigger: 100);
}
```

The findBy method of the repository:

```php
public function findBy(
    string $aggregateClassName,
    array $identifiers,
    int $fromAggregateVersion = 1
): EventStream;
```

Testing with the custom repository:

```php
$repository = new CustomEventSourcingRepository();
$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
    [OrderAggregate::class, CustomEventSourcingRepository::class],
    [CustomEventSourcingRepository::class => $repository],
    // disable the default in-memory repository so the custom one is used
    addInMemoryEventSourcedRepository: false,
);

$ecotoneLite->sendCommand(new PlaceOrder());

$this->assertNotEmpty($repository->getEvents());
```
Event Sourcing Repository

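The repository interface, and a class marked as Repository:

```php
interface EventSourcedRepository
{
    // 1. Tells whether the given Aggregate is handled by this Repository
    public function canHandle(string $aggregateClassName): bool;

    // 2. Returns previously stored events for the given Aggregate
    public function findBy(string $aggregateClassName, array $identifiers, int $fromAggregateVersion = 1): EventStream;

    // 3. Stores events recorded by the Event Sourced Aggregate
    public function save(array $identifiers, string $aggregateClassName, array $events, array $metadata, int $versionBeforeHandling): void;
}
```

```php
#[Repository]
class CustomEventSourcingRepository implements EventSourcedRepository
```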