Your service classes mix reading and writing. A single change to how orders are placed breaks the order listing page. Business rules are scattered across controllers, listeners, and services — there's no clear boundary between "what changes state" and "what reads state."
How Ecotone Solves It
Ecotone introduces Command Handlers for state changes, Query Handlers for reads, and Event Handlers for reactions. Each has a single responsibility, wired automatically through PHP attributes. No base classes, no framework coupling — just clear separation of concerns on top of your existing Laravel or Symfony application.
In this chapter we will cover the process of handling and dispatching Messages with Ecotone.
We will discuss topics like Commands, Events and Queries, Message Handlers, Message Buses, Aggregates and Sagas.
If you are interested in the theory behind these concepts, you may want to read the theory chapter first.
External Query Handlers are Services available in your dependency container, which are defined to handle Queries.
class TicketService
{
#[QueryHandler]
public function getTicket(GetTicketById $query) : array
{
//return ticket
}
}
Queries are Plain Old PHP Objects:
To send a Query we use the send method on the QueryBus.
The Query will be delivered to the corresponding Query Handler.
Sending with Routing
Just like with Commands, we may use routing in order to execute queries:
To send a Query we use the sendWithRouting method on the QueryBus.
The Query will be delivered to the corresponding Query Handler.
Converting result from Query Handler
If you have registered a Converter for a specific Media Type, then you can tell Ecotone to convert the result of your Query Bus call to that format.
In order to do this, we make use of Metadata and the replyContentType header.
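For illustration, such a call might look like this (a sketch: it assumes a JSON Converter is registered, and passes the replyContentType header through the QueryBus metadata):

```php
$result = $this->queryBus->send(
    new GetTicketById($ticketId),
    // Ask Ecotone to convert the Query Handler's result to JSON
    metadata: ["replyContentType" => "application/json"]
);
```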
Aggregate Introduction
DDD Aggregates PHP
Works with: Laravel, Symfony, and Standalone PHP
The Problem
Business rules are enforced in multiple places — a validation here, a check there. When rules change, you update three files and miss a fourth. There's no single source of truth for what an Order or User can do, and no guarantee that business invariants are always protected.
How Ecotone Solves It
Ecotone's Aggregates encapsulate business rules in a single place. Commands are routed directly to the aggregate, which protects its own invariants. Ecotone handles loading and saving — you write business logic, not infrastructure code.
This chapter will cover the basics of how to implement an Aggregate.
We will be using Command Handlers in this section, so make sure to read the Command Handlers section first to understand how Commands are sent and handled.
Aggregate Command Handlers
Working with Aggregate Command Handlers is the same as with External Command Handlers.
We mark a given method with the CommandHandler attribute and Ecotone will register it as a Command Handler.
In the most common scenarios, Command Handlers are boilerplate code which fetches the aggregate, executes it, and then saves it.
This is non-business code that is often duplicated with each Command Handler we introduce.
Ecotone wants to shift the developer's focus to the business part of the system, which is why this boilerplate is abstracted away in the form of the Aggregate.
By providing the Identifier attribute on top of a property in your Aggregate, we state that this is the identifier of the Aggregate (an Entity in the Symfony/Doctrine world, a Model in the Laravel world).
This is then used by Ecotone to fetch your aggregate automatically.
However, Aggregates need to be fetched from a repository in order to be executed.
When we send a Command, Ecotone will use the property with the same name from the Command instance to fetch the Aggregate.
You may read more about Identifier Mapping and more advanced scenarios in the Identifier Mapping section.
When the identifier is resolved, Ecotone uses a repository to fetch the aggregate, calls the method, and then saves the aggregate. So it basically does all the boilerplate for you.
To implement your own repository, refer to the Repository section.
You may use the inbuilt repositories, so you don't need to implement your own.
Ecotone provides integration with Doctrine ORM, Eloquent, Document Store, and Event Sourcing repositories.
State-Stored Aggregate
An Aggregate is a regular object which contains state and methods to alter that state. It can be described as an Entity which carries a set of behaviours.
When creating the Aggregate object, you are creating the Aggregate Root.
The Aggregate attribute tells Ecotone that this class should be registered as an Aggregate Root.
The Identifier is the external reference point to the Aggregate.
This field tells Ecotone which Aggregate a given Command is targeted at.
Converting Results
Converting query results in Database Business Interface
Converting Results
In rich business domains we will want to work with higher-level objects than associative arrays.
Suppose we have a PersonNameDTO and have defined an Ecotone Converter for it.
class PersonNameDTOConverter
{
#[Converter]
public function from(PersonNameDTO $personNameDTO): array
{
return [
"person_id" => $personNameDTO->getPersonId(),
"name" => $personNameDTO->getName()
];
}
#[Converter]
public function to(array $personNameDTO): PersonNameDTO
{
return new PersonNameDTO($personNameDTO['person_id'], $personNameDTO['name']);
}
}
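For completeness, a minimal PersonNameDTO matching the Converter above might look like this (the class shape is an assumption, based only on the getters the Converter uses):

```php
class PersonNameDTO
{
    public function __construct(
        private string $personId,
        private string $name
    ) {}

    public function getPersonId(): string
    {
        return $this->personId;
    }

    public function getName(): string
    {
        return $this->name;
    }
}
```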
Converting to Collection of Objects
Ecotone will read the docblock and, based on that, will deserialize the Result Set from the database into a list of PersonNameDTO.
Converting to single Object
Using the First Row Fetch Mode, we can get the first row and then use it for conversion to PersonNameDTO.
Converting Iterator
For big result set we may want to avoid fetching everything at once, as it may consume a lot of memory. In those situations we may use Iterator Fetch Mode, to fetch one by one.
If we want to convert each result to given Class, we may define docblock describing the result:
Each returned row will be automatically converted to PersonNameDTO.
Converting to specific Media Type Format
We may return the result in a specific format directly. This is useful when a Business Method is used on the edges of our application and we want to return the result directly.
In this example, the result will be returned as application/json.
A CommandHandler defined on a static method acts as a factory method. Given a command, it should return a new instance of the specific aggregate, in this case a new Product.
A CommandHandler defined on a non-static class method is the place where you make changes to an existing aggregate, fetched from the repository.
readonly class GetTicketById
{
public function __construct(
public string $ticketId
) {}
}
class TicketController
{
// Query Bus will be auto registered in the Dependency Container.
public function __construct(private QueryBus $queryBus) {}
public function createTicketAction(Request $request) : Response
{
$result = $this->queryBus->send(
new GetTicketById(
$request->get("ticketId")
)
);
return new Response(\json_encode($result));
}
}
$ticket = $messagingSystem->getQueryBus()->send(
new GetTicketById(
$ticketId
)
);
class TicketService
{
#[QueryHandler("ticket.getById")]
public function getTicket(string $ticketId) : array
{
//return ticket
}
}
class TicketController
{
public function __construct(private QueryBus $queryBus) {}
public function createTicketAction(Request $request) : Response
{
$result = $this->queryBus->sendWithRouting(
"ticket.getById",
$request->get("ticketId")
);
return new Response(\json_encode($result));
}
}
class TicketController
{
public function __construct(private QueryBus $queryBus) {}
public function createTicketAction(Request $request) : Response
{
$result = $this->queryBus->sendWithRouting(
"ticket.getById",
$request->get("ticketId"),
// Tell Ecotone which format you want in return
expectedReturnedMediaType: "application/json"
);
return new Response($result);
}
}
#[Aggregate]
class Product
{
#[Identifier]
private string $productId;
// ...
}
class ChangePriceCommand
{
private string $productId; // same property name as the Aggregate's Identifier
private Money $priceAmount;
// ...
}
#[Aggregate] // 1
class Product
{
#[Identifier] // 2
private string $productId;
private string $name;
private int $priceAmount;
private function __construct(string $productId, string $name, int $priceAmount)
{
$this->productId = $productId;
$this->name = $name;
$this->priceAmount = $priceAmount;
}
#[CommandHandler] //3
public static function register(RegisterProductCommand $command) : self
{
return new self(
$command->getProductId(),
$command->getName(),
$command->getPriceAmount()
);
}
#[CommandHandler] // 4
public function changePrice(ChangePriceCommand $command) : void
{
$this->priceAmount = $command->getPriceAmount();
}
}
/**
* @return PersonNameDTO[]
*/
#[DbalQuery('SELECT person_id, name FROM persons LIMIT :limit OFFSET :offset')]
public function getNameListDTO(int $limit, int $offset): array;
#[DbalQuery(
'SELECT person_id, name FROM persons WHERE person_id = :personId',
fetchMode: FetchMode::FIRST_ROW
)]
public function getNameDTO(int $personId): PersonNameDTO;
/**
* @return iterable<PersonNameDTO>
*/
#[DbalQuery(
'SELECT person_id, name FROM persons ORDER BY person_id ASC',
fetchMode: FetchMode::ITERATE
)]
public function getPersonIdsIterator(): iterable;
#[DbalQuery(
'SELECT person_id, name FROM persons WHERE person_id = :personId',
fetchMode: FetchMode::FIRST_ROW,
replyContentType: 'application/json'
)]
public function getNameDTOInJson(int $personId): string;
Common challenges Ecotone solves for Laravel and Symfony developers
Every feature in Ecotone exists to solve a real problem that PHP developers face as their applications grow. Find the situation that matches yours:
If you recognize this...
See
Business logic is scattered across controllers, services, and listeners — nobody can explain end-to-end what happens when an order is placed
Queue jobs fail silently, a retry re-fires handlers that already succeeded, or a duplicate webhook double-charges the customer
Works with: Laravel, Symfony, and Standalone PHP
Scattered Application Logic
How to organize business logic with CQRS in Laravel and Symfony using Ecotone
The Problem You Recognize
Your application started clean, but as features grew, the boundaries blurred. Controllers handle business logic. Services read and write in the same method. Event listeners trigger side effects that nobody can trace.
In Laravel, you might have a 300-line Controller that validates input, queries the database, applies business rules, dispatches jobs, and returns a response — all in one method.
In Symfony, you might have a service class with 10 injected dependencies, where changing how orders are placed breaks the order listing page because both share the same service.
CQRS PHP
Command Query Responsibility Segregation PHP
Separate the code that changes state from the code that reads it — clear command and query handlers with zero boilerplate.
Demo Laravel and Symfony Application - You can test Ecotone in real-life example, by using our demo application. The demo application shows how to use Ecotone with Laravel and Symfony frameworks.
Quickstart Examples - A great way to check out specific Ecotone features. Whether you use Laravel, Symfony or Lite (no external framework), all examples will work in your Application.
Ask a question to AI - Ecotone provides AI support to help you find answers quicker. You may ask any Ecotone-related question, and it will provide more details on the topic and links where more information can be found.
Have a Workshop or Consultancy - To quickly get a whole Team or Organisation up and running with Ecotone, we provide workshops. Workshops will not only teach you how to use Ecotone, but also the concepts and reasoning behind it.
Join Community Channel - Ecotone has a community channel, where you can ask questions, discuss with other users and get help. It is also a great place to share your experiences, and to meet other developers using Ecotone.
Subscribe to Mailing list - Join mailing list to stay up to date with Ecotone changes and latest articles and features.
Some demos and quick-start examples are built using a specific framework integration. However, Ecotone does not bind a given set of features to a specific solution. Whether you use Laravel, Symfony or Lite (no external framework), all features work in the same way.
Therefore feel encouraged to test out the examples, even if they are not in the framework of your choice.
Extending messaging with Interceptors and Middlewares in Ecotone PHP
Event Sourcing Aggregates
Event Sourcing Aggregates in Ecotone PHP
Event Stream Persistence
PHP Event Sourcing Persistence Strategy
The symptoms are familiar:
New developers take weeks to understand what happens when a user places an order
Testing a single business rule requires setting up the entire framework
A change in one area causes failures in unrelated features
What the Industry Calls It
CQRS — Command Query Responsibility Segregation. Separate the code that changes state (commands) from the code that reads state (queries). Add event handlers for side effects. Each handler has one job.
How Ecotone Solves It
With Ecotone, you organize your code around Command Handlers, Query Handlers, and Event Handlers — each responsible for exactly one thing. Ecotone wires them together automatically through PHP attributes:
No base classes. No interfaces to implement. Your existing Laravel or Symfony services stay exactly where they are — you add attributes to give them clear responsibilities.
class OrderService
{
#[CommandHandler]
public function placeOrder(PlaceOrder $command): void
{
// Only handles placing the order — nothing else
}
#[QueryHandler("order.get")]
public function getOrder(GetOrder $query): OrderView
{
// Only handles reading — no side effects
}
}
class NotificationService
{
#[EventHandler]
public function whenOrderPlaced(OrderWasPlaced $event): void
{
// Reacts to the event — fully decoupled from order logic
}
}
Let's create a PlaceOrder Command that will place an order in our system.
And a Command Handler that will handle this Command.
Registering Query Handlers
Let's define a GetOrder Query that will find our placed order.
And a Query Handler that will handle this query.
Running The Example
class PlaceOrder
{
private string $orderId;
private string $productName;
public function __construct(string $orderId, string $productName)
{
$this->orderId = $orderId;
$this->productName = $productName;
}
public function getOrderId(): string
{
return $this->orderId;
}
public function getProductName(): string
{
return $this->productName;
}
}
use Ecotone\Modelling\Attribute\CommandHandler;
class OrderService
{
private array $orders = [];
#[CommandHandler]
public function placeOrder(PlaceOrder $command) : void
{
$this->orders[$command->getOrderId()] = $command->getProductName();
}
}
class GetOrder
{
private string $orderId;
public function __construct(string $orderId)
{
$this->orderId = $orderId;
}
public function getOrderId(): string
{
return $this->orderId;
}
}
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
class OrderService
{
private array $orders = [];
#[CommandHandler]
public function placeOrder(PlaceOrder $command) : void
{
$this->orders[$command->getOrderId()] = $command->getProductName();
}
#[QueryHandler]
public function getOrder(GetOrder $query) : string
{
if (!array_key_exists($query->getOrderId(), $this->orders)) {
throw new \InvalidArgumentException("Order was not found " . $query->getOrderId());
}
return $this->orders[$query->getOrderId()];
}
}
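With both handlers registered, running the example boils down to dispatching the Command and then the Query (a sketch; the bus instances come from your dependency container or Ecotone Lite, as shown earlier):

```php
$commandBus->send(new PlaceOrder("123", "Milk"));

$productName = $queryBus->send(new GetOrder("123"));
// $productName === "Milk"
```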
How to build reliable async processing in Laravel and Symfony with Ecotone
The Problem You Recognize
You added async processing to handle background work — sending emails, processing payments, syncing data. But now you have new problems:
Failed jobs disappear silently or retry forever with no visibility
You can't replay a failed message after fixing the bug — the data is gone
A duplicate webhook triggers the same handler twice, leading to double charges or duplicate emails
Going async required touching every handler — adding queue configuration, serialization, and retry logic to each one
Retrying a failed event triggers all handlers again — if one of three event handlers fails, the retry re-executes the two that already succeeded, causing side effects like duplicate emails or double charges
In Laravel, you've scattered dispatch() calls and ShouldQueue implementations across your codebase. In Symfony, you've configured Messenger transports and retry strategies in YAML, but each handler still needs custom error handling.
What the Industry Calls It
Resilient Messaging — a combination of patterns: failure isolation (per-handler message delivery), automatic retries, error channels, dead letter queues, the outbox pattern for guaranteed delivery, and idempotency for deduplication.
How Ecotone Solves It
With Ecotone, making a handler async is a single attribute. Resilience is built into the messaging layer — not bolted on per handler:
Failure isolation — when multiple handlers subscribe to the same event, Ecotone delivers a separate copy of the message to each handler. If one fails, only that handler is retried — the others are not affected:
Retries, error channels, and dead letter queues are configured once at the channel level — every handler on that channel gets production resilience automatically. No per-handler boilerplate.
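As a sketch of what channel-level configuration can look like (the class and method names follow Ecotone's recoverability API; treat the exact channel names and retry values as assumptions to adapt):

```php
use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Handler\Recoverability\ErrorHandlerConfiguration;
use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder;

class ErrorConfiguration
{
    #[ServiceContext]
    public function errorHandling(): ErrorHandlerConfiguration
    {
        return ErrorHandlerConfiguration::createWithDeadLetterChannel(
            // All failures on this error channel get the same policy
            "errorChannel",
            // Exponential backoff: 1s initial delay, 10x multiplier, 3 attempts
            RetryTemplateBuilder::exponentialBackoff(1000, 10)
                ->maxRetryAttempts(3),
            // After retries are exhausted, the message lands here for replay
            "dbal_dead_letter"
        );
    }
}
```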
Next Steps
— Make handlers async with a single attribute
— Each handler gets its own message copy for safe retries
— Configure automatic retry strategies
As You Scale: Ecotone Enterprise adds for synchronous commands, for centralized error routing, and that protects every handler behind a bus automatically.
Complex Business Processes
How to manage complex multi-step business workflows in PHP with Ecotone
The Problem You Recognize
Your order fulfillment process spans 6 steps across 4 services. The subscription lifecycle involves payment processing, provisioning, notifications, and grace periods. User onboarding triggers a welcome email, account setup, and a follow-up sequence.
The logic for these processes is spread across:
Event listeners that trigger other event listeners
Cron jobs that check status flags
Database columns like is_processed, retry_count, step_completed_at
Nobody can explain the full flow without reading all the code. Adding a step means editing multiple files. Reordering steps is risky. When something fails mid-process, recovery means manually updating database flags.
What the Industry Calls It
Two distinct patterns solve this, and they're often confused:
Workflows — stateless pipe-and-filter chaining. The message flows from one handler to the next via output channels. Each step is independent; nothing is remembered across steps.
Sagas — stateful long-running coordination. The saga remembers where it is across events that may arrive seconds, minutes, or days apart, and decides what to do next based on prior state.
Neither Symfony Messenger nor Laravel Queues has a first-class equivalent — both stop at "dispatch a job." Ecotone provides both patterns natively.
How Ecotone Solves It
Workflows — chained handlers. Connect handlers through input and output channels. Each handler does one thing and passes the message on. No coordinator, no state; just declarative flow:
Sagas — stateful coordination. Track state across events that arrive over time. The saga remembers where it is and reacts to each event based on what came before:
Next Steps
— Simple linear workflows
— Stateful workflows that remember
— Recovery and compensation
As You Scale: Ecotone Enterprise adds — declarative workflow automation where you define step sequences in one place, with each step independently testable and reusable. Dynamic step lists adapt to input data without touching step code.
Microservice Communication
How to build reliable microservice communication in PHP with Ecotone Distributed Bus
The Problem You Recognize
Your monolith is splitting into services. Or you already have multiple services and they need to talk to each other.
The current approach: HTTP calls between services. When Service B is down, Service A fails too. You've built custom retry logic, custom serialization, and custom routing for each service pair. There's no guaranteed delivery — if a request fails, the data is lost unless you built a custom retry mechanism.
The symptoms:
Cascading failures — one service going down takes others with it
Custom glue code per service pair — serialization, routing, error handling
No event sharing — services can't subscribe to each other's events without point-to-point integrations
Broker lock-in — switching from RabbitMQ to SQS means rewriting integration code
What the Industry Calls It
Distributed Messaging — services communicate through a message broker with guaranteed delivery, event sharing, and transport abstraction.
How Ecotone Solves It
Ecotone's Distributed Bus lets services send commands and publish events to each other through message brokers. Your application code stays the same — Ecotone handles routing, serialization, and delivery:
Supports RabbitMQ, Amazon SQS, Redis, and Kafka — swap transports without changing application code.
Next Steps
— Cross-service messaging
— Consuming from external sources
— Publishing to external targets
As You Scale: Ecotone Enterprise adds — a topology-aware distributed bus that supports multiple brokers in a single topology, automatic routing, and cross-framework integration.
Aggregates & Sagas
Quick start with Aggregates and Sagas in Ecotone PHP
Laravel demo applications with DDD, CQRS, and Event Sourcing
Demo Message Bus
Demo Publishing Events
Symfony Demos
Symfony demo applications with DDD, CQRS, and Event Sourcing
Message Bus Demo
Publishing Events Demo
Tutorial
Ecotone PHP Framework
Get started with Ecotone
The best way to get started with Ecotone is to actually build something realistic.
Therefore we will build a small back-end for Shopping System during this tutorial.
The techniques we will learn in the tutorial are fundamental to building any application using Ecotone.
Aggregate Query Handlers
DDD PHP
Read the Aggregate Introduction section first to get more details about Aggregates.
Aggregate Query Action
Aggregate query actions are defined as public (non-static) methods. Ecotone will ensure the Aggregate is loaded in order to execute the query method.
And then we call it from Query Bus:
Inbuilt Repositories
Built-in Aggregate repositories for Doctrine ORM, Eloquent, and DBAL
Ecotone comes with inbuilt repositories, so we don't need to implement Repositories ourselves. These are often similar between projects, so there may be no need to roll out your own.
Inbuilt Repositories
Ecotone provides inbuilt repositories to get you started quicker. This way you can enable given repository and start implementing higher level code without worrying about infrastructure part.
Additional Scenarios
Advanced Interceptor scenarios and configurations in Ecotone
Access attribute from interceptor
We may access an attribute from the intercepted endpoint in order to perform a specific action.
Then we would have a Message Endpoint using this Attribute:
And it can be used in the interceptors by type-hinting the given parameter:
Intercepting Asynchronous Endpoints
Intercepting asynchronous message endpoints in Ecotone PHP
Read the Interceptors section to find out more about Interceptors.
Intercepting Asynchronous Endpoints
We may as well intercept Asynchronous Endpoints pretty easily. We do it by using a pointcut pointing to the AsynchronousRunningEndpoint class.
Event Sourcing
Event Sourcing PHP
Works with: Laravel, Symfony, and Standalone PHP
The Problem
You store the current state of your entities, but not how they got there. When a customer disputes a charge, you can't answer "what exactly happened?" Rebuilding read models after a schema change means writing migration scripts by hand. Auditors ask for a complete trail of changes and you piece it together from application logs.
Installation
Installing Event Sourcing support in Ecotone PHP
Ecotone comes with full automation for setting up Event Sourcing. Thanks to this, we can easily roll out new features built on Event Sourcing with minimal or no setup at all.
Install Event Sourcing Support
Before we start, let's install the Event Sourcing module, which will provide us with all required components:
We need to configure a database connection in order to make use of it.
Ecotone PDO Event Sourcing provides support for three databases:
Making Stream immune to changes
Making event streams immune to class and namespace changes
Changes in the Application will happen. After some time we may want to refactor namespaces, or change the name of an Aggregate or an Event. However, such changes may break our system if we already have production data referencing any of them.
Therefore, to make our Application immune to future changes, we need a way to decouple the code from the data in storage, and this is what Ecotone provides.
Custom Stream Name
By default, our Event Stream name is based on the Aggregate class name. Therefore, to make it immune to changes, we may provide a custom Stream Name. To do it we apply the Stream attribute to the aggregate:
This provides integration with Eloquent ORM. Eloquent support is available out of the box after installing Laravel module.
Document Store Repository
This provides integration with a Document Store using relational databases. It will serialize your aggregate to JSON and deserialize it on load using Converters.
To enable it read in Dbal Module Section.
Event Sourcing Repository
Ecotone provides inbuilt Event Sourcing Repository, which will set up Event Store and Event Streams. To enable it read Event Sourcing Section.
How Ecotone Solves It
Ecotone provides Event Sourcing as a first-class feature. Instead of storing current state, you store the sequence of events that led to it. Rebuild any view of the data by replaying events. Get a complete, immutable audit trail automatically. Works with Postgres, MySQL, and MariaDB for event storage, with projections that can write to any storage you choose.
You may of course make use of a Query class or metadata in case of need; these will be passed to your aggregate's method.
#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private string $assignedTo;
#[QueryHandler("ticket.get_assigned_person")]
public function getAssignedTo(): string
{
return $this->assignedTo;
}
}
$ticket = $this->queryBus->sendWithRouting(
"ticket.get_assigned_person",
// We target the Ticket aggregate instance using metadata
metadata: ["aggregate.id" => $ticketId]
);
#[\Attribute]
class Cache
{
public string $cacheKey;
public int $timeToLive;
public function __construct(string $cacheKey, int $timeToLive)
{
$this->cacheKey = $cacheKey;
$this->timeToLive = $timeToLive;
}
}
class ProductsService
{
#[QueryHandler]
#[Cache("hottestProducts", 120)]
public function getHottestProducts(GetOrderDetailsQuery $query) : array
{
return ["orderId" => $query->getOrderId()];
}
}
Inject Message's payload
As part of around intercepting, if we need Message Payload to make the decision we can simply inject that into our interceptor:
Inject Message Headers
We can also inject Message Headers into our interceptor. We could, for example, inject the Message Consumer name in order to decide whether to start the transaction or not:
class TransactionInterceptor
{
#[Around(pointcut: AsynchronousRunningEndpoint::class)]
public function transactional(MethodInvocation $methodInvocation)
{
$this->connection->beginTransaction();
try {
$result = $methodInvocation->proceed();
$this->connection->commit();
}catch (\Throwable $exception) {
$this->connection->rollBack();
throw $exception;
}
return $result;
}
}
Ecotone provides inbuilt functionality to serialize your Events, which can be customized if needed. This lets Ecotone take care of Event Serialization/Deserialization, and allows us to focus on the business side of the code.
We can take over this process and set up our own Serialization; however, the Ecotone JMS Converter can fully handle it for us, so we can simply focus on the business side of the code. To make it happen, all we need to do is install the JMS Package and we are ready to go:
By default events in the stream will hold Aggregate Class name as AggregateType.
You may customize this by applying AggregateType attribute to your Aggregate.
You may wonder what the difference is between the Stream name and the Aggregate Type.
By default they are the same; however, we could use the same Stream name between different Aggregates to store them all together within the same table.
This may be useful during migration to the next version of an Aggregate, where we would want to hold both versions in the same Stream.
Storing Events By Names
To avoid storing class names of Events in the Event Store we may mark them with name:
This way Ecotone will do the mapping before storing an Event, and again when retrieving it, in order to deserialize it to the correct class.
Testing
It's worth remembering that if we want to test storing Events using the provided Event Names, we need to add them to the recognized classes, so Ecotone knows that it should scan those classes for Attributes:
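For example, using Ecotone Lite's test support, we pass the named Event class in the list of classes to resolve (a sketch; the exact bootstrap method depends on your test setup):

```php
$ecotoneLite = EcotoneLite::bootstrapFlowTestingWithEventStore(
    // Both classes are scanned for Attributes,
    // so the NamedEvent mapping for BasketWasCreated is registered
    [Basket::class, BasketWasCreated::class]
);
```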
// Make any handler async with one attribute
#[Asynchronous("notifications")]
#[EventHandler]
public function sendWelcomeEmail(UserRegistered $event): void
{
// If this fails, Ecotone retries automatically
// If it keeps failing, it goes to the dead letter queue
// You can replay it after fixing the bug
}
#[Asynchronous("notifications")]
#[EventHandler]
public function sendWelcomeEmail(UserRegistered $event): void
{
// If this fails, only this handler retries
// The inventory handler below is NOT re-triggered
}
#[Asynchronous("inventory")]
#[EventHandler]
public function reserveInventory(UserRegistered $event): void
{
// Runs independently — isolated from email handler failures
}
#[CommandHandler(
routingKey: "order.place",
outputChannelName: "order.verify_payment"
)]
public function placeOrder(PlaceOrder $command): OrderData
{
// Step 1: Create the order, pass to next step
}
#[Asynchronous('async')]
#[InternalHandler(
inputChannelName: "order.verify_payment",
outputChannelName: "order.ship"
)]
public function verifyPayment(OrderData $order): OrderData
{
// Step 2: Verify payment, pass to shipping
}
#[Saga]
class OrderFulfillment
{
#[Identifier]
private string $orderId;
private string $status;
#[EventHandler]
public static function start(OrderWasPlaced $event): self
{
// Begin the saga — tracks state across events
}
#[EventHandler]
public function onPaymentReceived(PaymentReceived $event, CommandBus $bus): void
{
$this->status = 'paid';
$bus->send(new ShipOrder($this->orderId));
}
}
// Service A: Send a command to Service B
$distributedBus->sendCommand(
targetServiceName: "order-service",
command: new PlaceOrder($orderId, $items),
);
// Service B: Handle commands from other services — same as local handlers
#[Distributed]
#[CommandHandler]
public function placeOrder(PlaceOrder $command): void
{
// This handler receives commands from any service
}
class NotificationFilter
{
#[After]
public function filter($result, Cache $cache) : ?array
{
$this->cachingSystem($cache->cacheKey, $result, $cache->timeToLive);
return $result;
}
}
#[Around(pointcut: AsynchronousRunningEndpoint::class)]
public function transactional(
MethodInvocation $methodInvocation,
#[Payload] string $command
)
#[Around(pointcut: AsynchronousRunningEndpoint::class)]
public function transactional(
MethodInvocation $methodInvocation,
#[Header('polledChannelName')] string $consumerName
)
composer require ecotone/jms-converter
#[Stream("basket_stream")]
class Basket
#[Projection(self::PROJECTION_NAME, "basket_stream")]
class BasketList
#[AggregateType("basket")]
class Basket
#[NamedEvent("basket.was_created")]
class BasketWasCreated
{
public const EVENT_NAME = "basket.was_created";
private string $id;
public function __construct(string $id)
{
$this->id = $id;
}
public function getId(): string
{
return $this->id;
}
}
Lesson 1, we will learn the fundamentals of Ecotone: Endpoints, Messages, Channels, and Command Query Responsibility Segregation (CQRS)
Lesson 2, we will learn Tactical Domain Driven Design (DDD): Aggregates, Repositories and also Event Handlers
Lesson 3, we will learn how to use Converters, therefore how to handle serialization and deserialization
Lesson 4, we will learn about Metadata and Method Invocation - how we can execute Message Handlers in a way not available in any other PHP Framework
Lesson 5, we will learn about Interceptors, Ecotone's powerful Middlewares
Lesson 6, we will learn about Asynchronous Endpoints, and how to process our Messages asynchronously.
You don't have to complete all of the lessons at once to get value out of this tutorial.
You will start benefiting from the tutorial even if you complete only one or two lessons.
Why Ecotone - The enterprise architecture layer for PHP
What Ecotone Is (and Isn't)
Ecotone is not a framework replacement. You don't rewrite your Laravel or Symfony application to use Ecotone — you add it.
Think of it this way: API Platform provides the API layer on top of Symfony. Ecotone provides the enterprise messaging layer on top of your framework.
You keep your ORM (Eloquent or Doctrine), your routing, your templates, your deployment. Ecotone handles the messaging architecture — the part that makes your application resilient, scalable, and maintainable as complexity grows.
composer require ecotone/laravel
# or
composer require ecotone/symfony-bundle
# That's it. Your framework stays, Ecotone adds the enterprise layer.
Every Ecosystem Has This Layer — Except PHP
Enterprise software in other ecosystems has mature tooling for messaging, CQRS, Event Sourcing, and distributed systems:
Ecosystem
Enterprise Messaging Layer
This isn't about PHP being inferior. It's about PHP maturing into enterprise domains. Teams building complex business systems in PHP deserve the same caliber of tooling that Java and .NET teams have had for years.
Ecotone is built on the same foundation that powers Spring Integration, NServiceBus, and Apache Camel: Enterprise Integration Patterns.
What You Get
Instead of learning pattern names first, start with the problem you're solving:
Your problem
What the industry calls it
Ecotone feature
How It Integrates
Ecotone plugs into your existing framework without requiring changes to your application structure:
Laravel
Laravel's queue runs jobs, not business processes — anything resembling aggregates, sagas, workflows, or event sourcing ends up stitched together from separate libraries. Ecotone fills that layer directly: works with Eloquent for aggregate persistence, Laravel Queues for async message channels, and Laravel Octane for high-performance scenarios. Configuration via your standard Laravel config files.
Symfony
Symfony Messenger handles dispatch — aggregates, sagas, event sourcing, and transactional outbox are left to you. Ecotone fills that layer directly: works with Doctrine ORM for aggregate persistence, Symfony Messenger Transport for async message channels, and standard Bundle configuration. Ecotone auto-discovers your attributes in the src directory.
Standalone
For applications without Laravel or Symfony, Ecotone Lite provides the full feature set with minimal dependencies.
Start Free, Scale with Enterprise
Ecotone Free gives you everything you need for production-ready CQRS, Event Sourcing, and Workflows — message buses, aggregates, sagas, async messaging, interceptors, retries, error handling, and full testing support.
Ecotone Enterprise is for when your system outgrows single-tenant, single-service, or needs advanced resilience and scalability — orchestrators, distributed bus with service map, dynamic channels, partitioned projections, Kafka integration, and more.
PHP for Enterprise Architecture
Enterprise architecture patterns in PHP - comparing Ecotone to Spring, Axon, NServiceBus
The Problem You Recognize
You're a technical lead or architect evaluating whether PHP can handle enterprise-grade architecture. Your team knows PHP well, but the business is growing — you need CQRS, Event Sourcing, distributed messaging, multi-tenancy, and production resilience.
The alternative is migrating to Java (Spring + Axon) or .NET (NServiceBus, MassTransit). That means retraining your team, rewriting your application, and losing PHP's development speed.
PHP Has Grown Up
PHP is no longer just for simple web applications. Modern PHP (8.1+) has union types, enums, fibers, readonly properties, and first-class attributes. Frameworks like Laravel and Symfony provide the web layer. What was missing was the enterprise messaging layer — the equivalent of what Spring Integration and NServiceBus provide in their ecosystems.
Ecotone fills that gap. Built on the same Enterprise Integration Patterns that underpin Spring Integration, NServiceBus, and Apache Camel, Ecotone brings production-grade enterprise patterns to PHP.
How Ecotone Compares
Capability
Java (Axon)
.NET (NServiceBus)
PHP (Ecotone)
What You Get With Ecotone
Enterprise Integration Patterns as the foundation — not a custom abstraction
Framework integration — works on top of Laravel and Symfony, not replacing them
Attribute-driven configuration — PHP 8 attributes instead of XML or YAML
Repositories are used for retrieving and saving the aggregate to persistent storage.
A typical flow for calling an aggregate method would look like below:
class AssignWorkerHandler
{
private TicketRepository $ticketRepository;
#[CommandHandler]
public function handle(AssignWorkerCommand $command) : void
{
// fetch the aggregate from repository
$ticket = $this->ticketRepository->findBy($command->getTicketId());
// call action method
$ticket->assignWorker($command);
// store the aggregate in repository
$this->ticketRepository->save($ticket);
}
}
$this->commandBus->send(
new AssignWorkerCommand(
$ticketId, $workerId,
)
);
By setting up a Repository we provide Ecotone with the functionality to fetch and store the Aggregate, so we don't need to write the above orchestration code anymore.
Ecotone's Aggregate Flow
If our class is defined as an Aggregate, Ecotone will use the Repository to fetch and store it whenever a Command is sent via the Command Bus.
Now when we send the Command, Ecotone will use the ticketId from the Command to fetch the related Ticket Aggregate and will call assignWorker, passing the Command. After this completes, it will use the repository to store the changed Aggregate instance.
Therefore from high level nothing changes:
This way we don't need to write orchestration level code ourselves.
Extending Message Buses (Gateways)
Extending Command, Event, and Query Buses with custom Gateways
Ecotone provides the ability to extend any Messaging Gateway using Interceptors. We can hook into the flow and add additional behavior.
For better understanding, please read Interceptors section before going through this chapter.
Intercepting Gateways
Suppose we want to add custom logging, whenever any Command is executed.
We know that CommandBus is an interface for sending Commands, therefore we need to hook into that Gateway.
class LoggerInterceptor
{
#[Before(pointcut: CommandBus::class)]
public function log(object $command, array $metadata) : void
{
// log Command message
}
}
Intercepting Gateways does not differ from intercepting Message Handlers.
Building customized Gateways
We may also want to have different types of Message Buses for a given Message Type.
For example, we could have an EventBus with audit capabilities which we would use in specific cases, while keeping the original EventBus untouched for all other scenarios.
To do this, we will introduce our new EventBus:
That's basically enough to register our new interface. This new Gateway will be automatically registered in our DI container, so we will be able to inject and use it.
It's enough to extend a given Gateway with a custom interface to register the new abstraction in the Dependency Container.
In the above example AuditableEventBus will be automatically available in the Dependency Container, as Ecotone will deliver the implementation.
Now, as this is a separate interface, we can point an interceptor specifically at it.
Pointcut by attributes
We could of course intercept by attributes if we would like to make the audit functionality reusable,
and then pointcut based on the attribute:
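A sketch of how this can look (the Auditable attribute name is illustrative):

```php
use Ecotone\Messaging\Attribute\Interceptor\Before;
use Ecotone\Modelling\EventBus;

#[\Attribute]
class Auditable
{
}

// Mark the custom Gateway with the attribute
#[Auditable]
interface AuditableEventBus extends EventBus
{
}

class Audit
{
    // Pointcut on the attribute, so any Gateway or Handler
    // marked with #[Auditable] will be intercepted
    #[Before(pointcut: Auditable::class)]
    public function log(object $event, array $metadata): void
    {
        // store audit
    }
}
```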
Asynchronous Gateways
Gateways can also be extended with asynchronous functionality.
Event Sourcing Introduction
Using Event Sourcing in PHP
Works with: Laravel, Symfony, and Standalone PHP
The Problem
You store the current state but not how you got there. When a customer disputes a charge, you can't answer "what exactly happened?" Rebuilding read models after a schema change means writing migration scripts by hand.
How Ecotone Solves It
Ecotone's Event Sourced Aggregates store events instead of current state. Every state change is an immutable event in a stream. Projections rebuild read models from event history — change the schema, replay the events, get a correct read model.
Before diving into this section, be sure to understand how Aggregates work in Ecotone.
Difference between Aggregate Types
Ecotone provides a higher-level abstraction to work with Event Sourcing, which is based on Event Sourced Aggregates.
Event Sourced Aggregates, just like normal Aggregates, protect our business rules; the difference is in how they are stored.
State-Stored Aggregates
Normal Aggregates are stored based on their current state:
Yet if we change the state, our previous history is lost:
Having only the current state may be fine in many cases, and in those situations it's perfectly fine to make use of State-Stored Aggregates. This is the easiest way of dealing with changes: we change and we forget the history, as we are interested only in the current state.
When we actually need to know the history of changes, State-Stored Aggregates are not the right path. If we try to adjust them to be aware of history, we will most likely complicate our business code. This is not necessary, as there is a better solution: Event Sourced Aggregates.
Event Sourced Aggregate
When we are interested in the history of changes, an Event Sourced Aggregate will help us.
Event Sourced Aggregates are stored in the form of Events. This way we preserve the full history of a given Aggregate:
When we change the state, the previous Event is preserved, and we add another one to the audit trail (Event Stream).
This way all changes are preserved and we are able to know the historic changes of the Product.
Event Stream
The audit trail of all the Events that happened for a given Aggregate is called an Event Stream.
An Event Stream contains all historic Events for all instances of a specific Aggregate type, for example all Events for the Product Aggregate.
Let's now dive a bit more into Event Streams, and what they actually are.
Applying Events
Applying events to rebuild Event Sourcing Aggregate state in PHP
As mentioned earlier, Events are stored in the form of an Event Stream.
An Event Stream is an audit of Events which happened in the past.
However, to protect our business invariants, we may want to work with the current state of the Aggregate to know if a given action is possible (business invariants).
Business Invariants
Business Invariants are, in short, the simple "if" statements inside the Command Handlers of our Aggregate. They protect the Aggregate from moving into an incorrect state.
With State-Stored Aggregates, we always have the current state of the Aggregate, therefore we can check the invariants right away.
With Event Sourced Aggregates, we store them in the form of Events, therefore we need to rebuild the Aggregate in order to protect the invariants.
Suppose we have a Ticket Event Sourced Aggregate.
For this Ticket we allow assigning a Person to handle it.
Let's suppose, however, that the Business asked us to allow only one Person to be assigned to the Ticket at a time. With the current code we could assign unlimited people to the Ticket, therefore we need to protect this invariant.
To check whether the Ticket was already assigned to a Person, our Aggregate needs to have state applied which will tell it whether the Ticket was already assigned.
To do this we mark a method with the EventSourcingHandler attribute, taking the given Event class as its parameter. This method will be called during reconstruction of the Aggregate: when the Aggregate is loaded, the method will be invoked for each matching Event recorded in the Event Stream.
Then this state can be used in the Command Handler to decide whether we can trigger an action or not:
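A sketch of how this can look for the Ticket example (class and event names are assumed; details such as versioning and the creation factory method are omitted):

```php
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventSourcingAggregate;
use Ecotone\Modelling\Attribute\EventSourcingHandler;
use Ecotone\Modelling\Attribute\Identifier;

#[EventSourcingAggregate]
class Ticket
{
    #[Identifier]
    private string $ticketId;
    private bool $isAssigned = false;

    #[CommandHandler]
    public function assignWorker(AssignWorkerCommand $command): array
    {
        // Business invariant: only one Person may be assigned at a time
        if ($this->isAssigned) {
            throw new \DomainException('Ticket is already assigned');
        }

        return [new WorkerWasAssigned($this->ticketId, $command->getWorkerId())];
    }

    // Called when the Aggregate is rebuilt from its Event Stream
    #[EventSourcingHandler]
    public function applyWorkerWasAssigned(WorkerWasAssigned $event): void
    {
        $this->isAssigned = true;
    }
}
```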
As you can see, it makes sense to assign to the state only the attributes that protect our invariants. This way the Aggregate stays readable and free of unused information.
Event versioning
Event versioning and upcasting for Event Sourcing in PHP
During their lifetime, events may change. In order to track those changes, Ecotone provides the possibility of versioning events.
use Ecotone\Modelling\Attribute\Revision;
#[Revision(2)]
class MyEvent
{
public string $id;
}
The value given with the Revision attribute will be stored by Ecotone in the event's metadata. The attribute is used only when the event is saved in the Event Store. In order to read it, you can access the event's metadata, e.g. in an Event Handler.
use Ecotone\Messaging\MessageHeaders;
class MyEventHandler
{
#[EventHandler]
public function handle(MyEvent $event, array $metadata) : void
{
if ($metadata[MessageHeaders::REVISION] !== 2) {
return; // this is not the revision I'm looking for
}
// the force is strong with this one
}
}
Revision applies to messages in general (also commands and queries). However, for now it is used only when events get saved.
You don't have to define Revision for your current events. Ecotone will set its value to 1 by default. Also, if not defined in the class, already saved events will be read with Revision 1.
[Enterprise] Accessing Metadata in Event Sourcing Handler
This feature is available as part of Ecotone Enterprise.
Depending on the version, we may want to restore our Aggregate a bit differently.
This is especially useful when we've changed the way Events are structured and introduced a new version of the Event.
For this we can use the revision header to access the version with which the given Event was stored.
We may inject any type of Header that was stored together with the Event.
This means not only inbuilt headers like timestamp, id, and correlationId are available out of the box, but also custom headers provided by the application (e.g. userId).
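A sketch of injecting the revision header into an Event Sourcing Handler (Enterprise feature; the Ticket example and event name are assumed, and the header name comes from MessageHeaders::REVISION shown earlier):

```php
use Ecotone\Messaging\Attribute\Parameter\Header;
use Ecotone\Messaging\MessageHeaders;
use Ecotone\Modelling\Attribute\EventSourcingHandler;

#[EventSourcingHandler]
public function applyTicketWasCreated(
    TicketWasCreated $event,
    #[Header(MessageHeaders::REVISION)] int $revision
): void
{
    if ($revision === 1) {
        // rebuild state from the old event structure
        return;
    }

    // rebuild state from the new event structure
}
```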
Snapshotting
PHP Event Sourcing Snapshotting
In general, streams that need snapshots may indicate that our model needs revisiting.
We may cut the stream at some specific event and begin a new one; for example, at the end of the month we generate an invoice from all the transactions and start a new stream for the next month.
However, if cutting the stream off is not an option for any reason, we can use snapshots to avoid loading the full event history for a given Aggregate. Every given number of events a snapshot will be taken and stored; on subsequent calls it will be retrieved, so only events that happened after it are fetched.
Setting up
EventSourcingConfiguration provides the following interface to set up snapshots.
class EventSourcingConfiguration
{
public const DEFAULT_SNAPSHOT_TRIGGER_THRESHOLD = 100;
public function withSnapshotsFor(
string $aggregateClassToSnapshot, // 1.
int $thresholdTrigger = self::DEFAULT_SNAPSHOT_TRIGGER_THRESHOLD, // 2.
string $documentStore = DocumentStore::class // 3.
): static
}
$aggregateClassToSnapshot - class name of an aggregate you want Ecotone to save snapshots of
$thresholdTrigger - the number of events between taking snapshots
$documentStore - reference to document store which will be used for saving/retrieving snapshots
To set up snapshots we define the configuration:
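For example, via a ServiceContext method (a sketch; the Ticket aggregate class is assumed):

```php
use Ecotone\EventSourcing\EventSourcingConfiguration;
use Ecotone\Messaging\Attribute\ServiceContext;

class EcotoneConfiguration
{
    #[ServiceContext]
    public function eventSourcingConfiguration(): EventSourcingConfiguration
    {
        return EventSourcingConfiguration::createWithDefaults()
            // take a snapshot of the Ticket Aggregate every 500 events
            ->withSnapshotsFor(Ticket::class, 500);
    }
}
```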
Ecotone makes use of the Document Store to keep snapshots; by default it's enabled with the event-sourcing package.
If you want to clean the snapshots, you can do it manually. Snapshots are stored in aggregate_snapshots collection.
Snapshot threshold
The threshold states the interval at which snapshots should be taken. Therefore, with the below configuration:
snapshots will be taken every 500 events. Then, when a snapshot is loaded, Ecotone will start loading events from event number 501 for the given Aggregate instance.
Gap Detection and Consistency
PHP Event Sourcing Projection Gap Detection
The Problem
Two users place orders at the exact same time. Both transactions write to the event store, but one commits a split-second before the other. Your projection processes event #11 but event #10 isn't visible yet — and silently gets skipped forever. How do you guarantee no events are lost?
Where the Problem Comes From
Gap detection matters specifically for globally tracked projections. A global stream combines events from many different aggregates into a single ordered sequence. When multiple transactions write events for different aggregates in parallel, they each get a position number — but they may commit in any order.
Consider two concurrent transactions:
TX1 writes event at position 10 (for Ticket-A), starts first but commits slowly
TX2 writes event at position 11 (for Ticket-B), starts second but commits first
When the projection queries the stream after TX2 commits, it sees position 11 — but position 10 is not yet visible (TX1 hasn't committed). If the projection simply advances its position to 11, event 10 is lost forever.
The Common (Flawed) Approach: Time-Based Waiting
Many event sourcing systems solve this by making the projection wait — "if I see position 11 but not 10, pause and wait for 10 to appear."
The problem with waiting:
If TX1 takes 5 seconds to commit, the entire projection halts for 5 seconds
All events after position 10 are blocked — even though they're from completely unrelated aggregates
In high-throughput systems, this waiting cascades and can bring down the whole projection pipeline
Time-based gap detection trades throughput for safety, yet it does not solve the problem at its root.
Ecotone's Approach: Track-Based Gap Detection
Instead of waiting, Ecotone records the gap and moves on. The position is stored in a compact format that tracks both where the projection is and which positions are missing:
On the next run:
If event 10 has appeared (TX1 committed), it gets processed and removed from the gap list
If event 10 is still missing, it stays in the gap list — the projection continues processing new events
This approach never blocks. The projection keeps making progress on events that are available, while tracking gaps for eventual catch-up.
Track-based gap detection is the safest and fastest approach: it never blocks processing, never loses events, and naturally catches up as late-arriving transactions commit. This is why Ecotone chose this strategy over time-based waiting.
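The idea can be illustrated with a simplified, self-contained sketch. This is not Ecotone's actual storage format or API, just the core bookkeeping:

```php
<?php
// Simplified illustration of track-based gap detection.
// NOT Ecotone's real storage format or API — just the bookkeeping idea.
final class GapAwarePosition
{
    /** @param int[] $gaps positions seen as skipped but not yet processed */
    public function __construct(
        public readonly int $position,
        public readonly array $gaps,
    ) {
    }

    /** Advance to a newly observed position, recording skipped ones as gaps. */
    public function advanceTo(int $newPosition): self
    {
        // A previously recorded gap got filled: remove it from the gap list
        if ($newPosition <= $this->position) {
            return new self(
                $this->position,
                array_values(array_diff($this->gaps, [$newPosition])),
            );
        }

        // Every position between the old head and the new one was skipped
        $newGaps = $this->gaps;
        for ($p = $this->position + 1; $p < $newPosition; $p++) {
            $newGaps[] = $p;
        }

        return new self($newPosition, $newGaps);
    }
}

// TX2 commits position 11 while TX1 (position 10) is still in flight:
$tracker = new GapAwarePosition(position: 9, gaps: []);
$tracker = $tracker->advanceTo(11); // head moves to 11, gap [10] is recorded
$tracker = $tracker->advanceTo(10); // TX1 committed later — gap is filled
```

The projection keeps advancing on available events, while the recorded gaps are re-checked on each run.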
Gap Cleanup
Not all gaps will be filled — an event could be genuinely missing (deleted, or from a rolled-back transaction that was never committed). Ecotone cleans up stale gaps using two strategies:
Offset-based: gaps more than N positions behind the current position are removed. They are too old to represent an in-flight transaction.
Timeout-based: gaps older than a configured time threshold (based on event timestamps) are removed.
Both strategies ensure the gap list stays bounded and doesn't grow indefinitely.
Why Partitioned Projections Don't Need Gap Detection
Partitioned projections track position per aggregate, not globally. Events within a single aggregate are guaranteed to be stored in order — each event's version is strictly previous + 1.
If two transactions try to write to the same aggregate concurrently, the Event Store raises an optimistic lock exception — one transaction will fail and retry. This is guaranteed at the Event Store level.
Because events within a partition can never be committed out of order, gaps within a partition cannot happen. Gap detection is only needed when tracking across multiple partitions in a global stream — exactly what globally tracked projections do.
This is another advantage of : they sidestep the gap detection problem entirely, because each partition's event ordering is guaranteed by the Event Store's concurrency control.
Final Failure Strategy
Final failure strategy when all message retries are exhausted
Defines how to handle failures when processing messages. This is the final failure strategy, as it's used when there is no other way to handle the failure: for example, when there is no retry policy, when the retry policy has reached its maximum number of attempts, when the Error Channel destination is not defined, or when sending to the Error Channel fails.
Available Strategies
Ecotone provides four final strategies:
RELEASE - the Message is released back to the Channel for another attempt. This way order is preserved, yet it can result in processing being blocked if the message keeps failing.
RESEND - the Message is sent again to the end of the Channel for another attempt; as a result the Message Consumer is unblocked and can continue with the next Messages. This way subsequent messages can be consumed without the system being stuck.
IGNORE - the Message is discarded and processing continues. Can be used for non-critical messages, to simply ignore failed ones.
STOP - the Consumer stops and the message is preserved. This strategy can be applied when our system depends heavily on the order of Messages to work correctly. In that case we can stop the Consumer, leaving the Message still awaiting consumption.
Configuration Message Channel
This can be configured on Message Channel level:
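As an illustrative sketch, assuming the DBAL channel package is installed (the withFinalFailureStrategy builder method is an assumption and may differ between channel implementations and versions):

```php
use Ecotone\Dbal\DbalBackedMessageChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Endpoint\FinalFailureStrategy;

class ChannelConfiguration
{
    #[ServiceContext]
    public function asyncChannel(): DbalBackedMessageChannelBuilder
    {
        return DbalBackedMessageChannelBuilder::create('async')
            // stop the consumer on final failure instead of the default resend
            ->withFinalFailureStrategy(FinalFailureStrategy::STOP);
    }
}
```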
The default for Message Channels is the resend strategy.
Configuration Consumer
This can also be configured at the Message Consumer level.
The default for Message Consumers is the stop strategy.
Installation
Installing Ecotone for Symfony, Laravel or Stand Alone
Prerequisites
Before installing Ecotone, ensure you have:
PHP 8.1 or higher
Configure Repository
Configuring custom Aggregate repositories in Ecotone PHP
To use Ecotone's Aggregate functionality, we need a registered repository. Ecotone comes with built-in support for popular persistence options like Doctrine ORM, Eloquent, and document stores, so there's a good chance we can use what we already have without extra work. If our storage solution isn't supported yet, or if we have specific requirements, we can easily register our own custom repository by following the steps in this section. This flexibility means we're not locked into any particular database or ORM—we can use whatever fits our project best.
Repository for State-Stored Aggregate
State-Stored Aggregates are normal Aggregates, which are stored using Standard Repositories.
Therefore, to configure a Repository for your Aggregate, create a class that implements the StandardRepository interface.
Different ways to Record Events
Different ways to record events in Event Sourcing Aggregates
Two ways of setting up Event Sourced Aggregates
There are two ways we can configure our Aggregate to record Events.
Recovering, Tracing and Monitoring
Recovering, tracing, and monitoring message-driven applications
To keep the system reliable and resilient, it's important to handle errors with grace.
Ecotone provides solutions to handle failures within the system that help in:
Self-healing the application
Ensuring Data Consistency
Audit Trail & State Rebuild
How to implement Event Sourcing for audit trails and state rebuilds in PHP
The Problem You Recognize
A customer disputes a charge. Your support team asks "what exactly happened to this order?" The answer requires reading application logs, database timestamps, and hoping someone didn't overwrite the data.
Your read models need a schema change. You write a migration script, but there's no way to verify the migrated data is correct — the original events that created it are gone. You store the current state, but not how you got there.
The symptoms:
Ecotone Pulse (Service Dashboard)
Ecotone Pulse service dashboard for monitoring message consumers
Whenever a message fails during asynchronous processing, it will keep being retried until it succeeds. However, a retry strategy with a dead letter queue may be set up in order to retry the message a given number of times and then move it to storage for later review and manual retry.
This is where Ecotone Pulse kicks in: instead of reviewing and replaying messages directly from the application's console, you may do it directly from the UI. Besides, you may connect multiple Ecotone applications to the Pulse Dashboard to get a full overview of your whole system.
Ecotone Pulse provides a way to control error messages for all your services from one place.
Concurrency Handling
Handling concurrency with optimistic and pessimistic locking
Concurrency exceptions occur when multiple processes or threads access and modify shared resources simultaneously. These exceptions happen because two or more operations conflict while trying to change the same piece of data. Ecotone provides built-in support for concurrency handling.
In order to solve concurrent access, Ecotone implements Optimistic Locking.
Optimistic Locking
Each Event Sourcing Aggregate or Event Sourcing Saga has a version property that represents the current version of the resource. When modifications are made, the version is incremented. If two concurrent processes attempt to modify the same resource with different versions, a concurrency exception is raised. This is the default behaviour when using the inbuilt Event Sourcing support.
git clone git@github.com:ecotoneframework/symfony-tutorial.git
# Go to symfony-tutorial directory
git clone git@github.com:ecotoneframework/laravel-tutorial.git
# Go to laravel-tutorial directory
# Normally you would use "php artisan" for running console commands
# To reduce the number of differences during the tutorial
# "artisan" is changed to "bin/console"
git clone git@github.com:ecotoneframework/lite-tutorial.git
# Go to lite-tutorial directory
/** Ecotone Quickstart ships with docker-compose with preinstalled PHP 8.0 */
1. Run "docker-compose up -d"
2. Enter container "docker exec -it ecotone-quickstart /bin/bash"
3. Run starting command "composer install"
4. Run starting command "bin/console ecotone:quickstart"
5. You should see:
"Running example...
Hello World
Good job, scenario ran with success!"
/** You need to have at least PHP 8.0 and Composer installed */
1. Run "composer install"
2. Run starting command "bin/console ecotone:quickstart"
3. You should see:
"Running example...
Hello World
Good job, scenario ran with success!"
| Capability | Java (Axon) | .NET (NServiceBus) | PHP (Ecotone) |
| --- | --- | --- | --- |
| Sagas | Yes | Yes | Yes |
| Workflow Orchestration | Manual | Yes | Yes |
| Resiliency (Retries, Dead Letter, Outbox) | Yes | Yes | Yes |
| Distributed Messaging | Yes | Yes | Yes |
| Multi-Tenancy | Manual | Manual | Built-in |
| Message Broker Support | Kafka, RabbitMQ, etc. | RabbitMQ, Azure, etc. | RabbitMQ, Kafka, SQS, Redis |
| Observability | Micrometer | OpenTelemetry | OpenTelemetry |
| Testing Support | Axon Test Fixtures | NServiceBus Testing | Built-in Test Support |
Production resilience — retries, error channels, dead letter, outbox, deduplication
Full testing support — test message flows, aggregates, sagas, and event sourcing in isolation
Observability — OpenTelemetry integration for tracing and metrics
Multi-tenancy — built-in support for tenant-isolated processing
#[Aggregate]
class Ticket
{
#[Identifier]
private string $ticketId;
#[CommandHandler]
public function assignWorker(AssignWorkerCommand $command)
{
// do something with assignation
}
}
If you're using Symfony Flex (recommended), the bundle will auto-configure.
If auto-configuration didn't work, manually register the bundle in config/bundles.php:
Step 3: Verify Installation
Run this command to check if Ecotone is properly installed:
By default Ecotone will look for Attributes in the default Symfony directory "src".
If you follow a different structure, you can use the "namespaces" configuration to tell Ecotone where to look.
The service provider should be automatically registered via Laravel's package discovery.
If auto-registration didn't work, manually add the provider to config/app.php:
Step 3: Verify Installation
Run this command to check if Ecotone is properly installed:
By default Ecotone will look for Attributes in the default Laravel directory "app".
If you follow a different structure, you can use the "namespaces" configuration to tell Ecotone where to look.
Install Ecotone Lite (No framework)
If you're using no framework, or a framework other than Symfony or Laravel, you may use Ecotone Lite to bootstrap Ecotone.
composer require ecotone/ecotone
In order to start, you need to have a composer.json with PSR-4 or PSR-0 autoload setup.
With Custom Dependency Container
If you already have Dependency Container configured, then:
Load namespaces
By default Ecotone will look for Attributes only in the classes provided under "classesToResolve".
If we want to look for Attributes in a given set of Namespaces, we can pass them to the configuration.
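A sketch of how this can look (service and namespace names are illustrative; verify the bootstrap signature against your installed version):

```php
use Ecotone\Lite\EcotoneLite;
use Ecotone\Messaging\Config\ServiceConfiguration;

$ecotone = EcotoneLite::bootstrap(
    // classes to resolve explicitly (may stay empty when scanning namespaces)
    [],
    // available services for the container
    [new TicketService()],
    ServiceConfiguration::createWithDefaults()
        // scan these namespaces for Ecotone attributes
        ->withNamespaces(['App\Domain', 'App\Application'])
);
```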
With no Dependency Container
You may actually run Ecotone without any Dependency Container. That may be useful for small applications, testing, or when we want to run a small Ecotone script.
Ecotone Lite Application
You may use the out-of-the-box Ecotone Lite Application, which provides you with a Dependency Container.
composer require ecotone/lite-application
With the default configuration, Ecotone will look for classes inside the "src" directory.
Common Installation Issues
"Class not found" errors
Problem: Ecotone can't find your classes with attributes. Solution: Make sure your classes are in the correct namespace and directory structure matches your PSR-4 autoloading configuration.
Bundle/Provider not registered
Problem: Ecotone commands are not available. Solution:
For Symfony: Check that the bundle is listed in config/bundles.php
For Laravel: Check that the provider is in config/app.php or that package discovery is enabled
Permission errors
Problem: Cache directory is not writable. Solution: Ensure your web server has write permissions to the cache directory (usually var/cache for Symfony or storage/framework/cache for Laravel).
StandardRepository
The repository implements the following interface:
The canHandle method informs which Aggregate classes can be handled by this Repository. Return true if saving the specific aggregate is possible, false otherwise.
The findBy method returns the existing Aggregate instance if found, otherwise null.
The save method is responsible for storing the given Aggregate instance:
$identifiers is an array of #[Identifier] values defined within the aggregate.
$aggregate is the instance of the aggregate.
$metadata is an array of extra information that can be passed with the Command.
$expectedVersion - if version locking by #[Version] is enabled, it will carry the currently expected version.
Set up your own Implementation
When your implementation is ready simply mark it with #[Repository] attribute:
Example implementation using Doctrine ORM
This is example implementation of Standard Repository using Doctrine ORM.
Repository:
Using Multiple Repositories
When we have only one Standard and one Event Sourcing Repository registered, Ecotone will use them for our Aggregates by default.
This comes from simplification: if there is only one Repository of a given type, there is nothing else to choose from.
However, if we register multiple Repositories, we need to take over the process and tell Ecotone which Repository should be used for which Aggregate.
In case of Custom Repositories, we may use Ecotone's support for optimistic locking to raise the exception in the Repository.
The version will be passed to the repository based on the #[AggregateVersion] property inside the Aggregate/Saga.
We don't need to deal with increasing it on each action; Ecotone will increase it in our Saga/Aggregate automatically.
We may also use the inbuilt trait to avoid adding the property manually:
Self Healing
To handle concurrency exceptions and ensure the system can self-heal, Ecotone offers retry mechanisms.
In synchronous scenarios, like a Command Handler called via HTTP, instant retries can be used to recover. If a concurrency exception occurs, the Command Message will be retried immediately, minimizing any impact on the end user. This immediate retry ensures that the Message Handler can self-heal and continue processing without affecting the user experience.
In asynchronous scenarios, you can still use instant retries, yet you may also configure delayed retries. This means that when a concurrency exception occurs, the Message will be retried after a certain delay. This frees system resources from continuous retries and allows the system to recover after a given period.
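As a sketch, delayed retries with a dead letter channel can be registered similarly to the following (configuration names are assumptions based on Ecotone's recovery module; verify against your version):

```php
use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Handler\Recoverability\ErrorHandlerConfiguration;
use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder;

class ErrorConfiguration
{
    #[ServiceContext]
    public function errorHandling(): ErrorHandlerConfiguration
    {
        return ErrorHandlerConfiguration::createWithDeadLetterChannel(
            'errorChannel',
            // retry after 1s, then 10s, then 100s before giving up
            RetryTemplateBuilder::exponentialBackoff(1000, 10)
                ->maxRetryAttempts(3),
            'dbal_dead_letter'
        );
    }
}
```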
$this->commandBus->send(
new AssignWorkerCommand(
$ticketId, $workerId,
)
);
interface AuditableEventBus extends EventBus {}
#[CommandHandler]
public function placeOrder(PlaceOrder $command, AuditableEventBus $eventBus)
{
// place order
$eventBus->publish(new OrderWasPlaced());
}
class Audit
{
#[Before(pointcut: AuditableEventBus::class)]
public function log(object $event, array $metadata) : void
{
// store audit
}
}
interface StandardRepository
{
public function canHandle(string $aggregateClassName): bool;
public function findBy(string $aggregateClassName, array $identifiers) : ?object;
public function save(array $identifiers, object $aggregate, array $metadata, ?int $expectedVersion): void;
}
#[Repository]
class DoctrineRepository implements StandardRepository
{
// implemented methods
}
final class EcotoneTicketRepository implements StandardRepository
{
public function __construct(private readonly EntityManagerInterface $entityManager)
{
}
public function canHandle(string $aggregateClassName): bool
{
return $aggregateClassName === Ticket::class;
}
public function findBy(string $aggregateClassName, array $identifiers): ?object
{
return $this->entityManager->getRepository(Ticket::class)
// Array of identifiers for given Aggregate
->find($identifiers['ticketId']);
}
public function save(array $identifiers, object $aggregate, array $metadata, ?int $versionBeforeHandling): void
{
$this->entityManager->persist($aggregate);
}
}
public function save(
array $identifiers, object $aggregate, array $metadata,
// Version to verify before storing
?int $versionBeforeHandling
): void
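Building on the Doctrine example above, here is a hedged sketch of how $versionBeforeHandling could be used to detect concurrent writes. The getVersion() accessor on the Aggregate is hypothetical, and in a real setup Ecotone's own concurrency exception class may be preferable to the generic exception used here:

```php
public function save(array $identifiers, object $aggregate, array $metadata, ?int $versionBeforeHandling): void
{
    if ($versionBeforeHandling !== null) {
        $current = $this->entityManager->find(Ticket::class, $identifiers['ticketId']);

        // Another process stored a newer version while we were handling the Command
        if ($current !== null && $current->getVersion() !== $versionBeforeHandling) {
            throw new \RuntimeException(
                sprintf('Concurrent modification of Ticket %s detected', $identifiers['ticketId'])
            );
        }
    }

    $this->entityManager->persist($aggregate);
    $this->entityManager->flush();
}
```

Throwing on a version mismatch lets Ecotone's retry mechanisms (described above) reload the Aggregate and replay the Command against fresh state.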
#[Saga]
class OrderProcess
{
use WithAggregateVersioning;
(...)
1) Pure Event Sourced Aggregate
This way of handling events allows for pure functions. This means that actions called on the Aggregate return Events and do not change the internal state of the Aggregate.
Event Sourced Aggregate must provide version. You may leave it to Ecotone using WithAggregateVersioning or you can implement it yourself.
A CommandHandler for an Event Sourced Aggregate returns the events generated by the given method. These will be passed to the Repository to be stored.
EventSourcingHandler is the method responsible for reconstructing the Aggregate from previously created events. At least one event needs to be handled in order to provide the Identifier.
2) Internal Recorder Aggregate
This way of handling events allows for similarity with State-Stored Aggregates.
This convention requires changing the internal state of the Aggregate in order to record Events.
Therefore the Pure ES Aggregate is recommended, as it does not require any internal state changes in most scenarios.
However, an ES Aggregate with Internal Recorder may be useful for projects migrating from other solutions, or when our team is heavily used to working this way.
In order to make use of this alternative way of handling events, we need to provide the trait WithEvents.
Command Handlers, instead of returning events, act the same as in State-Stored Aggregates.
All events published using recordThat will be passed to the Repository to be stored.
#[EventSourcingAggregate] // 1
class Ticket
{
use WithAggregateVersioning; // 2
#[Identifier] // 1
private string $ticketId;
private string $ticketType;
#[CommandHandler] // 2
public static function register(RegisterTicket $command) : array
{
return [new TicketWasRegistered($command->getTicketId(), $command->getTicketType())];
}
#[CommandHandler] // 2
public function close(CloseTicket $command) : array
{
return [new TicketWasClosed($this->ticketId)];
}
#[EventSourcingHandler] // 4
public function applyTicketWasRegistered(TicketWasRegistered $event) : void
{
$this->ticketId = $event->getTicketId();
$this->ticketType = $event->getTicketType();
}
}
#[EventSourcingAggregate]
class Basket
{
use WithEvents; // 1
use WithAggregateVersioning;
#[Identifier]
private string $id;
#[CommandHandler] // 2
public static function create(CreateBasket $command) : static
{
$basket = new static();
$basket->recordThat(new BasketWasCreated($command->getId()));
return $basket;
}
#[CommandHandler] // 2
public function addProduct(AddProduct $command) : void
{
$this->recordThat(new ProductWasAddedToBasket($this->id, $command->getProductName()));
}
#[EventSourcingHandler]
public function applyBasketWasCreated(BasketWasCreated $basketWasCreated)
{
$this->id = $basketWasCreated->getId();
}
}
No history — you know what the current price is, but not what it was yesterday
Risky migrations — changing the read model means writing one-off scripts and praying
Compliance gaps — auditors ask for a complete trail of changes and you can't provide one
What the Industry Calls It
Event Sourcing — instead of storing the current state, store the sequence of events that led to it. Rebuild any view of the data by replaying events. Get a complete, immutable audit trail for free.
How Ecotone Solves It
Ecotone provides Event Sourcing as a first-class feature with built-in projections. Your aggregate records events instead of mutating state:
Build read models (projections) that can be rebuilt at any time from the event history:
Works with Postgres, MySQL, and MariaDB for event storage. Projections can write to any storage you choose.
#[EventSourcingAggregate]
class Order
{
#[Identifier]
private string $orderId;
#[CommandHandler]
public static function place(PlaceOrder $command): array
{
return [new OrderWasPlaced($command->orderId, $command->items)];
}
#[EventSourcingHandler]
public function onOrderPlaced(OrderWasPlaced $event): void
{
$this->orderId = $event->orderId;
}
}
#[ProjectionV2('order_list')]
#[FromAggregateStream(Order::class)]
class OrderListProjection
{
#[EventHandler]
public function onOrderPlaced(OrderWasPlaced $event): void
{
// Build your read model — rebuildable from history
}
}
To tell Ecotone to retrieve Events from your Aggregate, add the trait WithEvents, which contains two methods: recordThat and getRecordedEvents.
As Ecotone never forces you to use framework-specific classes in your business code, you may replace it with your own implementation.
After importing the trait, Events will be automatically retrieved and published after handling a Command in your Aggregate.
Using recordThat will delay sending an event till the moment your Aggregate is saved in the Repository. This way you ensure that no Event Handlers will be called before the state is actually stored.
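As a sketch, a State-Stored Aggregate using the trait could look as follows (PlaceOrder and OrderWasPlaced are illustrative classes, not part of the framework):

```php
#[Aggregate]
class Order
{
    use WithEvents; // provides recordThat() and getRecordedEvents()

    #[Identifier]
    private string $orderId;

    #[CommandHandler]
    public static function place(PlaceOrder $command): static
    {
        $order = new static();
        $order->orderId = $command->orderId;

        // Published automatically, but only after the Aggregate is saved
        $order->recordThat(new OrderWasPlaced($command->orderId));

        return $order;
    }
}
```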
Subscribing to Event from your Aggregate
Sometimes you may have a situation where an Event from one Aggregate should actually change another Aggregate. In those situations you may subscribe to the Event directly from the Aggregate, to avoid creating higher-level boilerplate code.
In those situations, however, you need to ensure the event contains the reference id, so Ecotone knows which Aggregate to load from the database.
For more sophisticated scenarios, where there is no direct identifier in the corresponding event, you may make use of identifier mapping. You can read more about it in the Identifier Mapping section.
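A sketch of such an Aggregate-level subscription (Wallet and OrderWasPaid are illustrative names; the event is assumed to carry a walletId matching the Aggregate's Identifier):

```php
#[Aggregate]
class Wallet
{
    #[Identifier]
    private string $walletId;
    private int $balance = 0;

    // OrderWasPaid carries a walletId property, so Ecotone can load the right Wallet
    #[EventHandler]
    public function whenOrderWasPaid(OrderWasPaid $event): void
    {
        $this->balance += $event->amount;
    }
}
```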
Sending Named Events
You may subscribe to Events by name, instead of by class. This is useful when we want to decouple modules further, or when we are not interested in the Event payload at all.
For Events published from your Aggregate, it's enough to provide the NamedEvent attribute with the name of your event.
Then you can subscribe to the Event using its name.
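A minimal sketch of named publishing and subscribing (the event name and NotificationService are illustrative):

```php
#[NamedEvent('ticket.was_registered')]
class TicketWasRegistered
{
    public function __construct(public string $ticketId) {}
}

class NotificationService
{
    // Subscribe by name - no reference to the event class is needed
    #[EventHandler('ticket.was_registered')]
    public function whenTicketWasRegistered(array $payload): void
    {
        // notify using $payload['ticketId']
    }
}
```

Because the subscriber receives the payload as an array, the subscribing module does not need to import the publishing module's event class at all.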
Advanced Aggregate creation
DDD PHP
Create an Aggregate by another Aggregate
There may be a scenario where the creation of an Aggregate is conditioned by the current state of another Aggregate.
Ecotone provides a possibility for that and lets you focus more on domain modeling rather than technical nuances you may face trying to implement an actual use case.
This case is supported by both Event Sourcing and State-based Aggregates.
Create a State-based Aggregate
It is possible to send a command to an Aggregate and expect a State-based Aggregate to be returned.
Create an Event Sourcing Aggregate
It is also possible to send a command to an Aggregate and expect the Event Sourcing Aggregate to be returned.
Events handling
Both of the Aggregates (called and result) can still record their Events using an Internal Recorder. Recorded Events will be published after the operation is persisted in the database.
Persisting a state change
In the case of an Event Sourcing Aggregate recording an event indicates a state change of that Aggregate.
Also, when calling a State-based Aggregate its state may be changed before returning the newly created Aggregate. E.g. you want to save a reference to the newly created Aggregate.
Ecotone will try to persist both called and returned Aggregates.
When splitting your aggregates into the smallest, independent parts of the domain you have to be aware of transaction boundaries which Aggregate has to protect. In the case where the creation of an Aggregate is the transaction boundary of another Aggregate, it may require a state change of the one that protects that boundary.
This is a very specific scenario where two aggregates will persist at the same time within the same transaction which is covered by Ecotone.
Fetching/Storing Aggregates
Fetching and storing Aggregates with repositories in Ecotone PHP
Default flow
In the default flow there is no need to fetch or store Aggregates, because this is done for us. We simply need to trigger a Command via the CommandBus. However, in some cases you may want to retake the orchestration flow and do it directly. For those cases the Business Repository Interface or Instant Fetch Aggregate can help.
Business Repository Interface
A special type of Business Interface is the Repository.
This interface allows us to simply load and store our Aggregates directly. In situations when we call Commands directly on our Aggregates, we won't need to use it. However, for specific cases where we need to load an Aggregate and store it outside of the Aggregate's Command Handler, this business interface becomes useful.
To make use of this Business Interface, we need the corresponding Repository to be registered.
Ecotone will read the type hint to understand which Aggregate you would like to fetch or save.
The implementation will be delivered by Ecotone. All you need to do is define the interface, and it will be available in your Dependency Container.
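A sketch of such a Business Repository Interface for a hypothetical Ticket Aggregate; Ecotone derives each method's behaviour from the type hints:

```php
interface TicketRepository
{
    // Loads the Ticket Aggregate; throws when it does not exist
    #[Repository]
    public function getTicket(string $ticketId): Ticket;

    // Nullable return type: returns null instead of throwing when not found
    #[Repository]
    public function findTicket(string $ticketId): ?Ticket;

    // Stores the Aggregate using the registered Repository
    #[Repository]
    public function save(Ticket $ticket): void;
}
```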
Pure Event Sourced Repository
When using a Pure Event Sourced Aggregate, the Aggregate instance does not hold recorded Events. Therefore passing the aggregate instance would not carry any information about the recorded events.
For Pure Event Sourced Aggregates, we can instead pass events directly to the repository:
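A sketch of such a repository interface; the save signature passing identifier, version and events explicitly is an assumption based on Ecotone's Pure ES repository convention:

```php
interface TicketRepository
{
    // For Pure Event Sourced Aggregates we pass the recorded events explicitly,
    // since the Aggregate instance itself holds no recorded events
    #[Repository]
    public function save(string $ticketId, int $currentVersion, array $events): void;
}
```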
Instant Fetch Aggregate
Fetch aggregates directly in your handlers without repository injection boilerplate. Aggregates arrive automatically via the #[Fetch] attribute, keeping handler code focused on business logic.
You'll know you need this when:
Every aggregate command handler follows the same pattern: inject repository, fetch aggregate, call method, save
Repository injection boilerplate obscures the actual business logic in your handlers
You want your domain code to express "what happens" without "how to load it"
Instant Fetch Aggregate is available as part of Ecotone Enterprise.
To do instant fetch of Aggregate we will be using Fetch Attribute.
Suppose we want PlaceOrder Command Handler, and we want to fetch User Aggregate:
Fetch uses Expression Language to evaluate the expression given inside the Attribute.
For example, having the above "payload.userId" and the following Command:
Ecotone will use userId from the Command to fetch the User Aggregate instance.
"payload" is a special variable within the expression that points to our Command; therefore whatever is available within the Command is available for us to do the fetching.
This provides a quick way of accessing related Aggregates without the need to inject Repositories.
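A sketch of a handler using the Fetch attribute (PlaceOrder and User are illustrative classes):

```php
#[CommandHandler]
public function placeOrder(
    PlaceOrder $command,
    // Expression evaluated against the Command; the resolved User is injected
    #[Fetch("payload.userId")] User $user,
): void
{
    // $user was loaded by Ecotone before this handler was invoked
}
```

Making the parameter nullable (?User $user) allows the handler to run even when no Aggregate is found, as described below.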
Allowing non existing Aggregates
By default Ecotone will throw an exception if the Aggregate is not found. We can change this behaviour simply by allowing nulls in our method declaration:
Accessing Message Headers
We can also use Message Headers to fetch our related Aggregate instance:
Using External Services
In some cases we may not have enough information to provide the correct Identifier; for example, some mapping may be required in order to get the Identifier. For these cases we can use the "reference" function to access any Service from the Dependency Container in order to do the mapping.
Introduction
Introduction to Business Interfaces in Ecotone PHP
Business Interface aims to reduce boilerplate code and make your domain actions explicit.
In the Application we describe an Interface which executes Business methods. Ecotone will deliver the implementation for this interface, binding the interface with specific actions.
This way we can get rid of delegation-level code and focus on the things we want to achieve.
For example, if we don't want to trigger an action via the Command/Query Bus, we can do it directly using our business interface and skip all the Middlewares that would normally trigger during Bus execution.
There are different types of Business Interfaces. In this chapter we will discuss the basics of building our own Business Interface; in the next sections we will dive into specific types of business interfaces: Repositories and Database Layers.
Command Interface
Let's take as an example creating a new Ticket.
We may define an interface that will call this Command Handler whenever it is executed.
This way we don't need to use the Command Bus and we can bypass all Bus-related interceptors.
The attribute #[BusinessMethod] tells Ecotone that the given Interface is meant to be used as an entrypoint to Messaging, and which Message Handler it should send the Message to.
Ecotone will provide the implementation of this interface directly in our Dependency Container.
From the lower-level API perspective, a Business Method is actually a Message Gateway.
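A sketch of such a Command Interface (the routing key 'ticket.create' and CreateTicket class are illustrative; the key must match the routing of the target Command Handler):

```php
interface TicketApi
{
    // Sends CreateTicket straight to the Command Handler registered under 'ticket.create',
    // bypassing the Command Bus and its interceptors
    #[BusinessMethod('ticket.create')]
    public function create(CreateTicket $command): void;
}
```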
Aggregate Command Interface
We may also execute given Aggregate directly using Business Interface.
Then we define interface:
We may of course pass a Command class if we need to pass more data to our Aggregate's Command Handler.
Query Interface
Defining Query Interface works exactly the same as Command Interface and we may also use it with Aggregates.
Then we may call this Query Handler using Interface
Result Conversion
If we have registered a Converter, then we can let Ecotone do the conversion to the Business Method's return type:
Then we may call this Query Handler using Interface
Ecotone will use defined Converter to convert array to TicketDTO.
Such conversions are useful in order to work with objects and to avoid writing transformation code in our business code. We can build generic queries and transform them to different classes using different business methods.
Payload Conversion
If we have registered a Converter, then we can let Ecotone do the conversion to the Message Handler's specific format:
Then we may call this Query Handler using Interface
Ecotone will use defined Converter to convert array to CreateTicket command class.
This type of conversion is especially useful when we receive data from an external source and simply want to push it to a given Message Handler. We avoid doing transformations ourselves, as we simply push what we receive as an array.
Working with Aggregates
Working with Event Sourcing Aggregates in Ecotone PHP
Working with Event Sourcing Aggregates
Just as with Standard Aggregates, ES Aggregates are called by Command Handlers; however, what they return are Events, and they do not change their internal state.
#[EventSourcingAggregate]
class Product
{
use WithAggregateVersioning;
#[Identifier]
private string $id;
#[CommandHandler]
public static function create(CreateProduct $command) : array
{
return [new ProductWasCreated($command->id, $command->name, $command->price)];
}
}
When this Aggregate is called via the Command Bus with the CreateProduct Command, it will return a new ProductWasCreated Event.
Command Handlers may return a single event, multiple events, or no events at all if nothing is meant to be changed.
Event Stream
Aggregates under the hood make use of the Partition persistence strategy. This means that we need to know:
Aggregate Version
Aggregate Id
Aggregate Type
Aggregate Version
To find out the current version of the Aggregate, Ecotone will look for a property marked with the Version attribute.
We don't need to add this property directly; we can use the trait instead:
This is all we need to do, as Ecotone will take care of reading and writing this property.
This way we can focus on the business logic of the Aggregate, and the Framework will take care of tracking the version.
Aggregate Id (Partition Key)
We need to tell Ecotone what the Identifier of our Event Sourcing Aggregate is.
This is done by having a property marked with Identifier in the Aggregate:
As Command Handlers are pure and do not change the state of our Event Sourcing Aggregate, we need a different way to mutate the state in order to assign the identifier.
For changing the state we use the EventSourcingHandler attribute, which tells Ecotone that if a given Event happens, this method should be triggered afterwards:
We will explore how applying Events works in more detail later in this chapter.
Aggregate Type
The Aggregate Type will be the same as the Aggregate class.
We can decouple the class from the Aggregate Type; more about this can be found in the "Aggregate Type" section.
Recording Events in the Event Stream
So when this Command Handler is executed:
What actually happens under the hood is that this Event is applied to the Event Stream:
As storing in the Event Store is abstracted away, the code stays clean and contains only the business part.
We can customize the Stream Name, Aggregate Type and even Event Names when needed.
Event Sourcing Repository
Configuring Event Sourcing repositories in Ecotone PHP
Ecotone comes with an inbuilt Event Sourcing repository once the Event Sourcing package is installed. However, you may want to roll out your own storage for Events, or maybe you already use some event-sourcing framework and would like to integrate with it.
For this you can take over the control by introducing your own Event Sourcing Repository.
Using a Custom Event Sourcing Repository will not allow you to make use of the inbuilt projection system. Therefore consider configuring your own Event Sourcing Repository only if you want to build your own projection system.
Custom Event Sourcing Repository
We start by implementing the EventSourcingRepository interface:
canHandle - tells whether the given Aggregate is handled by this Repository
findBy - returns previously created events for the given aggregate, which Ecotone will use to reconstruct the Aggregate
save - stores events recorded by the Event Sourced Aggregate
Then we need to mark the class which implements this interface as a Repository.
Storing Events
Ecotone provides enough information to decide how to store the provided events.
Identifiers hold an array of identifiers related to the given aggregate (e.g. ["orderId" => 123]).
Events is a list of Ecotone's Event classes, each consisting of payload and metadata, where payload is your Event class instance and metadata is specific to this event.
Metadata as a parameter is the generic metadata available at the moment of Aggregate execution.
Version before handling, on the other hand, is the version of the Aggregate before any action was triggered on it. This can be used to protect from concurrency issues.
The structure of Events is as follows:
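The structure described above can be sketched as follows; the accessor names are assumptions based on Ecotone's Event wrapper, so verify them against your installed version:

```php
// Each element of the $events list passed to save() wraps one recorded event
foreach ($events as $event) {
    $event->getPayload();   // your Event object, e.g. TicketWasRegistered
    $event->getMetadata();  // event-specific metadata, e.g. ['_aggregate_id' => '123', ...]
    $event->getEventName(); // resolved name: the class name, or the #[NamedEvent] name
}
```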
Core metadata
It's worth mentioning Ecotone's Events here, and especially the metadata part of the Event.
The metadata of each Event contains three core attributes:
"_aggregate_id" - This provides aggregate identifier of related Aggregate
"_aggregate_version" - This provides version of the related Event (e.g. 1/2/3/4)
"_aggregate_type" - This provides type of the Aggregate being stored, which can be customized
Aggregate Type
If our repository stores multiple Aggregates, it is useful to have information about the type of Aggregate being stored. However, keeping the class name is not the best idea, as a simple refactor would break our Event Stream. Therefore Ecotone provides a way to mark our Aggregate type using an Attribute.
This now will be passed together with Events under _aggregate_type metadata.
Named Events
In Ecotone we can name events to avoid storing class names in the Event Stream; to do so we use NamedEvent.
Then, when events are passed to the save method, they will automatically provide this name under the eventName property.
Snapshotting
With a custom repository we can still use the inbuilt snapshotting mechanism. To use it with a customized repository we will use BaseEventSourcingConfiguration.
After fetching a snapshot, Ecotone will load only the events from that moment onwards, using `fromAggregateVersion`.
Testing
If you want to test your flow and storage with your custom Event Sourced Repository, you should disable the default in-memory repository.
Database Business Interface
Database Business Interface for type-safe database access in PHP
Ecotone allows you to work with your database using DbalBusinessMethod. The goal is to create an abstraction which significantly reduces the amount of boilerplate code required to implement data access layers.
Thanks to Dbal-based Business Methods, we are able to avoid writing integration and transformation-level code and focus on the business part of the system.
To make use of Dbal-based Business Methods, the Ecotone Dbal Module needs to be installed.
Write Business Methods
Let's consider a scenario where we want to store a new record in the Persons table. To make it happen, just like with a Business Method, we will create an Interface, yet this time we will mark the method with the DbalBusinessMethod attribute.
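A sketch of such a write-side interface (table and column names are illustrative):

```php
interface PersonService
{
    // Named SQL parameters (:personId, :name) are bound from method arguments by name
    #[DbalBusinessMethod('INSERT INTO persons (person_id, name) VALUES (:personId, :name)')]
    public function register(int $personId, string $name): void;
}
```

Ecotone delivers the implementation in the Dependency Container, so calling $personService->register(1, 'John') executes the statement with the bound parameters.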
Identifier Mapping
Mapping identifiers for Aggregate and Saga routing in Ecotone
When loading Aggregates or Sagas, we need to know which Identifier should be used.
Depending on the business feature we are working on, this may require different approaches.
In this section we will dive into the different solutions we can use.
Auto-Mapping from the Command/Event
Ecotone resolves the mapping automatically when the Identifier in the Aggregate/Saga is named the same as the property in the Command/Event.
Then, if the Message has a productId, it will be used for mapping:
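A sketch of this auto-mapping (ChangePrice and Product are illustrative classes):

```php
class ChangePrice
{
    public function __construct(
        public string $productId, // same name as the Identifier below - mapped automatically
        public int $price,
    ) {}
}

#[Aggregate]
class Product
{
    #[Identifier]
    private string $productId;

    #[CommandHandler]
    public function changePrice(ChangePrice $command): void
    {
        // Ecotone located this Product instance using $command->productId
    }
}
```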
Message Headers
Working with Message Headers and metadata in Ecotone PHP
Ecotone provides an easy way to pass Message Headers (metadata) with your Message and use them in your Message Handlers or Interceptors.
In asynchronous scenarios, Message Headers will be automatically mapped and passed through your Message Broker.
Passing headers to Bus:
Pass your metadata (headers), as second parameter.
Then you may access them directly in Message Handlers:
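A sketch of both sides, sending metadata with the Command Bus and reading a single header via the Header attribute (PlaceOrder and the header name are illustrative):

```php
// Pass metadata (headers) as the second argument when sending
$commandBus->send(new PlaceOrder($orderId), metadata: ['executorId' => '1234']);

// Access a single header directly in the handler
#[CommandHandler]
public function placeOrder(PlaceOrder $command, #[Header('executorId')] string $executorId): void
{
    // $executorId now holds '1234'
}
```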
Event Streams and Handlers
PHP Event Sourcing Projection Streams and Event Handlers
The Problem
Your projection needs data from multiple aggregates — orders AND payments — or you want to handle only specific events instead of everything in the stream. How do you control what events reach your projection and how they are routed?
Emitting Events
PHP Event Sourcing Projection Event Emission
The Problem
You update a wallet balance projection and want to notify the user via WebSocket — but if you subscribe to the domain event directly, the user sees the old balance because the projection hasn't refreshed yet. How do you notify after the projection is up to date?
The challenge is timing: domain events fire before the projection processes them. If a subscriber sends a notification immediately, the user loads the page and sees stale data.
Failure Handling
PHP Event Sourcing Projection Failure Handling and Recovery
The Problem
Your projection handler throws an exception halfway through processing a batch of 100 events. Are the first 50 events committed or rolled back? Does the failure block all other projections, or just this one? And when the bug is fixed, does the projection automatically recover?
Upgrading from V1 to V2
Upgrading from Projection V1 to ProjectionV2
The Problem
You have existing projections using the old #[Projection] API and want to migrate to #[ProjectionV2]. Do you need to migrate data? Will there be downtime?
Dbal Dead Letter
DBAL-based dead letter queue for storing and replaying failed messages
Dbal Dead Letter
Ecotone comes with full support for managing the full life cycle of an error message by using the Dbal Dead Letter.
Store failed Message with all details about the exception
Lifecycle Management
PHP Event Sourcing Projection Lifecycle and CLI
The Problem
Your projection's table schema changed, or you found a bug in a handler and the read model has wrong data. How do you set up, tear down, and rebuild projections without writing manual SQL scripts?
Ecotone provides lifecycle hooks — methods on your projection class that are called at specific moments — and CLI commands to trigger them.
Outbox Pattern
Outbox pattern for atomic message publishing with database transactions
Outbox Pattern
To ensure a full level of data consistency, we may decide to store messages alongside data changes. This way we work with only a single data store, completely avoiding the problem of persisting a Message in two sources at once.
To make it happen Ecotone implements so called Outbox pattern.
Resiliency
Production resilience with retries, dead letter, outbox, and deduplication
Works with: Laravel, Symfony, and Standalone PHP
The Problem
A failed HTTP call crashes your handler. A duplicate webhook triggers double-processing. You've wrapped handlers in try/catch blocks and retry loops — each one slightly different. Error handling is scattered across your codebase with no consistent strategy.
The first parameter passed to DbalBusinessMethod is the actual SQL, where we can provide a set of named parameters. Ecotone will automatically bind parameters from the method declaration to the SQL ones by name.
The above example will use DbalConnectionFactory::class for the database Connection, which is the default for the Dbal Module. If you want to run a Business Method on a different connection, you can do so using the connectionReferenceName parameter inside the Attribute.
Custom Parameter Name
We may bind a parameter name explicitly by using the DbalParameter attribute.
This can be used when we want to decouple interface parameter names from bound parameters, or when the database column name is not explicit enough to be part of the interface.
Returning number of records changed
If we want to return the number of records that have been changed, we can add an int type hint to our Business Method:
Query Business Methods
We may want to fetch data from the database and for this we will be using DbalQueryBusinessMethod.
The above will return result as associative array with the columns provided in SELECT statement.
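A sketch of such a query method (table, columns and interface name are illustrative):

```php
interface PersonQueryService
{
    // Returns a list of associative rows, e.g. [['person_id' => 1, 'name' => 'John'], ...]
    #[DbalQueryBusinessMethod('SELECT person_id, name FROM persons LIMIT :limit')]
    public function getPersons(int $limit): array;
}
```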
Fetching Mode
To format the result differently we may use different fetch modes. The default fetch mode is an associative array.
First Column Fetch Mode
This will extract the first column from each row, which allows us to return array of person Ids directly.
First Column of first row Mode
To get a single value out of the Result Set we can use First Column of First Row Mode.
This way we can provide simple interfaces for aggregate SQL functions like SUM or COUNT.
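A sketch of this mode; the fetchMode parameter and the FetchMode constant name are assumptions based on Ecotone's Dbal module, so check them against your version:

```php
interface PersonQueryService
{
    // First column of the first row: ideal for single-value results like COUNT or SUM
    #[DbalQueryBusinessMethod(
        'SELECT COUNT(*) FROM persons',
        fetchMode: FetchMode::FIRST_COLUMN_OF_FIRST_ROW,
    )]
    public function countPersons(): int;
}
```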
First Row Mode
To fetch first Row of given Result Set, we can use First Row Mode.
This will return array containing person_id and name.
Returning Nulls
When using First Row Mode, we may end up with no returned row at all. In this situation Dbal will return false; however, if the return type is nullable, Ecotone will convert false to null.
Returning Iterator
For big result sets we may want to avoid fetching everything at once, as it may consume a lot of memory. In those situations we may use Iterator Fetch Mode to fetch rows one by one.
Parameter Types
Each parameter may have a different type; Ecotone will try to recognize the specific type and set it up accordingly. If we want, we can take over and define the type explicitly.
You may use multiple aggregate identifiers, or identifiers as objects (e.g. Uuid), as long as they provide a __toString method.
Expose Identifier using Method
We may also expose the identifier via a public method by annotating it with the attribute IdentifierMethod("productId").
Targeting Identifier from Event/Command
If the property name is different than the Identifier in the Aggregate/Saga, we need to give Ecotone a hint on how to correlate the identifiers.
We can do that using the TargetIdentifier attribute, which states which Identifier the given property refers to:
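A sketch of TargetIdentifier in use (OrderWasPlaced and the property names are illustrative):

```php
class OrderWasPlaced
{
    public function __construct(
        // Property name differs from the Aggregate's Identifier ($orderId),
        // so we point Ecotone at the right Identifier explicitly
        #[TargetIdentifier('orderId')]
        public string $purchaseId,
    ) {}
}
```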
Targeting Identifier from Metadata
When there is no property to correlate inside the Command or Event, we can use an Identifier from Metadata.
When we have the identifier inside Metadata, we can use identifierMetadataMapping.
Suppose the orderId identifier is available in metadata under the key orderNumber; we can then use this mapping:
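A sketch of such a mapping on an Event Handler (PaymentWasReceived is an illustrative event class):

```php
// Maps the Aggregate's orderId Identifier to the orderNumber metadata key
#[EventHandler(identifierMetadataMapping: ['orderId' => 'orderNumber'])]
public function whenPaymentWasReceived(PaymentWasReceived $event): void
{
    // Ecotone read metadata['orderNumber'] and used it as the orderId Identifier
}
```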
We can make use of Before or Presend Interceptors to enrich the event's metadata with the required identifiers.
Dynamic Identifier
We may provide the Identifier dynamically using the Command Bus. This way we can state explicitly which Aggregate/Saga instance we refer to. Thanks to that, we don't need to define the Identifier inside the Command and we can skip any kind of mapping.
In some scenarios we may not need to create a Command class at all. For example, we may provide a block-user action which simply changes the status:
Even though we are using "aggregate.id" in the metadata, this works exactly the same for Sagas. Therefore if we want to trigger a Message Handler on a Saga, we can use "aggregate.id" too.
Advanced Identifier Mapping
There may be cases where more advanced mapping is needed. In those cases we can use identifier mapping based on Expression Language.
When using the identifierMapping configuration, we get access to the full Message and to the Dependency Container. To access specific parts we will be using:
payload -> Represents our Event/Command class
headers -> Represents our Message's metadata
reference('name') -> Allow to access given service from our Dependency Container
Suppose the orderId identifier is available in metadata under the key orderNumber; we can then tell the Message Handler to use this mapping:
Suppose our Identifier is an Email object within the Command class, and we would like to normalize it before it's used for fetching the Aggregate/Saga:
Suppose we receive an external order id, but our database holds the internal order id that should be used as the Identifier. We could then have a Service registered in the DI container under "orderIdExchange":
Then we can make use of it in our identifier mapping:
Subscribing to Event Streams
Every Projection needs to declare which Event Streams it reads from. This tells Ecotone where to fetch events when the Projection is triggered.
From Aggregate Stream (Recommended)
The most common case — subscribe to all events from a single aggregate type using #[FromAggregateStream]:
#[FromAggregateStream(Ticket::class)] automatically resolves both the stream name and the aggregate type from the Ticket class. This enables Ecotone to use the correct database indexes for fast event loading.
Always prefer #[FromAggregateStream] when your aggregate class is available. It ensures optimal performance by providing the aggregate type metadata that enables indexed queries on the Event Store.
From Multiple Aggregate Streams
When your Read Model combines data from multiple aggregates, use multiple #[FromAggregateStream] attributes:
The Projection will process events from both streams, ordered by when they were stored.
From a Named Stream
In some cases you may need to specify the stream name directly — for example when the aggregate class has been deleted or when targeting a custom stream name. Use #[FromStream] for this:
When using #[FromStream], always provide the aggregateType parameter. Without it, Ecotone cannot use the aggregate type index on the Event Store, resulting in significantly slower event loading — especially on large streams.
Event Handler Routing
Ecotone routes events to the correct handler method. You have several options for controlling how this works.
By Type Hint (Default)
The simplest approach — Ecotone routes based on the event class in the method signature:
Named Events
If your events use #[NamedEvent] to decouple the stored event name from the PHP class name:
You can still type-hint your handler with the class — Ecotone automatically resolves the #[NamedEvent] mapping:
You don't need to match the event name manually in #[EventHandler('ticket.registered')]. As long as the event class has #[NamedEvent], type-hinting the class is enough — Ecotone handles the routing for you.
You can also subscribe by name explicitly, which is useful when you don't have (or don't want to import) the event class:
Catch-All Handler
To receive every event in the stream regardless of type:
Using Array Payload for Performance
When handling events by name, you can accept the raw array payload instead of a deserialized object. This skips deserialization and can significantly speed up processing — especially useful during backfill or rebuild with large event volumes:
Using array payloads avoids the cost of deserializing event objects. When rebuilding a projection with thousands of events, this can make a noticeable difference in processing time.
What's Next
Instead of writing raw SQL in your projections, you can use Ecotone's Document Store for automatic serialization and storage — especially useful for rapid prototyping and simpler Read Models.
The Solution: Emit Events from Projections
Instead of subscribing to domain events (which fire before the projection updates), subscribe to events emitted by the projection itself — these fire after the Read Model is up to date.
Emit the Event
Use EventStreamEmitter inside your projection to emit events after updating the Read Model:
Emitted events are stored in the projection's own stream.
Events are stored in a stream called project_{projectionName}. In the example above: project_wallet_balance.
Subscribing to Emitted Events
After emitting, you can subscribe to these events just like any other event — in a regular event handler or even another projection:
All emitted events are stored in streams, so you can create another projection that subscribes to them — building derived views from derived views.
Linking Events to Other Streams
In some cases you may want to emit an event to an existing stream (for example, to provide a summary event) or to a custom stream:
linkTo works from any place in the code. emit stores events in the projection's own stream and only works inside a projection.
Controlling Event Emission
During Rebuild
When a projection is rebuilt (reset and replayed from the beginning), emitted events could be republished — causing duplicate notifications and duplicate linked events.
Ecotone handles this automatically: events emitted during a reset/rebuild phase are not republished or stored. This is safe by default.
This is the key difference between using EventStreamEmitter versus EventBus. The EventBus would simply republish events during a rebuild, causing duplicates. EventStreamEmitter suppresses them.
With ProjectionDeployment (Enterprise)
You can also explicitly suppress event emission by setting live: false on #[ProjectionDeployment]:
This is important because backfill will emit events — it replays historical events through your handlers, and if those handlers call EventStreamEmitter, all those events will be published to downstream consumers. If you're backfilling a projection with 2 years of history, that means thousands of duplicate notifications.
Use live: false during backfill to prevent this, then switch to live: true once the projection is caught up. This is the pattern used in blue-green deployments.
#[ProjectionDeployment] is available as part of Ecotone Enterprise.
Deleting the Projection
When a projection is deleted, Ecotone automatically deletes the projection's event stream (project_{name}).
Custom streams created via linkTo are not automatically deleted — they may be shared with other consumers.
Initialization
Called when the projection is first set up. Use it to create tables, indexes, or any storage structure:
By default, projections auto-initialize on the first event trigger. You don't need to run initialization manually unless you use #[ProjectionDeployment(manualKickOff: true)].
Delete
Called when the projection is permanently removed. Clean up all storage:
Reset
Called when the projection needs to be rebuilt from scratch. Clear the data but keep the structure:
After a reset, the projection's position is set back to the beginning. The next trigger will replay all events from the start.
Flush
Called after each batch of events is processed. Useful for flushing buffers or intermediate state:
See Execution Modes for how batching and flushing work together.
CLI Commands
Initialize a Projection
Delete a Projection
Calls #[ProjectionDelete] and removes all tracking state:
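Assuming the command follows the same ecotone:projection:* naming as the init command shown later in this chapter (verify the exact name in your Ecotone version), deleting could look like:

```shell
bin/console ecotone:projection:delete ticket_list
```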
Backfill a Projection
Populates a fresh projection with historical events. See Backfill and Rebuild for details:
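As a sketch, assuming a backfill command that follows the ecotone:projection:* naming used elsewhere in this chapter:

```shell
bin/console ecotone:projection:backfill ticket_list
```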
Rebuild a Projection (Enterprise)
Resets the projection and replays all events. See Backfill and Rebuild for details:
The rebuild command is available as part of Ecotone Enterprise.
Automatic vs Manual Initialization
By default, projections auto-initialize the first time an event triggers them. This means you don't need to run any CLI command — the #[ProjectionInitialization] method is called automatically.
If you need manual control (for example, during blue-green deployments), you can disable auto-initialization:
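A minimal sketch combining the attributes already shown in this chapter (Enterprise feature; assumes the Ticket aggregate from earlier examples):

```php
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
#[ProjectionDeployment(manualKickOff: true)]
class TicketListProjection
{
    // With manualKickOff, the #[ProjectionInitialization] method is no longer
    // called automatically on the first trigger — run the
    // ecotone:projection:init CLI command instead
}
```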
#[ProjectionDeployment] is available as part of Ecotone Enterprise.
Reset and Trigger
To rebuild a projection manually, you can reset it (clears data and position) and then trigger it (starts processing from the beginning):
Reset — calls #[ProjectionReset], clears position to the beginning
Trigger — starts processing events from position 0, rebuilding the entire Read Model
This is useful when you've fixed a bug in a handler and need to reprocess all events to correct the data.
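As a sketch, assuming CLI commands following the ecotone:projection:* naming used elsewhere in this chapter (the exact command names may differ in your Ecotone version):

```shell
# Reset: clears the Read Model data and moves the position back to the beginning
bin/console ecotone:projection:reset ticket_list

# Trigger: starts processing events from position 0, rebuilding the Read Model
bin/console ecotone:projection:trigger ticket_list
```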
For critical parts of the system we may decide to commit Messages to the same database as the data changes, using the Outbox pattern.
Installation
In order to use Outbox pattern we need to set up Dbal Module.
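The Dbal support ships as a separate Composer package (ecotone/dbal on Packagist):

```shell
composer require ecotone/dbal
```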
Dbal Message Channel
By sending asynchronous messages via the database, we store them together with the data changes. Thanks to the default transactions around Command Handlers, both are committed together.
Asynchronous Event Handler
After this, all your messages will go through your database as a message channel.
Setup Outbox where it's needed
With Ecotone's Outbox pattern we set up a given Channel to run via the Database. This means we can target the specific channels that are crucial to run under the Outbox pattern.
In other cases, where data consistency is not as important to us, we may use Message Broker Channels directly and skip the Outbox.
For example, registering payments and payouts may be crucial actions in our system, so we run them through the Outbox pattern. However, sending a "Welcome" notification may be just fine to run directly through the Message Broker.
Scaling the solution
One of the challenges of implementing the Outbox pattern is how to scale it. When we start consuming a lot of messages, we may need to run more consumers to handle the load.
Publishing deduplication
With Ecotone, you may safely scale the Message Consumers that consume from your Dbal Message Channel. Each message is reserved while it is being published, so no duplicates will be sent when we scale.
Handling via different Message Broker
However, we may want to avoid scaling our Dbal-based Message Consumers, as that increases the load on the database.
For this situation Ecotone allows us to make use of so-called Combined Message Channels.
In that case we would run the Database Channel only for the outbox, and a different channel for the actual Message Handler execution.
This is a powerful concept, as we may safely produce messages with the outbox and yet handle and scale them via RabbitMQ, SQS, Redis, etc.
database_channel is Dbal Message Channel
rabbit_channel is our RabbitMQ Message Channel
Then we run one or a few Message Consumers for the outbox, and scale the Message Consumers for rabbit.
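As a sketch (Symfony console shown; Laravel uses artisan), assuming the channel names from the example above and Ecotone's standard ecotone:run consumer command:

```shell
# One consumer is usually enough for relaying messages out of the outbox
bin/console ecotone:run database_channel

# Scale actual processing by running multiple consumers for the RabbitMQ channel
bin/console ecotone:run rabbit_channel
bin/console ecotone:run rabbit_channel
```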
Combined Message Channels with reference
If we want a more convenient way to apply combined message channels to multiple Message Handlers, we may create a reference.
And then we use the reference for our Message Handlers.
#[Aggregate]
class Ticket
{
// Import trait with recordThat method
use WithEvents;
#[Identifier]
private Uuid $ticketId;
private string $description;
private string $assignedTo;
#[CommandHandler]
public function changeTicket(ChangeTicket $command): void
{
$this->description = $command->description;
$this->assignedTo = $command->assignedTo;
// Record the event
$this->recordThat(new TicketWasChanged($this->ticketId));
}
}
#[Aggregate]
class Promotion
{
#[Identifier]
private Uuid $userId;
private bool $isActive;
#[EventHandler]
public function stop(AccountWasClosed $event): void
{
$this->isActive = false;
}
}
readonly class AccountWasClosed
{
public function __construct(public Uuid $userId) {}
}
#[NamedEvent('order.placed')]
final readonly class OrderWasPlaced
{
public function __construct(
public string $orderId,
public string $productId
) {}
}
#[EventHandler(listenTo: "order.placed")]
public function notify(#[Header("executorId")] $executorId): void
{
// notify user that the Order was placed
}
#[Aggregate]
final class Calendar
{
/** @var array<string> */
private array $meetings = [];
public function __construct(#[Identifier] public string $calendarId)
{
}
#[CommandHandler]
public function scheduleMeeting(ScheduleMeeting $command): Meeting
{
// checking business rules
$this->meetings[] = $command->meetingId;
return new Meeting($command->meetingId);
}
}
#[Aggregate]
final class Meeting
{
public function __construct(#[Identifier] public string $meetingId)
{
}
}
#[Aggregate]
final class Calendar
{
/** @var array<string> */
private array $meetings = [];
public function __construct(#[Identifier] public string $calendarId)
{
}
#[CommandHandler]
public function scheduleMeeting(ScheduleMeeting $command): Meeting
{
// checking business rules
$this->meetings[] = $command->meetingId;
return Meeting::create($command->meetingId);
}
}
#[EventSourcingAggregate(true)]
final class Meeting
{
use WithEvents;
use WithAggregateVersioning;
#[Identifier]
public string $meetingId;
public static function create(string $meetingId): self
{
$meeting = new self();
$meeting->recordThat(new MeetingCreated($meetingId));
return $meeting;
}
}
interface OrderRepository
{
#[Repository]
public function getOrder(string $orderId): Order;
#[Repository]
public function findOrder(string $orderId): ?Order;
#[Repository]
public function save(Order $order): void;
}
interface OrderRepository
{
#[Repository]
public function getOrder(string $orderId): Order;
#[Repository]
public function findOrder(string $orderId): ?Order;
#[Repository]
#[RelatedAggregate(Order::class)]
public function save(string $aggregateId, int $currentVersion, array $events): void;
}
#[CommandHandler]
public function placeOrder(
PlaceOrder $command,
#[Fetch("payload.userId")] User $user
): void {
// do something
}
readonly class PlaceOrder
{
public function __construct(
public string $orderId,
public string $userId,
public string $productId
) {
}
}
#[CommandHandler]
public function placeOrder(
PlaceOrder $command,
#[Fetch("payload.userId")] ?User $user // we marked it as possible null
): void {
// do something
}
#[CommandHandler]
public function placeOrder(
PlaceOrder $command,
#[Fetch("headers['userId']")] User $user
): void {
// do something
}
#[CommandHandler]
public function placeOrder(
PlaceOrder $command,
#[Fetch("reference('emailToIdMapper').map(payload.email)")] User $user
): void {
// do something
}
class TicketService
{
#[CommandHandler("ticket.create")]
public function createTicket(CreateTicketCommand $command) : void
{
// handle create ticket command
}
}
interface TicketApi
{
#[BusinessMethod('ticket.create')]
public function create(CreateTicketCommand $command): void;
}
#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private bool $isClosed;
#[CommandHandler("ticket.close")]
public function close(): void
{
$this->isClosed = true;
}
}
interface TicketApi
{
#[BusinessMethod('ticket.close')]
public function close(#[Identifier] Uuid $ticketId): void;
}
class TicketService
{
#[QueryHandler("ticket.get_by_id")]
public function getTicket(GetTicketById $query) : array
{
//return ticket
}
}
interface TicketApi
{
#[BusinessMethod("ticket.get_by_id")]
public function getTicket(GetTicketById $query): array;
}
class TicketService
{
#[QueryHandler("ticket.get_by_id")]
public function getTicket(GetTicketById $query) : array
{
//return ticket as array
}
}
interface TicketApi
{
// return ticket as Class
#[BusinessMethod("ticket.get_by_id")]
public function getTicket(GetTicketById $query): TicketDTO;
}
class TicketService
{
#[CommandHandler("ticket.create")]
public function createTicket(CreateTicket $command) : void
{
}
}
interface TicketApi
{
#[BusinessMethod("ticket.create")]
public function create(array $command): void;
}
#[Version]
private int $version = 0;
#[EventSourcingAggregate]
class Product
{
use WithAggregateVersioning;
#[Identifier]
private string $id;
#[EventSourcingHandler]
public function applyProductWasCreated(ProductWasCreated $event) : void
{
$this->id = $event->id;
}
#[CommandHandler]
public static function create(CreateProduct $command) : array
{
return [new ProductWasCreated($command->id, $command->name, $command->price)];
}
}
interface EventSourcedRepository
{
public function canHandle(string $aggregateClassName): bool;
public function findBy(string $aggregateClassName, array $identifiers, int $fromAggregateVersion = 1) : EventStream;
public function save(array $identifiers, string $aggregateClassName, array $events, array $metadata, int $versionBeforeHandling): void;
}
#[Repository]
class CustomEventSourcingRepository implements EventSourcedRepository
{
public function save(
array $identifiers,
string $aggregateClassName,
array $events,
array $metadata,
int $versionBeforeHandling
): void
{
// store events in your custom Event Store
}
}
class Event
{
private function __construct(
private string $eventName, // either class name or name of the event
private object $payload, // event object instance
private array $metadata // related metadata
) {}
}
#[EventSourcingAggregate]
#[AggregateType("basket")]
class Basket
#[NamedEvent("order_was_placed")]
class OrderWasPlaced
#[ServiceContext]
public function configuration()
{
return BaseEventSourcingConfiguration::withDefaults()
->withSnapshotsFor(Basket::class, thresholdTrigger: 100);
}
public function findBy(
string $aggregateClassName,
array $identifiers,
int $fromAggregateVersion = 1
): EventStream;
interface PersonApi
{
#[DbalWrite("INSERT INTO persons VALUES (:personId, :name)")]
public function register(int $personId, string $name): void;
}
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name)')]
public function register(
#[DbalParameter(name: 'personId')] int $id,
string $name
): void;
#[DbalWrite('UPDATE persons SET name = :name WHERE person_id = :personId')]
public function changeName(int $personId, string $name): int;
interface PersonApi
{
#[DbalQuery('SELECT person_id, name FROM persons LIMIT :limit OFFSET :offset')]
public function getNameList(int $limit, int $offset): array;
}
/**
* @return int[]
*/
#[DbalQuery(
'SELECT person_id FROM persons ORDER BY person_id ASC LIMIT :limit OFFSET :offset',
fetchMode: FetchMode::FIRST_COLUMN
)]
public function getPersonIds(int $limit, int $offset): array;
#[DbalQuery(
'SELECT COUNT(*) FROM persons',
fetchMode: FetchMode::FIRST_COLUMN_OF_FIRST_ROW
)]
public function countPersons(): int;
#[DbalQuery(
'SELECT person_id, name FROM persons WHERE person_id = :personId',
fetchMode: FetchMode::FIRST_ROW
)]
public function getNameDTO(int $personId): array;
#[DbalQuery(
'SELECT person_id, name FROM persons WHERE person_id = :personId',
fetchMode: FetchMode::FIRST_ROW
)]
public function getNameDTOOrNull(int $personId): PersonNameDTO|null;
#[DbalQuery(
'SELECT person_id, name FROM persons ORDER BY person_id ASC',
fetchMode: FetchMode::ITERATE
)]
public function getPersonIdsIterator(): iterable;
#[DbalQuery('SELECT * FROM persons WHERE person_id IN (:personIds)')]
public function getPersonsWith(
#[DbalParameter(type: ArrayParameterType::INTEGER)] array $personIds
): array;
#[Aggregate]
class Product
{
#[Identifier]
private string $productId;
}
class ChangePriceCommand
{
private string $productId;
private Money $priceAmount;
}
#[Aggregate]
class Product
{
private string $id;
#[IdentifierMethod("productId")]
public function getProductId(): string
{
return $this->id;
}
}
class SomeEvent
{
#[TargetIdentifier("orderId")]
private string $purchaseId;
}
#[EventHandler(identifierMetadataMapping: ["orderId" => "orderNumber"])]
public function failPayment(PaymentWasFailedEvent $event, CommandBus $commandBus) : void
{
// do something with $event
}
$this->commandBus->sendWithRouting('user.block', metadata: [
'aggregate.id' => $userId // This way we provide a dynamic identifier
]);
#[CommandHandler('user.block')]
public function block() : void
{
$this->status = 'blocked';
}
#[EventHandler(identifierMapping: ["orderId" => "headers['orderNumber']"])]
public function failPayment(PaymentWasFailedEvent $event, CommandBus $commandBus) : void
{
// do something with $event
}
class BlockUser
{
private Email $email;
(...)
public function getEmail(): Email
{
return $this->email;
}
}
#[CommandHandler(identifierMapping: [
"email" => "payload.getEmail().normalize()"]
)]
public function block(BlockUser $command) : void
{
// do something with $command
}
class OrderIdExchange
{
public function exchange(string $externalOrderId): string
{
// do the mapping
return $internalOrderId;
}
}
#[EventHandler(identifierMapping: [
"orderId" => "reference('orderIdExchange').exchange(payload.externalOrderId())"
])]
public function when(OrderCancelled $event) : void
{
// do something with $event
}
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
class TicketListProjection
{
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
// handle event
}
}
#[ProjectionV2('calendar_overview')]
#[FromAggregateStream(Calendar::class)]
#[FromAggregateStream(Meeting::class)]
class CalendarOverviewProjection
{
#[EventHandler]
public function onCalendarCreated(CalendarWasCreated $event): void
{
// handle calendar event
}
#[EventHandler]
public function onMeetingScheduled(MeetingWasScheduled $event): void
{
// handle meeting event
}
}
#[ProjectionV2('legacy_tickets')]
#[FromStream(stream: 'ticket_stream', aggregateType: 'App\Domain\Ticket')]
class LegacyTicketProjection
{
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
// handle event from explicitly named stream
}
}
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
// Only called for TicketWasRegistered events
}
#[NamedEvent('ticket.registered')]
class TicketWasRegistered
{
public function __construct(
public readonly string $ticketId,
public readonly string $type
) {}
}
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
// Works automatically — Ecotone knows TicketWasRegistered maps to 'ticket.registered'
}
#[EventHandler('ticket.registered')]
public function onTicketRegistered(array $event): void
{
// Subscribe by name, receive raw array — no class dependency needed
}
#[EventHandler('*')]
public function onAnyEvent(array $event): void
{
// Called for every event in the stream
}
#[EventHandler('ticket.registered')]
public function onTicketRegistered(array $event): void
{
// $event is the raw array — no deserialization overhead
$ticketId = $event['ticketId'];
}
#[ProjectionV2('wallet_balance')]
#[FromAggregateStream(Wallet::class)]
class WalletBalanceProjection
{
#[EventHandler]
public function whenMoneyWasAdded(
MoneyWasAddedToWallet $event,
EventStreamEmitter $eventStreamEmitter
): void {
$wallet = $this->getWalletFor($event->walletId);
$wallet = $wallet->add($event->amount);
$this->saveWallet($wallet);
$eventStreamEmitter->emit([
new WalletBalanceWasChanged($event->walletId, $wallet->currentBalance)
]);
}
#[EventHandler]
public function whenMoneyWasSubtracted(
MoneyWasSubtractedFromWallet $event,
EventStreamEmitter $eventStreamEmitter
): void {
$wallet = $this->getWalletFor($event->walletId);
$wallet = $wallet->subtract($event->amount);
$this->saveWallet($wallet);
$eventStreamEmitter->emit([
new WalletBalanceWasChanged($event->walletId, $wallet->currentBalance)
]);
}
(...)
}
class NotificationService
{
#[EventHandler]
public function when(WalletBalanceWasChanged $event): void
{
// Send WebSocket notification — the Read Model is already up to date
}
}
$eventStreamEmitter->linkTo('wallet', [
new WalletBalanceWasChanged($event->walletId, $wallet->currentBalance)
]);
#[ProjectionV2('wallet_balance')]
#[FromAggregateStream(Wallet::class)]
#[ProjectionDeployment(live: false)]
class WalletBalanceProjection
{
// EventStreamEmitter calls are silently skipped — no events are stored or published
}
bin/console ecotone:projection:init ticket_list
# Or initialize all projections at once:
bin/console ecotone:projection:init --all
artisan ecotone:projection:init ticket_list
# Or initialize all:
artisan ecotone:projection:init --all
#[ProjectionInitialization]
public function init(): void
{
$this->connection->executeStatement(<<<SQL
CREATE TABLE IF NOT EXISTS ticket_list (
ticket_id VARCHAR(36) PRIMARY KEY,
ticket_type VARCHAR(25),
status VARCHAR(25)
)
SQL);
}
#[ProjectionDelete]
public function delete(): void
{
$this->connection->executeStatement('DROP TABLE IF EXISTS ticket_list');
}
#[ProjectionReset]
public function reset(): void
{
$this->connection->executeStatement('DELETE FROM ticket_list');
}
#[ProjectionFlush]
public function flush(): void
{
// Called after each batch commit
// Useful for clearing caches, flushing buffers, etc.
}
#[ServiceContext]
public function databaseChannel()
{
return DbalBackedMessageChannelBuilder::create("async");
}
#[Asynchronous("async")]
#[EventHandler(endpointId: "notifyAboutNewOrder")]
public function notifyAboutNewOrder(OrderWasPlaced $event) : void
{
// notify about new order
}
#[Asynchronous(["database_channel", "rabbit_channel"])]
#[EventHandler(endpointId: 'orderWasPlaced')]
public function handle(OrderWasPlaced $event): void
{
/** Do something */
}
#[ServiceContext]
public function combinedMessageChannel(): CombinedMessageChannel
{
return CombinedMessageChannel::create(
'outbox_sqs', //Reference name
['database_channel', 'amazon_sqs_channel'], // list of combined message channels
);
}
#[Asynchronous(["outbox_sqs"])]
#[EventHandler(endpointId: 'orderWasPlaced')]
public function handle(OrderWasPlaced $event): void
{
/** Do something */
}
Converting Headers
If you have defined a Converter for a given type, you may type-hint the parameter with that object and Ecotone will do the conversion:
And then we can use Classes instead of scalar types for our Headers:
Ecotone provides extensive support for Conversion, so we can work with higher-level business classes instead of scalars. Find out more in the Conversion section.
Automatic Header Propagation
By default, Ecotone propagates all Message Headers automatically. This preserves context that can be used at a later stage.
For example, we may provide an executorId header and not use it in our Command Handler, yet still use it in the resulting Event Handler.
Automatic metadata propagation
This will execute Command Handler:
and even though we don't resend this Header when publishing the Event, it will still be available to us:
When publishing Events from Aggregates or Sagas, metadata will be propagated automatically too.
Message Identification and Correlation
When using Messaging, we may want to trace where a given Message came from, what its parent was, and how it was correlated with other Messages.
In Ecotone, all Messages carry a Message Id, Correlation Id and Parent Id within their Metadata. These are automatically assigned and propagated by the Framework, so application-level code doesn't need to manage these Messaging-level concepts.
Message Id and Correlation Id are especially useful for finding out what happened during the flow and whether any part of it failed.
Using the already propagated Headers, we may build our own tracing solution on top of what Ecotone provides, or use the built-in support for OpenTelemetry.
Id
Each Message receives its own unique Id, a generated UUID value. Ecotone uses it to provide capabilities like Message Deduplication, Tracing, and Message identification for Retries and Dead Letter.
Parent Id
The "parentId" header refers to the Message that was its direct ancestor. An example is the correlation between a Command and an Event: as a result of sending a Command, we publish an Event Message.
id and parentId will be automatically assigned by Ecotone
Different Event Handlers receive the same copy of the Event Message
Correlation Id
Correlation Id is useful for longer flows that span multiple Message Handlers. In those situations we may be interested in how our Message flow has branched:
Ecotone takes care of propagating the Correlation Id between Message Handlers
$this->commandBus->send(
new CloseTicketCommand($request->get("ticketId")),
["executorUsername" => $security->getUser()->getUsername()]
);
#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
#[Header("executorUsername")] string $executor
) {
// handle closing ticket
}
class UsernameConverter
{
#[Converter]
public function from(string $data): Username
{
return Username::fromString($data);
}
}
#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
#[Header("executorUsername")] Username $executor
) {
// handle closing ticket
}
$this->commandBus->send(
new BlockUser($request->get("userId")),
["executorId" => $security->getUser()->getCurrentId()]
);
#[CommandHandler]
public function blockUser(BlockUser $command, EventBus $eventBus) {
// handle blocking user
$eventBus->publish(new UserWasBlocked($command->id));
}
#[EventHandler]
public function whenUserWasBlocked(
UserWasBlocked $event,
#[Header('executorId')] $executorId
) {
// handle storing audit data
}
How Projections Are Triggered
By default, Projections run synchronously. When a Command Handler stores events in the Event Stream, the Projection is triggered immediately — in the same process and the same database transaction.
Event Sourced Aggregate stores events, then they are published
The Projection subscribes to those events and is executed as a result:
Projection executes after events are published
Because both the Event Store write and the Projection update happen in the same transaction, your Read Model is always consistent with the Event Stream:
Command Handler and Projection wrapped in same transaction
This is important for understanding what gets reverted on failure — when a synchronous projection fails, the entire transaction (including the Event Store write) is rolled back. For asynchronous projections, the Event Store write and the Projection run in separate transactions.
Transaction Boundaries
Each batch of events is wrapped in a single database transaction. If any event in the batch causes an exception:
The entire batch is rolled back — no partial writes
The projection's position is not advanced — the same events will be reprocessed on the next run
The projection's state is not persisted — no corrupted state
This is all-or-nothing per batch. You never end up with half-processed data.
Batch Commits — Not One Giant Transaction
With #[ProjectionExecution(eventLoadingBatchSize: N)], events are loaded in batches. Each batch gets its own transaction:
Batch 1 (events 1-100): processed successfully → committed
Batch 2 (events 101-200): exception on event 150 → rolled back
Next run: starts from event 101 (batch 1's changes are safe)
This is important: if you have 100,000 events to process, you don't end up with one massive transaction that locks your tables for minutes. Each batch commits independently.
Ecotone manages transactions at batch boundaries automatically. If you use Doctrine ORM, Ecotone also flushes and clears the EntityManager at each batch boundary, preventing memory leaks and stale entity state.
Failure Isolation Between Projections
When running projections asynchronously, Ecotone delivers a copy of the trigger message to each async handler independently. This means if one projection fails, the failure does not propagate to other projections — even if they share the same message channel.
Example: You have TicketListProjection and TicketStatsProjection both running on the projections async channel. If TicketStatsProjection throws an exception, TicketListProjection continues processing normally. Each projection is isolated.
Multiple async projections on the same channel are fully isolated from each other. A failure in one projection never blocks or affects another.
Failure Impact Within a Projection
Within a single projection, how a failure affects processing depends on the projection type:
Global Projection
A failure blocks the entire projection. Because a global projection tracks a single position across all events in the stream, a failing event prevents any subsequent events from being processed — even events for unrelated aggregates.
Example: Event #50 fails for Ticket-A. Events #51-#100 (for Ticket-B, Ticket-C, etc.) cannot be processed until event #50 succeeds.
Partitioned Projection (Enterprise)
A failure blocks only the specific partition (single aggregate instance). All other partitions continue processing normally.
Example: Ticket-A's partition fails on an event. Ticket-B and Ticket-C partitions continue processing independently. Only Ticket-A is stuck.
This is a major resilience advantage — one problematic aggregate doesn't bring down the entire projection.
Streaming Projection (Enterprise)
A failure blocks whatever partition is defined on the Message Broker side (e.g., a Kafka partition). Other broker partitions continue independently.
Recovery by Execution Mode
How the system recovers from a failure depends on the execution mode:
Synchronous
The exception propagates to the caller (the command handler). There is no automatic retry — the failure is immediately visible to the user.
Asynchronous
Handled by the messaging channel's retry configuration. You can configure retry strategies with backoff:
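As a sketch of such a retry configuration — the class and method names below come from Ecotone's recoverability module (ErrorHandlerConfiguration, RetryTemplateBuilder); verify them against your Ecotone version:

```php
use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Handler\Recoverability\ErrorHandlerConfiguration;
use Ecotone\Messaging\Handler\Recoverability\RetryTemplateBuilder;

class ErrorHandlingConfiguration
{
    #[ServiceContext]
    public function errorHandling(): ErrorHandlerConfiguration
    {
        // Retry failed messages with exponential backoff:
        // 1s, then 10s, then 100s — giving up after 3 attempts
        return ErrorHandlerConfiguration::create(
            'errorChannel',
            RetryTemplateBuilder::exponentialBackoff(1000, 10)
                ->maxRetryAttempts(3)
        );
    }
}
```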
The next poll cycle implicitly retries the failed batch. Since the position wasn't advanced, the poller will attempt the same events again.
Self-Healing Projections
A key insight: the incoming event is just a trigger. The Projection does not process the event message directly — it fetches events from the Event Stream itself, starting at its last committed position.
This is what makes projections self-healing. Consider what happens when a Projection fails because a column only accepts 10 characters, but the event contains a 13-character ticket type:
Projection fails because column is too small
If the next event (TicketWasClosed) arrives, the Projection won't skip the failed event — it will fetch from the Event Stream starting at its last known position. Once you fix the column size, the next trigger will automatically process both events:
Projection fetches from Event Stream — incoming event is just a trigger
Because the projection's position is only advanced on successful commit, fixing the bug and restarting is enough. No manual intervention needed — no resetting, no backfilling. Deploy the fix and the projection catches up automatically.
Projections are self-healing. A bug in a handler doesn't permanently corrupt the Read Model — fix the code, and the projection recovers on the next trigger. This works because events are never lost — they stay in the Event Stream, and the projection always fetches from its last committed position.
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
#[ProjectionExecution(eventLoadingBatchSize: 500)]
class TicketListProjection
{
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
// Processed in batches of 500
// Each batch: load → process → flush → commit
}
#[ProjectionFlush]
public function flush(): void
{
// Called after each batch, before commit
}
}
Both V1 and V2 projections read from the same underlying Event Store. The events don't change — only the projection infrastructure does. This means upgrading is purely about registering a new projection, not migrating data.
Upgrade Steps
1. Create the V2 Projection
Take your existing V1 projection and create a V2 version alongside it. The main changes are:
Replace #[Projection("name", Aggregate::class)] with #[ProjectionV2('name_v2')] + #[FromAggregateStream(Aggregate::class)]
Keep your event handlers and lifecycle hooks as they are
V1 (existing):
V2 (new):
Both projections can run side by side — they read from the same Event Store but write to different tables. There is no conflict.
2. Initialize and Backfill
Deploy the V2 projection, then initialize and backfill it:
V1 projections continue to work. You can migrate at your own pace — there is no deadline to switch.
#[Projection('ticket_list', Ticket::class)]
class TicketListProjection
{
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
$this->connection->insert('ticket_list', [
'ticket_id' => $event->ticketId,
'ticket_type' => $event->type,
'status' => 'open',
]);
}
#[ProjectionInitialization]
public function init(): void { /* CREATE TABLE ticket_list */ }
}
#[ProjectionV2('ticket_list_v2')]
#[FromAggregateStream(Ticket::class)]
class TicketListV2Projection
{
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
$this->connection->insert('ticket_list_v2', [
'ticket_id' => $event->ticketId,
'ticket_type' => $event->type,
'status' => 'open',
]);
}
#[ProjectionInitialization]
public function init(): void { /* CREATE TABLE ticket_list_v2 */ }
#[ProjectionDelete]
public function delete(): void { /* DROP TABLE ticket_list_v2 */ }
#[ProjectionReset]
public function reset(): void { /* DELETE FROM ticket_list_v2 */ }
}
$v1Tickets = $connection->fetchAllAssociative('SELECT * FROM ticket_list ORDER BY ticket_id');
$v2Tickets = $connection->fetchAllAssociative('SELECT * FROM ticket_list_v2 ORDER BY ticket_id');
assert($v1Tickets === $v2Tickets, 'V1 and V2 data should match');
The above solution requires running Console Commands. If we want, however, we can manage all our Error Messages from one place using Ecotone Pulse.
This is especially useful when we have multiple Applications, as we can go to a single place and see if any Application has failed to process a Message.
#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::createWithDeadLetterChannel(
"errorChannel",
// your retry strategy
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3),
// if retry strategy will not recover, then send here
"dbal_dead_letter"
);
}
#[ServiceContext]
public function dbalConfiguration()
{
return DbalConfiguration::createWithDefaults()
->withDeadLetter(false);
}
How Ecotone Solves It
Ecotone handles failures at the messaging layer — not per feature. Automatic retries, error channels, dead letter queues, the outbox pattern, and idempotency are configured once and apply to all handlers on a channel. When something fails, messages are preserved and can be replayed after the bug is fixed.
Explore the resiliency features:
Retries — Automatic retry strategies for transient failures
Outbox — Atomic message publishing with database transactions
Concurrency Handling — Optimistic and pessimistic locking
class OrderWasPlaced
{
private string $orderId;
private string $productName;
public function __construct(string $orderId, string $productName)
{
$this->orderId = $orderId;
$this->productName = $productName;
}
public function getOrderId(): string
{
return $this->orderId;
}
public function getProductName(): string
{
return $this->productName;
}
}
class NotificationService
{
const ASYNCHRONOUS_MESSAGES = "asynchronous_messages";
#[Asynchronous(self::ASYNCHRONOUS_MESSAGES)]
#[EventHandler(endpointId: "notifyAboutNewOrder")]
public function notifyAboutNewOrder(OrderWasPlaced $event) : void
{
echo "Handling asynchronously: " . $event->getProductName() . "\n";
}
}
class Configuration
{
#[ServiceContext]
public function enableRabbitMQ()
{
return AmqpBackedMessageChannelBuilder::create(NotificationService::ASYNCHRONOUS_MESSAGES);
}
}
Step by step refactor from synchronous code to full resilient asynchronous code
Working with Metadata
Working with event metadata in Event Sourcing
All Events may contain additional Metadata. This is especially useful for storing information that is not required for the Command to be handled, yet is important for auditing and projection purposes.
Metadata
In Ecotone any communication happens via Messages, and a Message consists of a Payload and Headers (Metadata).
So far we've discussed only the Payload part; for example, the ProductWasCreated Event class is actually a Payload.
What we actually store in the Event Stream is the Message, so Payload and Metadata.
Ecotone Framework uses the Metadata for Framework-related details, which can be used for identifying messages, correlating them, and targeting (which Aggregate they relate to).
However, we can also use the Metadata for additional information in our Application.
Metadata Propagation
Ecotone provides Metadata propagation, which takes care of passing Metadata between Commands and Events without the need for us to do it manually.
This way we can keep our business code clean, yet still be able to access the Metadata later.
Even though the Metadata is not used in our Ticket Aggregate, when the Event is stored in the Event Stream it will be stored alongside our provided Metadata.
Therefore we will be able to access it in any Event Handlers:
Manual Propagation
We can also manually add Metadata directly from a Command Handler, by packing our data into the Event class.
and then access it from any subflows:
Accessing Metadata in Command Handler
We may access metadata sent via the Command Bus in a Command Handler when needed:
[Enterprise] Accessing Metadata during Event Application
Pass metadata to #[EventSourcingHandler] methods for context-aware aggregate reconstruction, without polluting event payloads with infrastructure concerns.
You'll know you need this when:
Your event-sourced aggregates serve multiple tenants and reconstruction logic varies by tenant context
Event streams are merged from multiple source systems and you need to distinguish origin during replay
You need to protect business invariants based on metadata stored alongside events (e.g., executor identity)
This feature is available as part of Ecotone Enterprise.
Resilient Sending
Resilient sending for guaranteed message delivery to async channels
Whenever we use more than one storage during a single action, storing to the first storage may succeed, yet the second may fail.
This can happen when we store data in a database and then send Messages to a Message Broker.
If a failure happens, it may be that we send some Message to the Broker yet fail to store the related data, or vice versa.
Ecotone provides you with tools that help make sending Messages to the Message Broker resilient.
Message Collector
Ecotone enables the Message Collector by default. The Collector collects messages that are about to be sent to asynchronous channels, in order to send them just before the transaction is committed. This helps avoid the pitfalls below:
The Message Collector is enabled by default. It works whenever messages are sent via the Command Bus or when messages are consumed asynchronously.
Ghost Messages
Let's consider an example scenario: during order processing, we publish an OrderWasPlaced event, yet we fail to store the Order in the database. This means we've published a Message based on non-existing data, which of course creates inconsistency in our system.
When the Message Collector is enabled, it provides much higher assurance that Messages will be sent to the Message Broker only when your flow has been successful.
Eager Consumption
Let's consider an example scenario: during order processing, we publish an OrderWasPlaced event. If we publish it right away, this Message could be consumed and handled before the Order is actually committed to the database. In such situations the consumer will fail due to lack of data, or may produce incorrect results.
Thanks to the Message Collector we greatly reduce the chance of this happening.
Failure on Sending next Message
In general sending Messages to external broker is composed of three stages:
Serialize Message Payload
Map and prepare Message Headers
Send Message to external Broker
In most frameworks those three steps are done together, which may create an issue.
Let's consider an example scenario: we send multiple Messages; the first one may succeed, and the second fail on serialization. Because of that, the transaction will be rolled back, yet we have already produced the first Message, which becomes a Ghost Message.
To avoid that, Ecotone performs the first two steps up front, collects all Messages, and as a final step iterates over the collected Messages and sends them. This way Ecotone ensures that all Messages serialize correctly before it actually tries to send any of them.
Disable Message Collector:
As the Collector keeps the Messages in memory until the moment they are sent, when sending a lot of messages you may consider turning off the Message Collector to avoid memory consumption.
This way Messages will be sent instantly to your Message Broker.
Sending Retries
Whenever sending to the Message Broker fails, Ecotone will retry in order to self-heal the application.
By default Ecotone will do 2 retries when sending to a Message Channel fails:
- First after 10ms
- Second after 400ms
You may configure sending retries per asynchronous channel:
Unrecoverable Sending failures
After exhausting the retry limit while sending the Message to the Broker, we know we won't be able to deliver it. In this scenario, instead of letting our action fail completely, we may decide to push the Message to an Error Channel instead of the originally targeted channel.
Custom handling
Dbal Dead Letter
We may decide for example to push it to Dead Letter to store it and later retry:
If you push Error Messages to the Dbal Dead Letter, they will be stored in your database for later review. You may then delete or replay them after fixing the problem. This way we keep consistency: even if an unrecoverable failure happened, our system can still self-heal.
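Review and replay typically happens through console commands; a sketch, with command names following the Dbal Dead Letter module (names may differ per Ecotone version, and `{messageId}` is a placeholder for an id shown by the list command):

```shell
# List stored error messages
bin/console ecotone:deadletter:list

# Show details of a given message
bin/console ecotone:deadletter:show {messageId}

# Replay a message after deploying a fix
bin/console ecotone:deadletter:replay {messageId}

# Or delete it permanently
bin/console ecotone:deadletter:delete {messageId}
```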
Customized configuration per Message Consumer type
If you need customization per Message Consumer, you may do it using PollableChannelConfiguration by providing the Message Consumer name:
Ensure full consistency
For mission critical scenarios, you may consider using the Outbox pattern.
Persistence Strategies
Event stream persistence strategies in Ecotone PHP
Persistence Strategy
Describes how streams with events will be stored.
Each Event Stream is a separate database table, yet how those tables are created and what constraints they enforce depends on the persistence strategy.
Simple Stream Strategy
This is the most basic Stream Strategy, which involves no constraints. This means we can append any Events to it without providing any additional metadata.
As this is a free append with no constraints, we could re-run the same code and apply exactly the same Event twice.
This may sound silly, but it makes the strategy useful for particular cases.
It makes it super easy to append new Events. We can simply add this action in our code and keep applying Events to the Event Stream; we don't need to know the context of what happened before.
This is useful for scenarios where we just want to store information without putting any business logic around this.
It could be used for a continuous stream of information, like:
Temperature changes
Counting cars passing by in a traffic jam
Recording clicks and user views.
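As a sketch, appending such informational Events could look like this (the EventStore gateway and Event::create follow Ecotone's API; the stream name and the TemperatureWasMeasured event are made up for illustration):

```php
class TemperatureWasMeasured
{
    public function __construct(public readonly float $celsius) {}
}

class TemperatureRecorder
{
    public function __construct(private EventStore $eventStore) {}

    public function record(float $celsius): void
    {
        // Free append: no aggregate id or version is required with the Simple Stream Strategy
        $this->eventStore->appendTo(
            'temperature_measurements',
            [Event::create(new TemperatureWasMeasured($celsius))]
        );
    }
}
```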
Partition Stream Strategy
This is the default persistence strategy. It creates partitions within the Event Stream to ensure that we always maintain a correct history within each partition.
This way we can be sure that each Event carries details such as which Aggregate id it relates to and at which version it was applied.
The tricky part is that we need to know the context in order to apply an Event: besides the Aggregate Id, we need to provide the Version, and to know the version we need to be aware of the last previously applied Event.
When this persistence strategy is used with Ecotone's Aggregates, Ecotone resolves the metadata part on its own, so working with this Stream becomes easy. However, when working directly with the Event Store, getting the context may involve extra work.
This Stream Strategy is great whenever business logic that needs to be protected is involved.
It solves, for example, the problem of concurrent access on the database level, as we can't store an Event for the same Aggregate Id and Version twice in the Event Stream.
We would use it in most of business scenarios where knowing previous state in order to make the decision is needed, like:
Check if we can change Ticket based on status
Performing invoicing from previous transactions
Decide if Order can be shipped
This is the default persistence strategy used whenever we don't specify otherwise.
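By contrast with the free append above, appending directly to a partitioned stream requires passing the partition context; a hedged sketch, assuming the metadata keys Ecotone uses for partitioned streams (`_aggregate_id`, `_aggregate_type`, `_aggregate_version`):

```php
$eventStore->appendTo(
    'ticket_stream',
    [
        Event::create(
            new TicketWasRegistered($ticketId, 'alert'),
            [
                '_aggregate_id' => $ticketId,
                '_aggregate_type' => Ticket::class,
                // must be exactly the last stored version + 1,
                // otherwise the append is rejected by the stream's constraints
                '_aggregate_version' => $lastKnownVersion + 1,
            ]
        ),
    ]
);
```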
Stream Per Aggregate Strategy
This is similar to the Partition strategy; however, each partition is stored in a separate table instead of a single one.
This can be used when the number of partitions is really low and the volume of events within each partition is huge.
Take into consideration that each Aggregate instance will have a separate table. When this strategy is used with a lot of Aggregate instances, the number of tables in the database may become hard to manage.
Custom Strategy
You may provide your own Custom Persistence Strategy, as long as it implements PersistenceStrategy.
Setting global Persistence Strategy
To set a given persistence strategy as the default, we can use ServiceContext:
Multiple Persistence Strategies
Once set, the persistence strategy will apply to all streams in your application. However, you may face a situation when you need to have a different strategy for one or more of your streams.
The above will make the Simple Stream Strategy the default; however, for some_stream the Event Store will use the Aggregate Stream Strategy.
Be aware that we won't be able to set a Custom Strategy that way.
Event serialization and GDPR-compliant PII data handling
Event Serialization
Ecotone uses Converters in order to turn Events into a serializable form.
This means we can customize the process of serializing and deserializing specific Events, to adjust it to our Application.
So let's assume a UserCreated Event:
If we wanted to change how the Event is serialized, we would define a Converter:
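As a sketch, the Event and a matching Converter pair could look like this (class and property names are illustrative; Ecotone picks Converters based on the method's parameter and return types):

```php
class UserCreated
{
    public function __construct(
        public readonly string $userId,
        public readonly string $name
    ) {}
}

class UserCreatedConverter
{
    // Used when the Event is stored in the Event Stream
    #[Converter]
    public function toArray(UserCreated $event): array
    {
        return ['userId' => $event->userId, 'name' => $event->name];
    }

    // Used when the Event is read back from the Event Stream
    #[Converter]
    public function fromArray(array $payload): UserCreated
    {
        return new UserCreated($payload['userId'], $payload['name']);
    }
}
```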
Projections with State
PHP Event Sourcing Stateful Projections
The Problem
You need to count all tickets or calculate a running total across events, but you don't want to create a database table just for a counter. How do you keep state between event handler calls without external storage?
You need to add a column to your projection's table and change how events are processed. But rebuilding takes 30 minutes, and during that time your users see an empty dashboard. How do you deploy projection changes with zero downtime?
The features described on this page are available as part of Ecotone Enterprise.
Retries
Configuring automatic message retry strategies in Ecotone PHP
Instant Retries
Instant Retries are a powerful self-healing mechanism which helps the Application automatically recover from failures.
They are especially useful for handling temporary issues, like optimistic locking conflicts, momentary unavailability of an external service we want to call, or database connection failures. This way we can recover without affecting our end users, and with no effort on the Developer's side.
Instant retries can be enabled for CommandBus and for Asynchronous Processing.
Ecotone allows projections to carry internal state that is automatically persisted between executions. The state is passed to each event handler and can be updated by returning a new value.
This is useful for:
Counters and aggregates (total count, running average)
Throw-away projections that calculate a result, emit an event, and then get deleted
Projections that don't need an external database table
Passing State Inside Projection
Mark a method parameter with #[ProjectionState] to receive the current state. Return the updated state from the handler:
Ecotone resolves the #[ProjectionState] parameter and passes the current state. The returned value becomes the new state for the next event handler call.
State is shared across all event handlers in the same projection — if handler A updates the state, handler B receives the updated version.
The state can be a simple array or a class. Ecotone automatically serializes and deserializes it for you.
Fetching the State from Outside
To read projection state from other parts of your application, create a Gateway interface with #[ProjectionStateGateway].
Global Projection State
For a globally tracked projection, the gateway has no parameters — there's only one state to fetch:
Ecotone automatically converts the stored state (array or serialized data) to the declared return type (TicketCounterState). If you have a converter registered, it will be used:
Gateways are automatically registered in your Dependency Container, so you can inject them like any other service.
Partitioned Projection State (Enterprise)
For a partitioned projection, each aggregate has its own state. Pass the aggregate ID as the first parameter:
Usage:
Ecotone resolves the stream name and aggregate type from the projection's configuration, then composes the partition key internally. You only need to pass the aggregate ID — the rest is handled automatically.
Multi-Stream Partitioned Projections (Enterprise)
When a partitioned projection reads from multiple streams, Ecotone needs to know which stream the aggregate ID belongs to. Use #[FromAggregateStream] on the gateway method to disambiguate:
Each method targets a specific stream — so you can fetch state for a Calendar aggregate or a Meeting aggregate from the same multi-stream projection.
#[FromAggregateStream] on the gateway method is only needed when the projection reads from multiple streams. For single-stream projections, Ecotone resolves the stream automatically.
High-Performance Projections with Flush State (Enterprise)
For projections that need to process large volumes of events quickly — during backfill or rebuild — you can combine #[ProjectionState] with #[ProjectionFlush] to build extremely performant projections.
The idea: instead of doing a database INSERT on every single event, you accumulate state in memory across the entire batch, and then persist it in one operation during flush.
With a batch size of 1000, this projection processes 1000 events without a single database write, then does one bulk persist during flush. During a rebuild over millions of events, this is dramatically faster than writing on every event.
Using #[ProjectionState] in #[ProjectionFlush] methods is available as part of Ecotone Enterprise.
Ecotone takes care of persisting and loading the state between batches automatically. You only need to focus on the accumulation logic in event handlers and the persistence logic in flush. This pattern is ideal for projections that need to rebuild quickly over large event streams.
In order to set up instant retries for the Command Bus, you can use Service Context configuration.
This will retry your synchronous Command Handlers.
Asynchronous Instant Retries
This will retry instantly when your message is handled asynchronously. This applies to Commands and Events. Take into consideration that Ecotone isolates the handling of asynchronous events, so it's safe to retry them.
By using instant retries for asynchronous endpoints we keep message ordering.
Command Bus Instant Retries
Create custom Command Buses with tailored retry strategies for specific business scenarios. Instead of scattering try/catch retry loops across handlers, declare retry behaviour as an attribute: specify which exceptions to retry and how many times.
You'll know you need this when:
Database deadlocks cause intermittent command handler failures
External API calls fail transiently and a simple retry would succeed
You have try/catch retry loops scattered across your handlers
High-concurrency scenarios produce optimistic locking collisions that resolve on retry
Customized Instant Retries are available as part of Ecotone Enterprise.
Instant retry times
To set up Customized Instant Retries, we will extend CommandBus and provide the attribute:
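A sketch of such an extended bus, assuming Ecotone's InstantRetry attribute:

```php
#[InstantRetry(retryTimes: 2)]
interface CommandBusWithRetry extends CommandBus
{
}
```

The attribute also accepts an exception list for the exception-specific variant, e.g. `#[InstantRetry(retryTimes: 2, exceptions: [OptimisticLockingException::class])]` (parameter names assumed).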
CommandBusWithRetry will be automatically registered in our Dependency Container and available for use.
Now whenever we send a Command using this specific Command Bus, it will do two extra retries:
Instant Retries exceptions
The same way we can define specific exception list which should be retried for our customized Command Bus:
Only the exceptions defined in the exception list will be retried.
Asynchronous Delayed Retries
Delayed retries are helpful in cases where we can't recover instantly. This may happen, for example, due to longer downtime of an external service we integrate with.
In those situations we may try to self-heal the application by delaying the failed Message for a given period of time. This way we can retry the call to the given service after some time, and if everything is fine, we will successfully handle the message.
Ecotone resends the Message to the original channel with a delay. This way we don't block processing while waiting, and we can continue consuming the next messages. When the Message is ready (after the delay time), it will be picked up from the Queue.
Installation
First, the Error Channel needs to be set up for your Application; then you may configure retries.
Using Default Delayed Retry Strategy
If you want to use the built-in Error Retry Strategy and set retry attempts, backoff strategy, initial delay etc., you may configure it using ErrorHandlerConfiguration from ServiceContext.
Using Custom Delayed Strategy for Consumer
When we have a consumer named "asynchronous_messages", we can define PollingMetadata with a custom error Channel.
#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
(...)
#[CommandHandler]
public function assign(AssignPerson $command) : array
{
return [new PersonWasAssigned($this->ticketId, $command->personId)];
}
}
public function handle(
PersonWasAssigned $event,
// Accessing Metadata
#[Header("executorId")] $executorId
): void
{
// do something with metadata
}
#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
private string $type;
(...)
#[CommandHandler]
public function assign(AssignPerson $command) : array
{
return [
Event::create(
new PersonWasAssigned($this->ticketId, $command->personId),
[
'ticketType' => $this->type
]
)
];
}
}
public function handle(
PersonWasAssigned $event,
// Accessing Metadata
#[Header("ticketType")] $ticketType
): void
{
// do something with metadata
}
#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
private string $ownerId;
(...)
#[CommandHandler]
public function change(
ChangeTicket $command,
// Accessing Metadata
#[Header("executorId")] $executorId
) : array
{
// do something with executorId
}
}
#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
private string $ownerId;
(...)
#[CommandHandler]
public function change(ChangeTicket $command, #[Header] $executorId) : array
{
if ($this->ownerId !== $executorId) {
throw new \InvalidArgumentException("Only owner can change Ticket");
}
return [new TicketChanged($this->ticketId, $command->type)];
}
#[EventSourcingHandler]
public function applyTicketCreated(
TicketCreated $event,
// Accessing Metadata
#[Header("executorId")] $executorId,
) : void
{
$this->id = $event->id;
$this->ownerId = $executorId;
}
}
#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::createWithDefaults()
->withCollector(false);
}
#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::create(
RetryTemplateBuilder::exponentialBackoff(initialDelay: 10, multiplier: 2)
->maxRetryAttempts(3)
->build()
);
}
#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::createWithDefaults()
->withErrorChannel("dbal_dead_letter")
}
#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::createWithDefaults()
->withErrorChannel("failure_channel")
}
---
#[ServiceActivator('failure_channel')]
public function doSomething(ErrorMessage $errorMessage): void
{
// Handle failure message on your own terms :)
}
#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::createWithDefaults()
->withErrorChannel("dbal_dead_letter")
}
#[ServiceContext]
public function asyncChannelConfiguration()
{
return PollableChannelConfiguration::createWithDefaults('notifications')
->withCollector(false)
->withErrorChannel("dbal_dead_letter")
}
#[ServiceContext]
public function aggregateStreamStrategy()
{
return EventSourcingConfiguration::createWithDefaults()
->withCustomPersistenceStrategy(new CustomStreamStrategy(new FromProophMessageToArrayConverter()));
}
#[ServiceContext]
public function persistenceStrategy()
{
return EventSourcingConfiguration::createWithDefaults()
->withSimpleStreamPersistenceStrategy();
}
#[ServiceContext]
public function eventSourcingConfiguration(): EventSourcingConfiguration
{
return EventSourcingConfiguration::createWithDefaults()
->withPersistenceStrategyFor('some_stream', LazyProophEventStore::AGGREGATE_STREAM_PERSISTENCE)
;
}
#[ProjectionV2('ticket_counter')]
#[FromAggregateStream(Ticket::class)]
class TicketCounterProjection
{
public const NAME = 'ticket_counter';
#[EventHandler]
public function when(
TicketWasRegistered $event,
#[ProjectionState] TicketCounterState $state
): TicketCounterState {
return $state->increase();
}
}
interface TicketCounterGateway
{
#[ProjectionStateGateway(TicketCounterProjection::NAME)]
public function getCounter(): TicketCounterState;
}
#[Converter]
public function toCounterState(array $state): CounterState
{
return new CounterState(
ticketCount: $state['ticketCount'] ?? 0,
closedTicketCount: $state['closedTicketCount'] ?? 0,
);
}
interface TicketCounterGateway
{
#[ProjectionStateGateway('ticket_counter')]
public function fetchStateForPartition(string $aggregateId): CounterState;
}
$gateway = $container->get(TicketCounterGateway::class);
// Fetch state for a specific aggregate
$stateForTicket1 = $gateway->fetchStateForPartition('ticket-1');
$stateForTicket2 = $gateway->fetchStateForPartition('ticket-2');
interface CalendarCounterGateway
{
#[ProjectionStateGateway('calendar_counter')]
#[FromAggregateStream(Calendar::class)]
public function fetchCalendarState(string $aggregateId): CounterState;
#[ProjectionStateGateway('calendar_counter')]
#[FromAggregateStream(Meeting::class)]
public function fetchMeetingState(string $aggregateId): CounterState;
}
#[ProjectionV2('ticket_stats')]
#[FromAggregateStream(Ticket::class)]
#[ProjectionExecution(eventLoadingBatchSize: 1000)]
class TicketStatsProjection
{
public function __construct(private Connection $connection) {}
#[EventHandler]
public function onTicketRegistered(
TicketWasRegistered $event,
#[ProjectionState] array $state
): array {
// No database call — just update in-memory state
$type = $event->type;
$state[$type] = ($state[$type] ?? 0) + 1;
return $state;
}
#[EventHandler]
public function onTicketClosed(
TicketWasClosed $event,
#[ProjectionState] array $state
): array {
$state['closed'] = ($state['closed'] ?? 0) + 1;
return $state;
}
#[ProjectionFlush]
public function flush(#[ProjectionState] array $state): void
{
// One database operation per batch instead of per event
foreach ($state as $type => $count) {
$this->connection->executeStatement(
'INSERT INTO ticket_stats (type, count) VALUES (?, ?)
ON DUPLICATE KEY UPDATE count = ?',
[$type, $count, $count]
);
}
}
#[ProjectionInitialization]
public function init(): void
{
$this->connection->executeStatement(<<<SQL
CREATE TABLE IF NOT EXISTS ticket_stats (
type VARCHAR(50) PRIMARY KEY,
count INT NOT NULL DEFAULT 0
)
SQL);
}
}
#[ServiceContext]
public function registerRetries()
{
return InstantRetryConfiguration::createWithDefaults()
->withCommandBusRetry(
isEnabled: true,
retryTimes: 3, // max retries
retryExceptions: [DatabaseConnectionFailure::class, OptimisticLockingException::class] // list of exceptions to be retried, leave empty if all should be retried
);
}
#[ServiceContext]
public function registerRetries()
{
return InstantRetryConfiguration::createWithDefaults()
->withAsynchronousEndpointsRetry(
isEnabled: true,
retryTimes: 3, // max retries
retryExceptions: [DatabaseConnectionFailure::class, OptimisticLockingException::class] // list of exceptions to be retried, leave empty if all should be retried
);
}
#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::create(
"errorChannel",
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3)
);
}
#[Asynchronous("asynchronous_messages")]
#[EventHandler(endpointId: "notifyAboutNewOrder")]
public function notifyAboutNewOrder(OrderWasPlaced $event, NotificationService $notificationService) : void
{
$notificationService->notifyAboutNewOrder($event->getOrderId());
}
#[ServiceContext]
public function errorConfiguration()
{
return PollingMetadata::create("asynchronous_messages")
->setErrorChannel("customErrorChannel");
}
#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::create(
"customErrorChannel",
RetryTemplateBuilder::exponentialBackoff(100, 3)
->maxRetryAttempts(2)
);
}
Then the Event Stream would look as shown in the figure above (User Event Stream with custom serialization).
This basically means we can serialize the Event in any format we want.
Having customized Converters for specific Events is also useful when we need to adjust some legacy Events to a new format. We can hook into the deserialization process and modify the payload to match the new structure.
Advanced Serialization Support with JMS
When using JMS Converter support, we can even customize how we want to serialize a given class that is used within Events.
For example, we could have a User Created Event which makes use of a UserName class.
The UserName would be a simple class containing validation, so the name is never empty:
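A minimal sketch of such a value object (the method name toString is illustrative):

```php
final class UserName
{
    public function __construct(private string $value)
    {
        // Guard the invariant: a user name must never be empty
        if (trim($value) === '') {
            throw new \InvalidArgumentException('User name cannot be empty');
        }
    }

    public function toString(): string
    {
        return $this->value;
    }
}
```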
If we serialized it without telling JMS how to handle this class, we would end up with the following JSON in the Event Stream:
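Illustratively (surrounding property names assumed), the nested object would be embedded as-is:

```json
{
    "userId": "123",
    "name": {
        "value": "Johny"
    }
}
```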
This is fine for short-lived applications and testing; however, in a long-living application it may become a problem. The problem comes from changes: if we simply renamed the property from UserName.value to UserName.data, it would break deserialization of our previous Events,
as the data does not exist under the new key in previously stored Events.
Therefore we want to take over the serialization of such objects, to ensure stability over time.
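A hedged sketch of such a Converter pair, using Ecotone's attribute-based converters to map UserName to a plain string and back (this assumes UserName exposes a toString method):

```php
class UserNameConverter
{
    // Serialize UserName to a plain string in the stored Event
    #[Converter]
    public function toScalar(UserName $userName): string
    {
        return $userName->toString();
    }

    // Deserialize the plain string back into the UserName class
    #[Converter]
    public function fromScalar(string $name): UserName
    {
        return new UserName($name);
    }
}
```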
Now, with the above Converter, whenever we use the UserName class we will actually serialize it to a simple string type, and deserialize it back from that simple type into the UserName class.
With just a few lines of code we can ensure consistency across different Events, keeping them resilient to code refactors and changes.
PII Data (GDPR)
In the case of storing sensitive data, we may be forced by law to ensure that data can be forgotten (e.g. GDPR). This basically means that if a Customer asks us to remove their data, we are obligated by law to ensure this happens.
However, in an Event Sourced System we would rather not delete events, as this is a critical operation considered dangerous. Deleting Events could affect running Projections, deleting too much may introduce inconsistencies in the System, and in some cases we may actually want to drop only part of the data, not everything.
Therefore dropping Events from the Event Stream is not a suitable solution, and we need something different.
Solution that we can use, is to change the way we serialize the Event. We can hook into serialization process just as we did for normal serialization, and then customize the process.
Converter in reality is an Service registered in Dependency Container, so we may inject anything we want there in order to modify the serialization process.
So let's assume that we want to encrypt the UserCreated Event:
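The Converter below is a minimal sketch of this idea. The EncryptionService and its store/fetch methods are a hypothetical interface of our own, not an Ecotone class:

```php
final readonly class UserCreatedConverter
{
    public function __construct(private EncryptionService $encryptionService) {}

    #[Converter]
    public function toArray(UserCreated $event): array
    {
        // Hand the sensitive payload to the EncryptionService under a key,
        // and store only the reference in the Event Stream
        $this->encryptionService->store($event->userId, [
            'name' => $event->name,
            'surname' => $event->surname,
        ]);

        return ['userId' => $event->userId];
    }

    #[Converter]
    public function fromArray(array $event): UserCreated
    {
        // Fetch the payload back by reference; if it was deleted (GDPR request),
        // return dummy data instead of failing
        $payload = $this->encryptionService->fetch($event['userId'])
            ?? ['name' => 'forgotten', 'surname' => 'forgotten'];

        return new UserCreated($event['userId'], $payload['name'], $payload['surname']);
    }
}
```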
So what we do here is hook into the serialization/deserialization process and pass the data to the EncryptionService. As you can see, we don't store the payload itself, we simply store a reference in the form of a key.
The EncryptionService can be as simple as storing this data in a database table, using the key as Primary Key so we can fetch it easily. The data can also be stored encrypted in some cryptographic service, yet it may also be stored as plain text. It all depends on our Domain.
What is important is that we've provided the resource id to the EncryptionService.
This can now be used to delete the related Event's data.
When a Customer comes to us and says he wants his data deleted, we simply delete by resource:
That way this data won't be available in the System anymore.
Now we could either allow Converters to fail if those Events are meant to be deserialized, or we could check whether the given key exists and return dummy data instead.
If we allow Converters to fail during deserialization, we should ensure that related Projections use simple arrays instead of classes, and handle those cases during Projecting.
If we decide to return dummy data, we can keep deserializing those Events for Projections, as they will still be able to use them.
final readonly class UserCreated
{
public function __construct(
public string $userId,
public string $name,
public string $surname,
)
{
}
}
final readonly class UserCreatedConverter
{
#[Converter]
public function toArray(UserCreated $event): array
{
return [
'userId' => $event->userId,
'userName' => $event->name,
'userSurname' => $event->surname,
];
}
#[Converter]
public function fromArray(array $event): UserCreated
{
return new UserCreated(
$event['userId'],
$event['userName'],
$event['userSurname'],
);
}
}
final readonly class UserCreated
{
public function __construct(
public string $userId,
public UserName $name,
public string $surname,
)
{
}
}
final readonly class UserName
{
public function __construct(
public string $value,
)
{
if ($value === "") {
throw new \InvalidArgumentException("Name should not be empty");
}
}
}
class UserNameConverter
{
#[Converter]
public function from(UserName $data): string
{
return $data->value;
}
#[Converter]
public function to(string $data): UserName
{
return new UserName($data);
}
}
Instead of rebuilding the existing projection in-place (which clears the data), deploy a new version alongside the old one:
v1 continues serving traffic normally
v2 is deployed and catches up from historical events in the background
Once v2 is fully caught up, switch traffic from v1 to v2
Delete v1
Both projections run against the same Event Store — no data migration or copying needed.
Using #[ProjectionName] for Versioned Tables
The key mechanism is #[ProjectionName] — it injects the projection name as a parameter into your handlers. Use it to dynamically name your tables, so tickets_v1 and tickets_v2 coexist in the same database:
Because the table name comes from the projection name, deploying tickets_v2 creates a completely separate table — no conflicts with tickets_v1.
Deploying Version 2
When you need to deploy changes, create v2 with #[ProjectionDeployment]:
Two settings control the deployment:
manualKickOff: true — the projection won't auto-initialize. You control when it starts.
live: false — events emitted via EventStreamEmitter are suppressed during the catch-up phase. This prevents duplicate notifications to downstream consumers.
Step-by-Step Deployment Flow
1. Deploy v2
Deploy your code with the tickets_v2 projection class. Because manualKickOff: true, nothing happens yet.
During this phase, v1 continues serving traffic normally. v2 processes historical events in the background.
4. Verify v2
Check that v2's data looks correct — query the tickets_v2 table and compare with tickets_v1.
5. Switch Traffic
Update your application's query handlers to read from tickets_v2 instead of tickets_v1.
6. Enable Live Mode
Update the v2 projection to remove manualKickOff and set live: true, so it processes new events and emits downstream events normally.
7. Delete v1
bin/console ecotone:projection:delete tickets_v1
artisan ecotone:projection:delete tickets_v1
This calls #[ProjectionDelete] which drops the tickets_v1 table.
Event Emission Control
The live: false setting is critical for projections that emit events. Without it, the backfill phase would re-emit all historical events — sending thousands of duplicate notifications to downstream consumers.
With live: false:
Events emitted during backfill are silently discarded
Once you switch to live: true, new events are emitted normally
Downstream consumers only see events once
Upgrading from Global to Partitioned
Blue-green deployments also work for changing projection types. You can deploy v2 as a Partitioned Projection alongside your existing global v1:
The same Event Store backs both projections. The only difference is how events are tracked and processed — v2 uses per-aggregate partitions instead of a single global position. Once v2 catches up, switch traffic and delete v1.
#[ProjectionV2('tickets_v1')]
#[FromAggregateStream(Ticket::class)]
class TicketsProjection
{
public function __construct(private Connection $connection) {}
#[ProjectionInitialization]
public function init(#[ProjectionName] string $projectionName): void
{
$this->connection->executeStatement(<<<SQL
CREATE TABLE IF NOT EXISTS {$projectionName} (
ticket_id VARCHAR(36) PRIMARY KEY,
ticket_type VARCHAR(25),
status VARCHAR(25)
)
SQL);
}
#[EventHandler]
public function onTicketRegistered(
TicketWasRegistered $event,
#[ProjectionName] string $projectionName
): void {
$this->connection->insert($projectionName, [
'ticket_id' => $event->ticketId,
'ticket_type' => $event->type,
'status' => 'open',
]);
}
#[EventHandler]
public function onTicketClosed(
TicketWasClosed $event,
#[ProjectionName] string $projectionName
): void {
$this->connection->update(
$projectionName,
['status' => 'closed'],
['ticket_id' => $event->ticketId]
);
}
#[ProjectionDelete]
public function delete(#[ProjectionName] string $projectionName): void
{
$this->connection->executeStatement("DROP TABLE IF EXISTS {$projectionName}");
}
#[ProjectionReset]
public function reset(#[ProjectionName] string $projectionName): void
{
$this->connection->executeStatement("DELETE FROM {$projectionName}");
}
#[QueryHandler('getTickets')]
public function getTickets(#[ProjectionName] string $projectionName): array
{
return $this->connection->fetchAllAssociative(
"SELECT * FROM {$projectionName}"
);
}
}
#[ProjectionV2('tickets_v2')]
#[FromAggregateStream(Ticket::class)]
#[ProjectionDeployment(manualKickOff: true, live: false)]
class TicketsV2Projection extends TicketsProjection
{
// Same handlers — or modified handlers with your schema changes
// The table name will be 'tickets_v2' thanks to #[ProjectionName]
}
#[ProjectionV2('tickets_v2')]
#[FromAggregateStream(Ticket::class)]
#[Partitioned]
#[ProjectionDeployment(manualKickOff: true, live: false)]
class TicketsV2Projection extends TicketsProjection
{
// Same handlers, now partitioned
}
About
Ecotone — The enterprise architecture layer for Laravel and Symfony
Ecotone extends your existing Laravel or Symfony application with an enterprise architecture layer.
One Composer package adds CQRS, Event Sourcing, Workflows, and production resilience to your codebase. No framework change. No base classes. Just PHP attributes on your existing code.
composer require ecotone/laravel # or ecotone/symfony-bundle
See what it looks like
That's the entire setup. No bus configuration. No handler registration. No retry config. No serialization wiring. Ecotone reads your attributes and handles the rest:
Command and Query Bus — wired automatically from your #[CommandHandler] and #[QueryHandler] attributes
Event routing — NotificationService subscribes to OrderWasPlaced without any manual wiring
Test exactly the flow you care about
Extract a specific flow and test it in isolation — only the services you need:
Only OrderService is loaded. No notifications, no other handlers — just the flow you're verifying.
Now bring in the full async flow. Enable an in-memory channel and run it within the same test process:
->run('notifications') processes messages from the in-memory queue — right in the same process. The async handler executes deterministically, no timing issues, no polling, no external broker.
The key: swap the in-memory channel for a production transport such as RabbitMQ, Kafka, or Amazon SQS to test what runs in production — the test stays the same. Ecotone runs the consumer within the same process, so switching transports never changes how you test. You get the ease of in-memory testing no matter what backs your production system.
What changes in your daily work
Business logic is the only code you write
No command bus configuration. No handler registration. No message serialization setup. You write a PHP class with an attribute, and Ecotone wires the bus, the routing, the serialization, and the async transport. Your code stays focused on what your application actually does — your domain.
Going async never means rewriting handlers
Add #[Asynchronous('channel')] to any handler. The handler code stays identical. Switch from synchronous handling to any asynchronous transport by changing one line of configuration. Your business logic never knows the difference.
Failed messages don't disappear
Every failed message is captured in a Dead Letter store. You see what failed, the full exception, and the original message, and you can replay with one command. This can be combined with the inbuilt Outbox pattern to ensure full consistency. No more silent failures. No more guessing what happened to that order at 3am.
Complex workflows live in one place
A multi-step business process — order placement, payment, shipping, notification — doesn't need to be scattered across event listeners, cron jobs, and database flags. Ecotone gives you Sagas for stateful workflows, message pipelines for linear flows, and attribute-driven declarative process control. The entire business flow is readable in one class.
Your codebase tells the story of your business
When a new developer opens your code, they see PlaceOrder, OrderWasPlaced, ShipOrder — not AbstractMessageBusHandlerFactory. Ecotone keeps your domain clean: no base classes to extend, no framework interfaces to implement, no infrastructure leaking into your business logic. Just plain PHP objects with attributes that declare their intent.
AI-ready by design
Ecotone's declarative, attribute-based architecture is inherently friendly to AI code generators. When your AI assistant works with Ecotone code, two things happen:
Less context needed, less code generated. A command handler with #[CommandHandler] and #[Asynchronous('orders')] tells the full story in two attributes — no bus configuration files, no handler registration, no retry setup to feed into the AI's context window. The input is smaller because there's less infrastructure to read, and the output is smaller because there's less boilerplate to generate. That means lower token cost, faster iteration cycles, and more accurate results.
AI that knows Ecotone. Your AI assistant can work with Ecotone out of the box:
Ready-to-use skills that teach any coding agent how to correctly write handlers, aggregates, sagas, projections, tests, and more. Install with one command and your AI generates idiomatic Ecotone code from the start.
Direct access to Ecotone documentation for any AI assistant that supports Model Context Protocol — Claude Code, Cursor, Windsurf, GitHub Copilot, and others.
AI-optimized documentation files that give any LLM instant context about Ecotone's API and patterns.
Testing that AI can actually run. Ecotone's testing support runs async flows in the same process — even complex workflows with sagas and projections can be tested with ->sendCommand() and ->run(). Your coding agent writes and verifies tests without needing to set up external infrastructure or guess at test utilities.
Declarative configuration that any coding agent can follow and reproduce. Testing support that lets it verify even the most advanced flows. Less guessing, no hallucinating — just confident iteration.
The full capability set
Capability
What it gives you
Learn more
The enterprise gap in PHP, closed
Every mature ecosystem has an enterprise architecture layer on top of its web framework:
Ecosystem
Web Framework
Enterprise Architecture Layer
Ecotone is built on the same foundation — Enterprise Integration Patterns — that powers Spring Integration, NServiceBus, and Apache Camel. In active development since 2017 and used in production by teams running multi-tenant, event-sourced systems at scale, Ecotone brings the same patterns that run banking, logistics, and telecom systems in Java and .NET to PHP.
This isn't about PHP catching up. It's about your team using proven architecture patterns — with the development speed that PHP gives you — without giving up the ecosystem you already know.
Start with your framework
Laravel — Laravel's queue runs jobs, not business processes. Stop stitching Spatie + Laravel Workflow + Bus::chain + DIY outbox. Ecotone replaces the patchwork with one attribute-driven toolkit: aggregates with auto-published events, piped workflows, sagas, snapshots, transactional outbox — testable in-process, running on the queues you already have.
composer require ecotone/laravel
Symfony — Symfony Messenger handles dispatch. For aggregates, sagas, or event sourcing the usual path is bolting on a separate event sourcing library, rolling your own outbox, and writing dedup middleware per handler. Ecotone replaces the patchwork with one attribute-driven toolkit: aggregates, sagas, event sourcing, piped workflows, transactional outbox, and per-handler failure isolation so one failing listener doesn't double-charge customers on retry. Pure POPOs, Bundle auto-config, your Messenger transports preserved.
composer require ecotone/symfony-bundle
Any PHP framework — Ecotone Lite works with any PSR-11 compatible container.
composer require ecotone/lite-application
Try it in one handler. You don't need to migrate your application. Install Ecotone, add an attribute to one handler, and see what happens. If you like what you see, add more. If you don't — remove the package. Zero commitment.
Setup guide for any framework
Send your first command in 5 minutes
Build a complete messaging flow step by step
The full CQRS, Event Sourcing, and Workflow feature set is available under the Apache 2.0 License. Enterprise licenses are available for teams that need advanced scaling, a distributed bus with service map, orchestrators, and production-grade Kafka integration.
Join the community — ask questions and share what you're building.
New Aggregates are initialized using a public factory method (a static method).
#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private string $description;
private string $assignedTo;
#[CommandHandler]
public static function createTicket(CreateTicket $command): static
{
$ticket = new self();
$ticket->ticketId = Uuid::generate();
$ticket->assignedTo = $command->assignedTo;
$ticket->description = $command->description;
return $ticket;
}
}
After calling createTicket, the aggregate will be automatically stored.
A factory method is a static method in the Aggregate class.
You may have multiple factory methods if needed.
Sending the Command looks exactly the same as in the previous scenarios.
When the factory method is called via the Command Bus, Ecotone will return the newly assigned identifier.
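For illustration, dispatching such a creation Command could look like the sketch below (assuming a CreateTicket command class with promoted assignedTo and description properties, matching the handler above):

```php
// Ecotone returns the newly assigned identifier when a factory method is invoked
$ticketId = $commandBus->send(
    new CreateTicket(assignedTo: 'Johny', description: 'Fix the login page')
);
```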
Aggregate Action Method
Aggregate actions are defined using public (non-static) methods. Ecotone will handle loading the aggregate and saving it after the action method has been called.
ChangeTicket should contain the identifier of the Aggregate instance on which the action method should be called.
And then we call it from the Command Bus:
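A minimal sketch of such an action method and its dispatch. The ChangeTicket command class is illustrative; its ticketId property matches the Aggregate's #[Identifier] property by name, which is how Ecotone locates the instance:

```php
final readonly class ChangeTicket
{
    public function __construct(
        public Uuid $ticketId,   // matches the Aggregate's #[Identifier] property by name
        public string $description,
    ) {}
}

#[Aggregate]
class Ticket
{
    // ...

    #[CommandHandler]
    public function changeTicket(ChangeTicket $command): void
    {
        // Ecotone loads the Ticket using the identifier from the Command,
        // calls this method, and saves the Aggregate afterwards
        $this->description = $command->description;
    }
}

$commandBus->send(new ChangeTicket($ticketId, 'New description'));
```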
Calling Aggregate without Command Class
In fact we don't need to provide the identifier inside our Commands in order to execute a specific Aggregate instance. In some scenarios we may not need a Command class at all.
In such a scenario, if we added a Command class, it would only contain the identifier, which would be unnecessary boilerplate code. To solve this we may use Message metadata to provide information about the Aggregate instance we want to call.
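A sketch of such a call (the routing key "ticket.close" is an illustrative name):

```php
// No Command class needed: the target aggregate instance
// is provided via the "aggregate.id" metadata header
$commandBus->sendWithRouting(
    'ticket.close',
    metadata: ['aggregate.id' => $ticketId]
);
```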
"aggregate.id" is a special metadata header that tells Ecotone which aggregate instance we want to call.
When we avoid creating Command classes containing only identifiers, we decrease the amount of boilerplate code we need to maintain.
Redirected Aggregate Creation
There may be cases where you would like to apply conditional logic: if the aggregate exists do one thing, otherwise do another. This may be useful to keep our higher level code clean of "if" statements and to simplify the API by exposing a single method.
Both Command Handlers are registered for the same command CreateTicket, yet one is a factory method and the other is an action method.
When the Command is sent, Ecotone will try to load the aggregate first:
if it is found, the changeTicket method will be called, otherwise createTicket.
Redirected aggregate creation works the same for Event Sourced Aggregates.
Publishing Events from Aggregate
For standard (non Event-Sourced) Aggregates we can use the WithEvents trait, or provide a method annotated with the AggregateEvents attribute, to expose the list of Events the Aggregate has recorded.
After saving changes to the Aggregate, Ecotone will automatically publish the related Events.
Calling Aggregate with additional arguments
Just as with a standard Command Handler, we can pass Metadata and DI Services to our Aggregate methods.
Converting Parameters
Converting parameters in Database Business Interface queries
We may want to use higher level objects within our Interface rather than simple scalar types. As those can't be understood by our Database, we need Conversion. Ecotone provides default conversions and the possibility to customize the process.
Default Date Time Conversion
Ecotone provides inbuilt Conversion for Date Time based objects.
#[DbalWrite('INSERT INTO activities VALUES (:personId, :time)')]
public function add(string $personId, \DateTimeImmutable $time): void;
By default Ecotone will convert time using the Y-m-d H:i:s.u format. We may override this format when needed.
Default Class Conversion
If your class contains a __toString method, it will be used for the conversion.
We may override this behaviour when needed.
Converting Array to JSON
For example, a database column may be of type JSON or Binary.
In those situations we may state which Media Type a given parameter should be converted to, and Ecotone will perform the conversion before executing the SQL.
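A sketch of how this could look, assuming a DbalParameter attribute accepting a target Media Type (table and parameter names are illustrative):

```php
interface PersonService
{
    #[DbalWrite('INSERT INTO persons VALUES (:personId, :roles)')]
    public function register(
        string $personId,
        // the roles array is serialized to JSON before the SQL is executed
        #[DbalParameter(convertToMediaType: MediaType::APPLICATION_JSON)]
        array $roles,
    ): void;
}
```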
In the above example, roles will be converted to JSON before the SQL is executed.
Value Objects Conversion
If we are using higher level classes like Value Objects, we can convert them to the expected type.
For example, we can register a Converter for our PersonRole class and convert it to JSON or XML.
Read more about Ecotone's Conversion support.
Then we will be able to use our Business Method with PersonRole, which will be converted to the given Media Type before being saved:
This way we can provide higher level classes, keeping our Interface as close as needed to our business model.
Using Expression Language
Calling Method Directly on passed Object
We may use Expression Language to dynamically evaluate our parameters.
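A sketch of such a parameter expression, assuming a PersonName Value Object that exposes a toLowerCase() method:

```php
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name)')]
public function register(
    string $personId,
    // "payload" refers to the value of this parameter, here the PersonName object
    #[DbalParameter(expression: 'payload.toLowerCase()')]
    PersonName $name,
): void;
```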
payload is a special variable in the expression which targets the value of the given parameter; in this example it will be the PersonName.
In the above example, before storing the name in the database, we will call its toLowerCase() method.
Using External Service for evaluation
We may also access any Service from our Dependency Container and run a method on it.
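As a sketch, assuming a normalization Service registered under the "converter" id in the Dependency Container:

```php
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name)')]
public function register(
    string $personId,
    // fetch the "converter" Service from the container and run normalize() on the value
    #[DbalParameter(expression: "reference('converter').normalize(payload)")]
    PersonName $name,
): void;
```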
reference is a special function within the expression which allows us to fetch a given Service from the Dependency Container. In our case we've fetched the Service registered under the "converter" id and run its normalize method, passing the PersonName.
Using Method Level Dbal Parameters
We may use Dbal Parameters on the Method level, when the parameter is not needed in the method signature.
Static Values
In case the parameter is a static value.
Dynamic Values
We can also use dynamically evaluated parameters and access Dependency Container to get specific Service.
Dynamic Values using Parameters
In case of Method and Class level Dbal Parameters, we get access to the passed parameters inside our expression. They can be accessed via the method parameter names.
Using Class Level Dbal Parameters
Just as on the method level, we can also use class level Dbal Parameters. In case of Class level parameters, they will be applied to all the methods within the interface.
Using Expression language in SQL
To make our SQLs more readable we can also use the expression language directly in SQLs.
Suppose we have a Pagination class,
then we could use it as follows:
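A sketch under the assumption of a simple Pagination class with limit and offset properties (table name is illustrative):

```php
final readonly class Pagination
{
    public function __construct(
        public int $limit,
        public int $offset,
    ) {}
}

// :(pagination.limit) and :(pagination.offset) are evaluated
// with the expression language before the SQL is executed
#[DbalQuery('SELECT * FROM persons LIMIT :(pagination.limit) OFFSET :(pagination.offset)')]
public function findAll(Pagination $pagination): array;
```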
To enable an expression for a given parameter, we need to follow the structure :(expression), so to use the limit property from the Pagination class we write :(pagination.limit).
Working with Event Streams
Working with Event Streams in Ecotone PHP
In the previous chapter we discussed how Event Sourcing Aggregates are built from Event Streams stored in the data store. Yet it's important to understand how those Events get into the Event Stream in the first place.
Working with Event Stream directly
Let's start by manually appending Events using the Event Store. This will help us better understand the concepts behind Event Streams and Event Partitioning. Once we understand this part, we will introduce Event Sourcing Aggregates, which abstract away most of the logic we will do by hand in this chapter.
Working with the Event Stream directly may also be useful when migrating from an existing system that already has an Event Sourcing solution which we want to refactor to Ecotone.
Creating new Event Stream
After installing Ecotone's Event Sourcing we automatically get access to the Event Store abstraction.
This abstraction makes it easy to work with Event Streams.
Let's suppose that we have a Ticketing System like Jira with two basic Events: "Ticket Was Registered" and "Ticket Was Closed".
Of course we need to identify which Ticket a given Event relates to, therefore each Event will carry a Ticket Id.
In our code we can define classes for those:
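A minimal sketch of those Event classes (the exact properties are illustrative):

```php
final readonly class TicketWasRegistered
{
    public function __construct(
        public string $ticketId,
        public string $type,
    ) {}
}

final readonly class TicketWasClosed
{
    public function __construct(
        public string $ticketId,
    ) {}
}
```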
To store those in the Event Stream, let's first declare it using the Event Store abstraction.
The Event Store is automatically available in your Dependency Container after installing the Symfony or Laravel integration. In case of Ecotone Lite, it can be retrieved directly.
The Event Store provides a few handy methods:
As we want to append some Events, let's first create a new Event Stream.
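As a sketch, assuming the stream is named "ticket":

```php
// creates a new Event Stream (under the hood: a new table)
$eventStore->create('ticket');
```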
This is basically enough to create a new Event Stream.
But it's good to understand what actually happens under the hood.
What an Event Stream actually is
In short, an Event Stream is just an audit log of a series of Events. From a technical point of view it's a table in the Database. Therefore when we create an Event Stream we are actually creating a new table.
Event Stream table contains:
Event Id - a unique identifier for the Event
Event Name - the name of the stored Event, used to know which class it should be deserialized to
Payload - the actual Event class, serialized and stored in the database as JSON
Appending Events to Event Stream
To append Events to the Event Stream we will use the "appendTo" method:
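A sketch of such a call, assuming the "ticket" stream from before and an Event wrapper with a static create method:

```php
$eventStore->appendTo('ticket', [
    Event::create(new TicketWasRegistered('123', 'alert')),
]);
```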
This will store the given Event in the Ticket's Event Stream.
Above we've stored Events for the Ticket with id "123". However, we can store Events from different Tickets in the same Event Stream.
We can now load those Events from the Event Stream:
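A sketch of loading the stream back:

```php
// returns an iterator of Ecotone's Events, with payloads deserialized
$events = $eventStore->load('ticket');
```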
This will return an iterator of Ecotone's Events.
As we can see, this maps to what we've been storing in the Event Stream table.
Payload will contain the deserialized form of our Event, for example TicketWasRegistered.
We could also fetch list of Events without deserializing them.
$events = $eventStore->load("ticket", deserialize: false);
In that situation, payload will contain an associative array.
This may be useful when iterating over huge Event Streams, when there is no need to actually work with objects. Besides that, the ability to load in batches may also come in handy.
Concurrent Access
Let's consider what may actually happen during concurrent access to our System. This may be due to more people working on the same Ticket, or simply because our system allowed double clicking of the same action.
In those situations we may end up storing the same Event twice.
Without any protection we will end up with duplicated Closing Events in the Event Stream.
That's not ideal, as we will end up with an Event Stream holding an incorrect history:
This is the place where we need to get back to persistence strategy:
We've created this Stream with the "simple" persistence strategy. This means we can append any new Events without guards. This is fine in scenarios with no business logic involved, like collecting metrics or statistics, where all we need to do is push Events into the Event Stream and duplicates are not really a problem.
However the simple strategy (which is often the only strategy in other Event Sourcing Frameworks) comes with a cost:
We lose the linear history of our Event Stream, as we allow storing duplicates. This may lead to an incorrect state of the System, like Repayments being recorded twice.
As a result of duplicated Events (which hold different Message Ids) we will trigger side effects twice. Therefore our Event Handlers will need to handle this situation, to avoid for example triggering requests to an external system twice, or building a wrong Read Model.
As we allow concurrent access, we can actually make wrong business decisions. For example, we could give the Customer a promotion code twice.
The "simple" strategy is often the only strategy that Event Sourcing Frameworks provide. However, after the solution is released to production, we often start to recognize the above problems, yet by then, with no other way of dealing with them, we are left fixing the symptoms, not the root of the problem.
Therefore we need a more sophisticated solution, one that solves the cause, not the side effects. To solve the cause we will use a different persistence strategy called the "partition strategy".
Partitioning Events
An Event Stream can be split into partitions. A partition is just a sub-stream of Events related to a given Identifier, in our context related to a Ticket.
A partition is a linear history for a given identifier, where each Event within the partition is assigned a version. This way we know which Event is at which position.
Therefore in order to partition the Stream, we need to know the partition key (in our case the Ticket Id). By knowing the partition key and the last version of the given partition, we can apply an Event at the correct position.
To create a partitioned stream, we create the Event Stream with a different strategy:
This will create an Event Stream table with constraints which require:
Aggregate Id - this will be our partition key
Aggregate Type - this may be used if we store more Aggregate types within the same Stream (e.g. User), as an additional partition key
Aggregate Version - this will ensure that we won't apply two Events at the same position in a given partition
We append those as part of the Event's metadata:
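A sketch of such an append, assuming metadata header names of the form _aggregate_id / _aggregate_type / _aggregate_version (the exact header names may differ per version):

```php
$eventStore->appendTo('ticket', [
    Event::create(
        new TicketWasClosed('123'),
        metadata: [
            '_aggregate_id' => '123',       // partition key
            '_aggregate_type' => 'ticket',  // optional additional partition key
            '_aggregate_version' => 2,      // expected position within the partition
        ],
    ),
]);
```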
Let's now see how this helps us ensure that our history is always correct. Let's assume that we currently have a single Event in the partition.
Now let's assume two requests happening at the same time:
This allows us to be sure that within a request we are dealing with the latest Event Stream, because if that's not true we will end up with a concurrency exception. This kind of protection is crucial when dealing with business logic that depends on previous events, as it ensures that there is no way to bypass it.
Idempotency (Deduplication)
Idempotent consumer pattern for message deduplication in PHP
The role of deduplication is to safely handle receiving the same message multiple times, as there is no guarantee from Message Brokers that we will receive a given Message only once.
In Ecotone all Messages are identifiable and contain a Message Id. The Message Id is used for deduplication by default when the Dbal Module is installed. If a message was already handled, it will be skipped.
Deduplication is enabled by default and works whenever a message is consumed asynchronously.
Custom Deduplication
You may also configure a given Message Handler for deduplication. This will deduplicate based on your custom header key.
This works in both synchronous and asynchronous scenarios.
This is especially useful when we receive events from external services, e.g. payment or notification events which contain an identifier that we may use for deduplication.
For example Sendgrid (an Email Service) sends us notifications about user interactions; as there is no guarantee that we will receive a given webhook only once, we may use its "eventId" to deduplicate.
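A sketch of such a handler, assuming a Deduplicated attribute taking the header name (the endpointId value and ReceivePayment command are illustrative):

```php
class PaymentService
{
    #[Deduplicated('paymentId')]
    #[CommandHandler(endpointId: 'receive_payment_endpoint')]
    public function receivePayment(ReceivePayment $command): void
    {
        // executed at most once per distinct paymentId header value
    }
}
```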
paymentId becomes our deduplication key. Whenever we receive a Command with the same value under the paymentId header, Ecotone will deduplicate it and skip execution of the receivePayment method.
We pass endpointId to the Command Handler to indicate that deduplication should happen within the Command Handler with this endpoint id.
If we did not pass it, the endpointId would be generated and cached automatically. This means deduplication for the given Command Handler would only be valid as long as we did not clear the cache.
Custom Deduplication across Handlers
Deduplication happens within a given endpointId. This means that if we introduce another handler with the same deduplication key, it will get its own deduplication tracking.
As deduplication is tracked within a given endpoint id, we can change the deduplication key safely without the risk of receiving duplicates. If we would like to start tracking afresh, it is enough to change the endpointId.
Deduplication with Expression language
We can also dynamically resolve the deduplication value; for this we can use expression language.
The payload variable in the expression language will hold the Command/Event object. The headers variable will hold all related Message Headers.
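A sketch, assuming the Deduplicated attribute accepts an expression parameter:

```php
// deduplicate on a header value resolved via expression language
#[Deduplicated(expression: "headers['paymentId']")]
#[CommandHandler]
public function receivePayment(ReceivePayment $command): void
{
    // ...
}
```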
We can also access any object from our Dependency Container in order to calculate the mapping:
Deduplication with Command Bus
Deduplicate messages at the Command Bus level to protect every handler behind that bus automatically, without per-handler deduplication code.
You'll know you need this when:
Users double-click submit buttons and create duplicate orders or payments
Webhook providers retry delivery and your handlers process the same event twice
Message replay during recovery causes duplicate processing
To reuse the same deduplication mechanism across different Message Handlers, extend the Command Bus interface with your own custom one:
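A sketch of such a custom bus (the interface name is illustrative):

```php
// every Command sent over this bus is deduplicated on the "paymentId" header
#[Deduplicated('paymentId')]
interface PaymentProcessingCommandBus extends CommandBus
{
}
```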
Then all Commands sent over this Command Bus will be deduplicated using the "paymentId" header.
This feature is available as part of Ecotone Enterprise.
Command Bus name
By default, using the same deduplication key between Command Buses means that the Message will be discarded. If we want to ensure isolation, so that each Command Bus tracks its deduplication separately, we can add a tracking name:
Deduplication clean up
To remove expired deduplication history, which is kept in a database table, Ecotone provides a console command:
This command can be configured to run periodically e.g. using cron jobs.
By default Ecotone removes message id from deduplication storage after 7 days in batches of 1000.
It can be customized in case of need:
It's important to keep the removal batch size small, as deleting records may trigger a database index rebuild, which causes locking. A small batch size ensures our system can continue while messages are being deleted in the background.
Disable Deduplication
As the deduplication is enabled by default, if you want to disable it then make use of DbalConfiguration.
Message Handling Isolation
Message handling isolation for safe retries without side effects
It's good to know how Ecotone solves the problem of Message Handling Isolation, which is one of the key features that allows us to build Resilient Messaging Systems.
Sending an Event Message
In Message-Based Systems we will have situations where, as a result of a given Event, we want to trigger some actions. This can be sending a notification, but also calling an external Service, starting a fully new separate flow, etc.
However, those actions may fail for various reasons, and depending on how Messaging is implemented, it may help us recover safely, or trigger unexpected side effects that may harm the business.
Common Event Bus Implementation
Let's first consider a typical implementation of a Message Bus, where we send an Event Message which is consumed by more than one Message Handler (Subscriber / Event Handler).
After placing an Order we send an asynchronous Order Was Placed Event Message, which as a result triggers all related Event Handlers. As we can easily imagine, one of those Event Handlers may fail. This creates a problem, because it's not only the Event Handler that failed that will be retried, but all the Event Handlers connected to the given Event Message.
This of course may produce unexpected side effects, like sending a confirmation twice, or delivering goods to the Customer more than once. Idempotency may help here, but it's not always available or implemented correctly, so we may try to solve it in higher-level code.
To solve it in higher-level code we may introduce multiple Message Queues, each with a single Message Handler connected, or produce Command Messages from Event Handlers in order to provide isolation. However, all of those solutions make the infrastructure, configuration, or application-level code more complex. This is because we try to solve Message Handling Isolation in the upper levels, instead of having it solved at the foundation level.
Ecotone's Event Bus Implementation
Ecotone solves Message Handling Isolation at the foundation level, by delivering a copy of the Message to each of the related Event Handlers separately:
Whenever an Event Message is sent, a copy of it is delivered to each of the related Event Handlers. As a result, each Handler consumes the Message in complete isolation, which enables safe retries.
Handling each Event Handler in complete isolation creates an environment where safe retries are possible, as only the Event Handler that failed will be retried. By solving this at the foundation, the higher-level code can stay focused on the business part of the system, not on solving Message Handling Isolation problems.
There are of course more benefits that this solution enables:
Possibility to safely retry a single Event Handler, instead of the whole Message
Safe Retries
Ecotone's implementation enables safe retries, thanks to the processing isolation it provides.
Let's consider an asynchronous scenario, where we want to send an order confirmation and reserve products in Stock via an HTTP call when an Order Was Placed. This could potentially look like this:
Now imagine that sending to Stock fails and we want to retry. If we retried the whole Event, we would also retry the "notifyAboutNewOrder" method, which would lead to sending the notification twice. It's easy to imagine scenarios where this could lead to even worse situations, where a side effect could lead to double booking, triggering a second payment, etc.
In Ecotone this does not happen, as each of the Handlers receives its own copy of the Message and proceeds in isolation.
Sending a copy to each of the Handlers
In Ecotone each of the Handlers receives its own copy of the Event and handles it in full isolation.
This means that under the hood, two messages would be sent to asynchronous_messages, each targeting a specific Event Handler.
This brings safety to retrying events, as in case of failure we will only retry the Handler that actually failed.
In Ecotone it's the Handler that becomes Asynchronous (not the Event itself), so you may customize the behaviour to your needs.
If you want, you may:
Run one Event Handler synchronously and the other asynchronously.
You may decide to use different Message Channels for each of the Asynchronous Event Handlers.
Error channels and dead letter queues for failed message handling
Error Channel
Ecotone comes with a solution called the Error Channel.
The Error Channel is a place where unrecoverable Errors can go. This way we can preserve Error Messages even if we can't handle them at a given moment.
The Error Channel may log those Messages, store them in a database, or push them to some Asynchronous Channel; it all depends on which Handler we connect to the Error Channel.
Error Channel Flow
On the high level Error Channel works as follows:
Execution Modes
PHP Event Sourcing Projection Execution Modes
The Problem
Your projection runs in the same request as the command handler, and under heavy load it slows down your API. Or you have multiple projections and don't want one slow projection to block others. How do you control when and where projections execute, and what consistency trade-offs come with each choice?
Execution modes determine where your projection runs (same process or background worker) and when it processes events (immediately or later). Each mode comes with different consistency guarantees.
#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private string $description;
private string $assignedTo;
#[CommandHandler]
public static function createTicket(CreateTicket $command): static
{
$ticket = new self();
$ticket->ticketId = Uuid::generate();
$ticket->assignedTo = $command->assignedTo;
$ticket->description = $command->description;
return $ticket;
}
#[CommandHandler]
public function changeTicket(ChangeTicket $command): void
{
$this->description = $command->description;
$this->assignedTo = $command->assignedTo;
}
}
#[Aggregate]
class Ticket
{
use WithEvents; // Provides methods for collecting events
#[Identifier]
private Uuid $ticketId;
#[CommandHandler]
public static function createTicket(
CreateTicket $command
): static
{
$self = new self($command->id);
$self->recordThat(new TicketWasCreated($command->id));
return $self;
}
}
#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
#[CommandHandler]
public static function createTicket(
CreateTicket $command,
#[Header("executorId")] string $executorId,
#[Reference] Clock $clock,
): static
{
return new self(
$command->id,
$executorId,
$clock->currentTime(),
);
}
}
#[DbalWrite('INSERT INTO activities VALUES (:personId, :time)')]
public function store(PersonId $personId, \DateTimeImmutable $time): void;
final readonly class PersonId
{
public function __construct(private string $id) {}
public function __toString(): string
{
return $this->id;
}
}
/**
* @param string[] $roles
*/
#[DbalWrite('UPDATE persons SET roles = :roles WHERE person_id = :personId')]
public function changeRoles(
int $personId,
#[DbalParameter(convertToMediaType: MediaType::APPLICATION_JSON)] array $roles
): void;
final class PersonRoleConverter
{
#[Converter]
public function from(PersonRole $personRole): string
{
return $personRole->getRole();
}
#[Converter]
public function to(string $role): PersonRole
{
return new PersonRole($role);
}
}
/**
* @param PersonRole[] $roles
*/
#[DbalWrite('UPDATE persons SET roles = :roles WHERE person_id = :personId')]
public function changeRolesWithValueObjects(
int $personId,
#[DbalParameter(convertToMediaType: MediaType::APPLICATION_JSON)] array $roles
): void;
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name)')]
public function register(
int $personId,
#[DbalParameter(expression: 'payload.toLowerCase()')] PersonName $name
): void;
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name)')]
public function insertWithServiceExpression(
int $personId,
#[DbalParameter(expression: "reference('converter').normalize(payload)")] PersonName $name
): void;
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name, :roles)')]
#[DbalParameter(name: 'roles', expression: "['ROLE_ADMIN']", convertToMediaType: MediaType::APPLICATION_JSON)]
public function registerAdmin(int $personId, string $name): void;
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name, :registeredAt)')]
#[DbalParameter(name: 'registeredAt', expression: "reference('clock').now()")]
public function registerAdmin(int $personId, string $name): void;
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name, :roles)')]
#[DbalParameter(name: 'roles', expression: "name === 'Admin' ? ['ROLE_ADMIN'] : []", convertToMediaType: MediaType::APPLICATION_JSON)]
public function registerUsingMethodParameters(int $personId, string $name): void;
#[DbalParameter(name: 'registeredAt', expression: "reference('clock').now()")]
class AdminAPI
{
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name, :registeredAt)')]
public function registerAdmin(int $personId, string $name): void;
}
final readonly class Pagination
{
public function __construct(public int $limit, public int $offset)
{
}
}
interface PersonService
{
#[DbalQuery('
SELECT person_id, name FROM persons
LIMIT :(pagination.limit) OFFSET :(pagination.offset)'
)]
public function getNameListWithIgnoredParameters(
Pagination $pagination
): array;
}
final readonly class TicketWasRegistered
{
public function __construct(
public string $id,
public string $type
) {}
}
final readonly class TicketWasClosed
{
public function __construct(
public string $id,
) {}
}
interface EventStore
{
/**
* Creates new Stream with Metadata and appends events to it
*
* @param Event[]|object[] $streamEvents
*/
public function create(string $streamName, array $streamEvents = [], array $streamMetadata = []): void;
/**
* Appends events to existing Stream, or creates one and then appends events if it does not exists
*
* @param Event[]|object[] $streamEvents
*/
public function appendTo(string $streamName, array $streamEvents): void;
/**
* @return Event[]
*/
public function load(
string $streamName,
int $fromNumber = 1,
?int $count = null,
?MetadataMatcher $metadataMatcher = null,
bool $deserialize = true
): iterable;
}
$eventStore->create("ticket", streamMetadata: [
"_persistence" => 'simple', // we will get back to that in later part of the section
]);
$eventStore->appendTo(
"ticket",
[
new TicketWasRegistered('123', 'critical'),
new TicketWasClosed('123')
]
);
$eventStore->appendTo(
"ticket",
[
new TicketWasRegistered('124', 'critical'),
]
);
$events = $eventStore->load("ticket");
class Event
{
private function __construct(
private string $eventName,
private object|array $payload,
private array $metadata
)
(...)
final class PaymentHandler
{
#[Deduplicated('paymentId')]
#[CommandHandler(endpointId: "receivePaymentEndpoint")]
public function receivePayment(ReceivePayment $command): void
{
// handle
}
}
final class PaymentHandler
{
#[Deduplicated('paymentId')]
#[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
public function receivePaymentChanges(ReceivePayment $command): void
{
// handle
}
}
final class PaymentHandler
{
#[Deduplicated(expression: 'payload.paymentId')]
#[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
public function receivePaymentChanges(ReceivePayment $command): void
{
// handle
}
}
final class PaymentHandler
{
#[Deduplicated(expression: 'reference("paymentIdMapper").map(payload.paymentId)')]
#[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
public function receivePaymentChanges(ReceivePayment $command): void
{
// handle
}
}
class DbalConfiguration
{
#[ServiceContext]
public function registerTransactions(): DbalConfiguration
{
return DbalConfiguration::createWithDefaults()
// 100000 ms - 100 seconds
->withDeduplication(
expirationTime: 100000,
removalBatchSize: 1000
);
}
}
class DbalConfiguration
{
#[ServiceContext]
public function registerTransactions(): DbalConfiguration
{
return DbalConfiguration::createWithDefaults()
->withDeduplication(false);
}
}
#[Asynchronous("asynchronous_messages")]
#[EventHandler(endpointId: "notifyAboutNewOrder")]
public function notifyAboutNewOrder(OrderWasPlaced $event, NotificationService $notificationService) : void
{
$notificationService->notifyAboutNewOrder($event->getOrderId());
}
#[Asynchronous("asynchronous_messages")]
#[EventHandler(endpointId: "reserveItemsInStock")]
public function reserveItemsInStock(OrderWasPlaced $event, StockClient $stockClient): void
{
$stockClient->reserve($event->getOrderId(), $event->getProducts());
}
Async execution — #[Asynchronous('notifications')] routes to RabbitMQ, SQS, Kafka, or DBAL — your choice of transport
Failure isolation — each event handler gets its own copy of the message, so one handler's failure never blocks another
Retries and dead letter — failed messages retry automatically, permanently failed ones go to a dead letter queue you can inspect and replay
class OrderService
{
#[CommandHandler]
public function placeOrder(PlaceOrder $command, EventBus $eventBus): void
{
// your business logic
$eventBus->publish(new OrderWasPlaced($command->orderId));
}
#[QueryHandler('order.getStatus')]
public function getStatus(string $orderId): string
{
return $this->orders[$orderId]->status;
}
}
class NotificationService
{
#[Asynchronous('notifications')]
#[EventHandler]
public function whenOrderPlaced(OrderWasPlaced $event, NotificationSender $sender): void
{
$sender->sendOrderConfirmation($event->orderId);
}
}
Setting up an Error Channel means that the Message Consumer will send Error Messages to the error channel and then continue handling the next messages.
After sending an Error Message to the Error Channel, the message is considered handled, as long as the Error Handler does not throw an exception.
Handling Error Messages
Manual Handling
To handle incoming Error Messages, we can bind to our defined Error Channel using ServiceActivator:
Internal Handlers are endpoints like Command Handlers; however, they are not exposed via the Command/Event/Query Buses.
You may use them for internal handling.
Delayed Retries
Ecotone provides an inbuilt retry mechanism: in case of failure, the Error Message will be resent to its original Message Channel with a delay. This way we give the application a chance to self-heal and return to a good state.
Using inbuilt retry mechanism to resend Message with delay
To configure Delayed Retries we need to set up Error Configuration and connect it to our Error Channel:
Discarding all Error Messages
If in some cases we want to discard Error Messages, we can set the error channel to the default inbuilt one called "nullChannel".
That may be used in combination with retries: if after the given attempts the Message is still not handled, then discard it:
Dbal Dead Letter
Ecotone comes with full support for managing the whole life cycle of an Error Message.
This allows us to store the Message in the database for later review. Then we can review the Message, replay it, or delete it.
Dead Letter can be combined with Delayed Retries, to store only Error Messages that can't self-heal.
Read more in related section.
Command Bus Error Channel
Route failed synchronous commands to dedicated error handling with a single #[ErrorChannel] attribute. Instead of catching exceptions in each handler and manually routing to error handling, declare the error channel once. Failed messages are automatically routed for retry, logging, or dead-letter processing.
You'll know you need this when:
Failed commands need specific error handling: alerting, manual review, or audit trails
Payment or financial operations require failure tracking for compliance
You receive webhooks and need to handle failures gracefully instead of throwing exceptions
Scattered try/catch blocks in handlers are becoming unmanageable
Different command categories need different error handling strategies
Command Bus Error Channel is available as part of Ecotone Enterprise.
Command Bus with Error Channel
To set up an Error Channel for the Command Bus, we extend the Command Bus with our own Interface and add the ErrorChannel attribute.
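A sketch of such an interface (the interface and channel names are illustrative; CommandBus is Ecotone's bus interface):

```php
use Ecotone\Modelling\CommandBus;

// Failures are routed to the given error channel
// instead of being propagated to the caller
#[ErrorChannel('dbal_dead_letter')]
interface ResilientCommandBus extends CommandBus
{
}
```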
Command Bus with Dead Letter
Now instead of using CommandBus, we will be using ResilientCommandBus for sending Commands.
Whenever a failure happens, instead of being propagated, it will be redirected to our Dead Letter and stored in the database for later review.
Command Bus with Error Channel and Instant Retry
We can extend our Command Bus with Error Channel by providing instant retries.
This way we can do automatic retries before we consider the Message as failed and move it to the Error Channel. This gives us a chance of self-healing automatically in case of transient errors, like database or network exceptions.
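A sketch of what this could look like (the #[InstantRetry] attribute and its retryTimes parameter are assumptions based on the description above; verify against your Ecotone version):

```php
use Ecotone\Modelling\CommandBus;

// Retry instantly up to 2 times; only when all attempts fail
// is the Message moved to the Error Channel
#[InstantRetry(retryTimes: 2)]
#[ErrorChannel('dbal_dead_letter')]
interface ResilientCommandBus extends CommandBus
{
}
```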
Now instead of using CommandBus, we will be using ResilientCommandBus for sending Commands.
Whenever a failure happens, instead of being propagated, it will be redirected to our Dead Letter and stored in the database for later review.
Command Bus with Asynchronous Error Channel
Instead of pushing the Message to the Error Channel, we can push it to an Asynchronous Message Channel, from which the Message will be consumed and retried again. This way, in case of failure, we make it possible for the Message to be retried and end up self-healing.
Command Bus with Asynchronous Error Channel
and then use a RabbitMQ Message Channel:
It's a good practice to use a different Message Channel implementation than the storage used while processing the Message. For example, if our processing requires a database connection and our database went down, then, if our configured channel is a RabbitMQ channel, we will still be able to push those Messages onto the Queue instead of failing.
Command Bus with Asynchronous Error Channel and Delayed Retries
We can combine an Asynchronous Error Channel together with delayed retries, creating a robust solution where our Application is able to self-heal from transient errors, even if they take some time.
For example, if calling some external Service fails, or the database goes down, then we may receive the same error when the Message is consumed from the Async Channel. However, if we delay that by 20 seconds, there is a huge chance that everything will get back on track, and the Application will self-heal automatically.
Command Bus with Asynchronous Error Channel and delayed retries
Command Bus configuration:
And delayed retry configuration:
Of course, we could add a Dead Letter channel to our delayed retries configuration, closing the full flow: even if the delayed retries fail, we end up with the Message in the Dead Letter.
class Configuration
{
#[ServiceContext]
public function configuration() : array
{
return [
// For Message Consumer orders, configure error channel
PollingMetadata::create("orders")
->setErrorChannelName("errorChannel")
];
}
}
#[InternalHandler("errorChannel")]
public function handle(ErrorMessage $errorMessage): void
{
// handle exception
$exception = $errorMessage->getExceptionMessage();
}
#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::create(
"errorChannel",
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3)
);
}
#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::createWithDeadLetterChannel(
"errorChannel",
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3),
// if retry strategy will not recover, then discard
"nullChannel"
);
}
final readonly class EcotoneConfiguration
{
#[ServiceContext]
public function databaseChannel()
{
return AmqpBackedMessageChannelBuilder::create('orders');
}
}
#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::create(
"async_channel",
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3)
);
}
Choosing the Right Mode
This is about where and when execution happens, and what consistency consequences you accept:

| Feature | Sync Event-Driven | Async Event-Driven | Polling (Enterprise) |
| --- | --- | --- | --- |
| Consistency | Immediate | Eventual | Eventual |
| Transaction | Same transaction as the command | Separate transaction | Separate transaction |
Execution mode does not affect horizontal scaling. For parallel processing across multiple aggregates, see Scaling and Advanced — which uses Partitioned or Streaming projections (Enterprise).
You can start with synchronous projections for simplicity, and switch to asynchronous later by adding a single attribute — no code changes needed in your projection handlers.
Synchronous Event-Driven (Default)
By default, projections execute synchronously — in the same process and the same database transaction as the Command Handler that produced the events.
Synchronous projections run within the same database transaction as the Event Store changes. When you query the Read Model right after a command, you always get consistent, up-to-date data.
When to use:
Low write volume — a few writes per second
Testing — immediate feedback, no async complexity
Simple applications — where eventual consistency adds unnecessary complexity
Trade-off: If the projection is slow (complex queries, external calls), it slows down the entire command handling. For high-throughput scenarios, consider asynchronous execution.
Asynchronous Event-Driven
To decouple the projection from the command handler, mark it as asynchronous. The event is delivered via a message channel and processed by a background worker:
The projection code stays exactly the same — you just add #[Asynchronous('projections')]. Ecotone handles delivering the trigger event via the projections channel.
To start the background worker:
bin/console ecotone:run projections -vvv
artisan ecotone:run projections -vvv
$messagingSystem->run('projections');
When to use:
High write volume — projection processing shouldn't slow down commands
Multiple projections — each can process at its own pace
Production workloads — decoupled, resilient processing
Multiple projections can share the same async channel (same consumer process), or each can have its own dedicated channel.
Trade-off: Data in the Read Model may be slightly behind the Event Store (eventual consistency). If you query immediately after a command, you might get stale results.
Batch Size and Flushing
By default, projections load up to 1000 events per batch. You can customize this with #[ProjectionExecution]:
How Batching Works
Events are processed in batches, and each batch is wrapped in its own database transaction. After each batch:
#[ProjectionFlush] handler is called (if defined)
The projection's position is saved
The transaction is committed
This prevents one massive transaction from locking your database tables for the entire projection run. Even if you have 100,000 events to process, the database is only locked for one batch at a time.
Ecotone automatically manages transactions at batch boundaries. In async mode, each batch gets its own transaction — not the entire message processing. If you use Doctrine ORM, Ecotone also flushes and clears the EntityManager at batch boundaries automatically, preventing memory leaks.
Polling (Enterprise)
Polling projections run as a dedicated background process that periodically queries the event store for new events:
Polling projections are available as part of Ecotone Enterprise.
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
class TicketListProjection
{
// No additional attributes needed — synchronous is the default
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
// This runs in the same transaction as the command
}
}
#[Asynchronous('projections')]
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
class TicketListProjection
{
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
// This runs in a separate process, triggered by the message channel
}
}
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
#[ProjectionExecution(eventLoadingBatchSize: 500)]
class TicketListProjection
{
// Events are loaded and processed 500 at a time
}
#[ProjectionFlush]
public function flush(): void
{
// Called after each batch of events is processed
// Useful for flushing buffers, clearing caches, etc.
}
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
#[Polling('ticket_list_poller')]
class TicketListProjection
{
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
// Executed when the poller finds new events
}
}
Backfill and Rebuild
PHP Event Sourcing Projection Backfill and Rebuild
The Problem
You deployed a new "order analytics" projection to production, but it only processes events from now on. You have 2 years of order history sitting in the event store. How do you populate the projection with historical data? And later, when you fix a bug in the projection logic, how do you replay everything?
Backfill — Populating a New Projection
Backfill processes all historical events from position 0 to the current position. It's used when you deploy a fresh projection and need to populate it with past data.
Sync Backfill
Add #[ProjectionBackfill] to your projection and run the CLI command:
Then run:
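A sketch of both steps, reusing the projection from earlier sections (the CLI command name follows the pattern of the rebuild command shown later and may differ in your setup):

```php
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
#[ProjectionBackfill] // opts this projection into backfill support
class TicketListProjection
{
    #[EventHandler]
    public function onTicketRegistered(TicketWasRegistered $event): void
    {
        // build the Read Model entry for this ticket
    }
}

// Then, assuming a Symfony setup:
// bin/console ecotone:projection:backfill ticket_list
```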
The backfill reads all events from the beginning of the stream, processing them in batches. After backfill completes, the projection is caught up and will process new events as they arrive.
Backfill runs synchronously and is available in the open-source edition.
Async Backfill (Enterprise)
For large event stores with millions of events, synchronous backfill may take too long — it runs in the CLI process and blocks until all events are processed. By setting asyncChannelName, the backfill command instead dispatches messages to a channel, turning the backfill into an asynchronous background process:
Run the backfill command (dispatches messages instantly), then start workers to process them:
Scaling Async Backfill with Partitioned Projections
The real power of async backfill comes when combined with #[Partitioned]. Each partition (aggregate) can be backfilled independently, so the work is split into batches that multiple workers process in parallel:
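A sketch of such a declaration (the attribute parameters follow the names used in this section; their exact placement is an assumption):

```php
#[ProjectionV2('ticket_details')]
#[FromAggregateStream(Ticket::class)]
#[Partitioned] // one partition per aggregate instance
#[ProjectionBackfill(
    asyncChannelName: 'backfill_channel',
    backfillPartitionBatchSize: 100, // aggregates per dispatched message
)]
class TicketDetailsProjection
{
    // event handlers as usual
}
```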
When you run the backfill command with 10,000 aggregates and backfillPartitionBatchSize: 100:
Ecotone dispatches 100 messages to backfill_channel (10,000 / 100)
With partitioned projections, both backfill and rebuild scale linearly with worker count. A backfill that takes 2 hours with 1 worker takes 12 minutes with 10 workers.
Async backfill is available as part of Ecotone Enterprise.
Rebuild — Reset and Replay (Enterprise)
Rebuild is different from backfill: it resets an existing projection (clears data and position) and then replays all events from the beginning.
Use rebuild when:
You fixed a bug in a handler and the Read Model has incorrect data
You changed the projection's schema and need to reprocess everything
You want to add a new event handler to an existing projection and apply it retroactively
Rebuild is available as part of Ecotone Enterprise.
How rebuild works depends on the projection type — and the difference is significant.
Rebuilding a Global Projection
For a globally tracked projection, rebuild works as reset + backfill on the entire dataset:
#[ProjectionReset] is called — clears all data (e.g., DELETE FROM ticket_list)
Position is reset to the beginning
All events in the stream are replayed through the handlers
Global rebuild deletes all data first, then repopulates. During the rebuild window, the Read Model is empty or incomplete. This can also lock the table depending on your database. For zero-downtime alternatives, see .
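For a globally tracked projection, the reset hook could look like this (a sketch; the table name follows the example above, and Connection is Doctrine DBAL's connection):

```php
#[ProjectionReset]
public function reset(Connection $connection): void
{
    // Clears the entire Read Model; it is repopulated during replay
    $connection->executeStatement('DELETE FROM ticket_list');
}
```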
Rebuilding a Partitioned Projection
For partitioned projections, rebuild is much safer. Instead of resetting the entire projection at once, Ecotone rebuilds each partition (aggregate) separately:
For each partition: within a transaction, delete that partition's projected data and re-project it
Other partitions are unaffected — they continue serving reads normally
Only one aggregate's data is unavailable at a time, and only briefly
Notice the key difference: #[ProjectionReset] receives #[PartitionAggregateId] — it only deletes the data for the specific aggregate being rebuilt, not the entire table.
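Sketched out, the partition-scoped reset could look like this (column and parameter names are illustrative):

```php
#[ProjectionReset]
public function reset(
    Connection $connection,
    #[PartitionAggregateId] string $ticketId,
): void {
    // Deletes only the rows of the aggregate being rebuilt
    $connection->executeStatement(
        'DELETE FROM ticket_list WHERE ticket_id = :ticketId',
        ['ticketId' => $ticketId],
    );
}
```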
Controlling Rebuild Batch Size
The partitionBatchSize parameter controls how many partitions are processed per rebuild command:
With 1000 aggregates and partitionBatchSize: 50, Ecotone dispatches 20 rebuild commands — each processing 50 partitions.
Scaling Rebuild with Async Workers
For large projections, you can distribute rebuild work across multiple workers:
When you run ecotone:projection:rebuild ticket_details:
Ecotone counts the partitions (e.g., 1000 aggregates)
Divides them into batches of 50 → 20 messages
Sends all 20 messages to rebuild_channel
This means you can rebuild a projection with millions of aggregates by simply scaling up your worker count. Just like with , throughput scales linearly with the number of workers.
Run the rebuild command, then start workers:
Sync Rebuild
Without asyncChannelName, rebuild runs synchronously — all partitions are processed in the current process:
During rebuild, the Read Model is being repopulated. If you need zero-downtime rebuilds, see .
Backfill vs Rebuild
Backfill
Rebuild (Global)
Rebuild (Partitioned)
Projections with Document Store
PHP Event Sourcing Projections with Document Store
The Problem
You want to build a Read Model quickly but writing raw SQL for every projection — CREATE TABLE, INSERT, UPDATE, SELECT — is tedious and error-prone. You just want to store and retrieve PHP objects or arrays without managing schema yourself. How do you build projections without writing SQL?
What is the Document Store?
Ecotone's DocumentStore is a key-value store that automatically serializes and deserializes PHP objects and arrays to JSON. You organize data in collections (like database tables) and access individual documents by ID.
It's available out of the box with DBAL — no extra setup needed. Think of it as a simpler alternative to writing raw SQL for your Read Models.
Building a Projection with Document Store
Instead of injecting a Connection and writing SQL, inject DocumentStore and work with PHP objects directly:
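A sketch (collection and event names follow the earlier examples; the DocumentStore namespace may differ in your Ecotone version):

```php
use Ecotone\Messaging\Store\Document\DocumentStore;

#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
final class TicketListProjection
{
    public function __construct(private DocumentStore $documentStore)
    {
    }

    #[EventHandler]
    public function onTicketRegistered(TicketWasRegistered $event): void
    {
        // Stored as JSON in the "tickets" collection, keyed by ticket id
        $this->documentStore->addDocument(
            'tickets',
            $event->id,
            ['id' => $event->id, 'type' => $event->type],
        );
    }

    #[EventHandler]
    public function onTicketClosed(TicketWasClosed $event): void
    {
        $this->documentStore->deleteDocument('tickets', $event->id);
    }
}
```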
Notice there's no #[ProjectionInitialization] to create tables, no #[ProjectionDelete] to drop them — the Document Store handles storage automatically.
Available Operations
The DocumentStore interface provides these methods:
Method
Description
Storing PHP Objects
The Document Store can store PHP objects directly — Ecotone automatically serializes them to JSON and deserializes them back:
When storing objects, Ecotone uses the configured serializer (e.g., JMS Converter) to convert them to JSON. The same object type is returned when reading — no manual deserialization needed.
Using upsertDocument for Simpler Logic
When you don't want to distinguish between "first time" and "update", use upsertDocument to simplify your handlers:
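A sketch, continuing the ticket example:

```php
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
    // Inserts the document if it does not exist, replaces it otherwise --
    // no need to distinguish "first time" from "update"
    $this->documentStore->upsertDocument(
        'tickets',
        $event->id,
        ['id' => $event->id, 'type' => $event->type],
    );
}
```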
Lifecycle with Document Store
When using Document Store, you can simplify your lifecycle hooks by operating on collections:
Testing with In-Memory Document Store
For tests, Ecotone provides an InMemoryDocumentStore that works identically to the DBAL version but stores everything in memory — no database needed:
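A sketch of such a test, assuming a projection that takes DocumentStore in its constructor (the createEmpty() factory name is an assumption):

```php
use Ecotone\Messaging\Store\Document\InMemoryDocumentStore;

$documentStore = InMemoryDocumentStore::createEmpty();
$projection = new TicketListProjection($documentStore);

// Drive the projection directly with an event
$projection->onTicketRegistered(new TicketWasRegistered('123', 'critical'));

// Assert on the stored document -- no database required
$ticket = $documentStore->getDocument('tickets', '123');
```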
InMemoryDocumentStore is perfect for unit and integration tests — it has the same API as the DBAL version, runs instantly, and requires no database setup.
When to Use Document Store vs Raw SQL
Document Store
Raw SQL (Connection)
You can mix both approaches in the same application — use Document Store for simple projections and raw SQL for complex ones. They are not mutually exclusive.
Lesson 2: Tactical DDD
DDD PHP
Not having code for Lesson 2?
git checkout lesson-2
Aggregate
An Aggregate is an entity or group of entities that is always kept in a consistent state.
Aggregates are very explicitly present in the Command Model, as that is where change is initiated and business behaviour is placed.
Let's create our first Aggregate: Product.
The Aggregate attribute marks a class as an Aggregate.
Identifier marks properties as identifiers of a specific Aggregate instance. Each Aggregate must contain at least one identifier.
CommandHandler enables command handling on a specific method, just as we saw before.
If a method is static, it's treated as a factory method and must return a new aggregate instance.
If you want to know more about Aggregates, start with the dedicated chapter.
Now remove App\Domain\Product\ProductService, as it contains handlers for the same command and query classes.
Before we run our test scenario, we need to register a Repository.
Usually you will mark services as Query Handlers, not Aggregates. However, Ecotone does not block the possibility of placing a Query Handler on an Aggregate; it's up to you to decide.
Repository
Repositories are used for retrieving and saving the aggregate to persistent storage.
We will build an in-memory implementation for now.
The Repository attribute marks a class as a Repository known to Ecotone.
We need to implement a few methods to allow Ecotone to retrieve and save the Aggregate. Based on the implemented interface, Ecotone knows whether the Aggregate is state-stored or event-sourced.
If you want to know more about Repositories, start with the dedicated chapter.
Let's run our testing command:
Have you noticed what we are missing here? Our Event Handler was not called, as we do not publish the ProductWasRegistered event anymore.
Event Publishing
To automatically publish events recorded within an Aggregate, we need to add a method annotated with AggregateEvents. This tells Ecotone where to get the events from.
Ecotone comes with a default implementation that can be used via the WithEvents trait.
You may implement your own method for returning events if you do not want to be coupled to the framework.
Let's run our testing command:
Congratulations, we have just finished Lesson 2.
In this lesson we have learnt how to make use of Aggregates and Repositories.
Now we will learn about Converters and Metadata
Ecotone Enterprise features for scaling multi-tenant and multi-service PHP systems
Ecotone Free gives you production-ready CQRS, Event Sourcing, and Workflows — message buses, aggregates, sagas, async messaging, retries, error handling, and full testing support.
Ecotone Enterprise is for when your system outgrows single-tenant, single-service, or needs advanced resilience and scalability.
Free vs Enterprise at a Glance
Capability
Projection Introduction
PHP Event Sourcing Projections
The Problem
Once you start storing events instead of updating rows, you will quickly find out your users still need a ticket list page, a dashboard, a report. How do you turn a stream of "what happened" into a table you can query?
In traditional applications, when a ticket is created you run an INSERT, and when it's closed you run an UPDATE. The database always holds the current state. But with Event Sourcing, you store what happened: TicketWasRegistered, then TicketWasClosed.
#[ProjectionV2('order_analytics')]
#[FromAggregateStream(Order::class)]
#[ProjectionBackfill]
class OrderAnalyticsProjection
{
#[EventHandler]
public function onOrderPlaced(OrderWasPlaced $event): void
{
// This will process ALL historical OrderWasPlaced events during backfill
}
#[ProjectionInitialization]
public function init(): void { /* CREATE TABLE */ }
#[ProjectionReset]
public function reset(): void { /* DELETE FROM */ }
}
#[ProjectionV2('order_analytics')]
#[FromAggregateStream(Order::class)]
#[ProjectionBackfill(asyncChannelName: 'backfill_channel')]
class OrderAnalyticsProjection
{
// Same handlers as above
}
# Dispatches backfill messages to the channel
bin/console ecotone:projection:backfill order_analytics
# Start workers to process (run multiple for parallel processing)
bin/console ecotone:run backfill_channel -vvv
#[ProjectionV2('order_analytics')]
#[FromAggregateStream(Order::class)]
#[Partitioned]
#[ProjectionBackfill(backfillPartitionBatchSize: 100, asyncChannelName: 'backfill_channel')]
class OrderAnalyticsProjection
{
// Same handlers
}
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
#[ProjectionRebuild]
class TicketListProjection
{
#[ProjectionReset]
public function reset(): void
{
// Clears ALL data — entire table
$this->connection->executeStatement('DELETE FROM ticket_list');
}
// ... event handlers
}
#[ProjectionV2('ticket_details')]
#[FromAggregateStream(Ticket::class)]
#[Partitioned]
#[ProjectionRebuild(partitionBatchSize: 50)]
class TicketDetailsProjection
{
#[ProjectionReset]
public function reset(#[PartitionAggregateId] string $aggregateId): void
{
// Resets only THIS aggregate's data — not the whole table
$this->connection->executeStatement(
'DELETE FROM ticket_details WHERE ticket_id = ?',
[$aggregateId]
);
}
// ... event handlers
}
#[ProjectionV2('ticket_details')]
#[FromAggregateStream(Ticket::class)]
#[Partitioned]
#[ProjectionRebuild(partitionBatchSize: 50, asyncChannelName: 'rebuild_channel')]
class TicketDetailsProjection
{
// ... same as above
}
#[ProjectionRebuild(partitionBatchSize: 50)]
// No asyncChannelName — rebuild happens immediately during CLI command
#[ProjectionV2('available_balance')]
#[FromAggregateStream(Account::class)]
class AvailableBalanceProjection
{
public function __construct(private DocumentStore $documentStore) {}
#[EventHandler]
public function whenAccountSetup(AccountSetup $event): void
{
$this->documentStore->addDocument(
'available_balance',
$event->accountId,
['balance' => 0]
);
}
#[EventHandler]
public function whenPaymentMade(PaymentMade $event): void
{
$current = $this->documentStore->getDocument(
'available_balance',
$event->accountId
);
$this->documentStore->updateDocument(
'available_balance',
$event->accountId,
['balance' => $current['balance'] + $event->amount]
);
}
#[QueryHandler('getCurrentBalance')]
public function getCurrentBalance(string $accountId): int
{
return $this->documentStore->getDocument(
'available_balance',
$accountId
)['balance'];
}
}
class WalletBalance
{
public function __construct(
public readonly string $walletId,
public readonly int $currentBalance,
) {}
public function add(int $amount): self
{
return new self($this->walletId, $this->currentBalance + $amount);
}
}
#[ProjectionV2('wallet_balance')]
#[FromAggregateStream(Wallet::class)]
class WalletBalanceProjection
{
public function __construct(private DocumentStore $documentStore) {}
#[EventHandler]
public function whenWalletCreated(WalletWasCreated $event): void
{
$this->documentStore->addDocument(
'wallet_balance',
$event->walletId,
new WalletBalance($event->walletId, 0)
);
}
#[EventHandler]
public function whenMoneyAdded(MoneyWasAddedToWallet $event): void
{
/** @var WalletBalance $wallet */
$wallet = $this->documentStore->getDocument('wallet_balance', $event->walletId);
$this->documentStore->updateDocument(
'wallet_balance',
$event->walletId,
$wallet->add($event->amount)
);
}
#[QueryHandler('getWalletBalance')]
public function getBalance(string $walletId): WalletBalance
{
return $this->documentStore->getDocument('wallet_balance', $walletId);
}
}
#[EventHandler]
public function whenTicketRegistered(TicketWasRegistered $event): void
{
$this->documentStore->upsertDocument(
'ticket_list',
$event->ticketId,
['ticketId' => $event->ticketId, 'type' => $event->type, 'status' => 'open']
);
}
#[EventHandler]
public function whenTicketClosed(TicketWasClosed $event): void
{
$this->documentStore->upsertDocument(
'ticket_list',
$event->ticketId,
['ticketId' => $event->ticketId, 'status' => 'closed']
);
}
#[ProjectionDelete]
public function delete(): void
{
$this->documentStore->dropCollection('wallet_balance');
}
#[ProjectionReset]
public function reset(): void
{
$this->documentStore->dropCollection('wallet_balance');
}
namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\Aggregate;
use Ecotone\Modelling\Attribute\Identifier;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
#[Aggregate]
class Product
{
#[Identifier]
private int $productId;
private int $cost;
private function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
}
#[CommandHandler]
public static function register(RegisterProductCommand $command) : self
{
return new self($command->getProductId(), $command->getCost());
}
#[QueryHandler]
public function getCost(GetProductPriceQuery $query) : int
{
return $this->cost;
}
}
namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\Repository;
use Ecotone\Modelling\StandardRepository;
#[Repository] // 1
class InMemoryProductRepository implements StandardRepository // 2
{
/**
* @var Product[]
*/
private $products = [];
// 3
public function canHandle(string $aggregateClassName): bool
{
return $aggregateClassName === Product::class;
}
// 4
public function findBy(string $aggregateClassName, array $identifiers): ?object
{
if (!array_key_exists($identifiers["productId"], $this->products)) {
return null;
}
return $this->products[$identifiers["productId"]];
}
// 5
public function save(array $identifiers, object $aggregate, array $metadata, ?int $expectedVersion): void
{
$this->products[$identifiers["productId"]] = $aggregate;
}
}
# As the default auto-wiring of Laravel creates a new service instance each time
# a service is requested from the Dependency Container, we need to register
# InMemoryProductRepository as a singleton.
# Go to bootstrap/QuickStartProvider.php and register it:
namespace Bootstrap;
use App\Domain\Product\InMemoryProductRepository;
use Illuminate\Support\ServiceProvider;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(InMemoryProductRepository::class, function(){
return new InMemoryProductRepository();
});
}
(...)
Everything is set up by the framework, please continue...
Everything is set up, please continue...
bin/console ecotone:quickstart
Running example...
100
Good job, scenario ran with success!
use Ecotone\Modelling\WithEvents;
#[Aggregate]
class Product
{
use WithEvents;
#[Identifier]
private int $productId;
private int $cost;
private function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
$this->recordThat(new ProductWasRegisteredEvent($productId));
}
(...)
bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!
Capability | Free | Enterprise
CQRS (Commands, Queries, Events) | Yes | Yes
Event Sourcing & Projections | Yes | Yes
Sagas (Stateful Workflows) | Yes | Yes
Handler Chaining (Pipe & Filter)
Ecotone Plans
Ecotone comes with two plans:
Ecotone Free comes with Apache License Version 2.0. It provides everything you need to build message-driven systems in PHP -- CQRS, aggregates, event sourcing, sagas, async messaging, interceptors, and full testing support. This covers all features not marked as Enterprise.
Ecotone Enterprise adds production-grade capabilities for teams whose systems have grown into multi-tenant, multi-service, or high-throughput environments. It brings advanced workflow orchestration, cross-service communication, resilient command handling, and resource optimization.
Every Enterprise licence directly funds continued development of Ecotone's open-source core. When Enterprise succeeds, the entire ecosystem benefits.
Each Enterprise feature is marked with a hint on its documentation page. Enterprise features can only be run with a licence key.
To evaluate Enterprise features, email us at "[email protected]" to receive a trial key. Production licence keys are available at https://ecotone.tech.
Signs You're Ready for Enterprise
You don't need Enterprise on day one. These are the growth signals that tell you it's time:
"We're serving multiple tenants and need isolation"
A noisy tenant's queue backlog shouldn't affect others. Per-tenant scaling shouldn't mean building custom routing infrastructure.
Dynamic Message Channels -- Route messages per-tenant at runtime using header-based or round-robin strategies. Declare the routing once, Ecotone manages the rest. Add tenants by updating the mapping -- no handler code changes.
"We have complex multi-step business processes"
Business stakeholders ask "what are the steps in this process?" and the answer requires reading multiple files. Adding or reordering steps touches code in many places.
Orchestrators -- Define workflow sequences declaratively in one place. Each step is independently testable and reusable. Dynamic step lists adapt to input data without touching step code.
"We're running multiple services that need to talk to each other"
Building custom inter-service messaging wiring for each service pair has become unsustainable. Different services use different brokers and you need them to communicate.
Distributed Bus with Service Map -- Cross-service messaging that supports multiple brokers (RabbitMQ, SQS, Redis, Kafka) in a single topology. Swap transports without changing application code.
"Our projections need to scale, rebuild safely, or deploy without downtime"
A single global projection can't keep up with event volume. Rebuilding wipes the read model for 30 minutes. Changing projection schema means downtime for users.
Partitioned Projections -- One partition per aggregate with independent position tracking. Failures isolate to a single aggregate instead of blocking everything. Indexed event loading skips irrelevant events for dramatically faster processing. Works with both sync and async execution.
Async Backfill & Rebuild -- Push backfill and rebuild to asynchronous background workers with asyncChannelName. Combined with partitioned projections, the work is split into batches that multiple workers process in parallel — throughput scales linearly with worker count. A backfill that takes 2 hours with 1 worker takes 12 minutes with 10.
Blue-Green Deployments -- Deploy a new projection version alongside the old one. The new version catches up from history while the old one serves traffic. Switch when ready, delete the old one. Zero downtime.
-- Consume events directly from Kafka or RabbitMQ Streams instead of the database event store. For cross-system integration and external event sources.
-- Accumulate state in memory across a batch of events and persist once at flush. Process 1000 events with zero database writes, then one bulk insert. Dramatically faster rebuilds.
"We need high-throughput event streaming"
RabbitMQ throughput is becoming a bottleneck, or multiple services need to consume the same event stream independently.
Kafka Integration -- Native Kafka support with the same attribute-driven programming model. No separate producer/consumer boilerplate.
RabbitMQ Streaming Channel -- Kafka-like persistent event streaming on existing RabbitMQ infrastructure. Multiple independent consumers with position tracking.
"Our production system needs to be resilient"
Transient failures cause unnecessary handler failures. Duplicate commands from user retries or webhooks lead to double-processing. Exception handling is scattered across handlers.
Command Bus Instant Retries -- Recover from transient failures (deadlocks, network blips) with a single #[InstantRetry] attribute. No manual retry loops.
Command Bus Error Channel -- Route failed synchronous commands to dedicated error handling with #[ErrorChannel]. Replace scattered try/catch blocks with centralized error routing.
Gateway-Level Deduplication -- Prevent duplicate command processing at the bus level. Every handler behind that bus is automatically protected.
"We want less infrastructure code in our domain"
Repository injection boilerplate obscures business logic. Every handler follows the same fetch-modify-save pattern. Making the entire bus async requires annotating every handler individually.
Instant Aggregate Fetch -- Aggregates arrive in your handler automatically via #[Fetch]. No repository injection, just business logic.
Event Sourcing Handlers with Metadata -- Pass metadata to #[EventSourcingHandler] for context-aware aggregate reconstruction without polluting event payloads.
Asynchronous Message Buses -- Make an entire command or event bus async with a single configuration change, instead of annotating every handler.
"We need per-handler control over async endpoint behavior"
Database transactions are globally enabled for your message channel, but some handlers only call a 3rd party API or send emails — wrapping them in a transaction wastes connections and holds locks unnecessarily.
Async Endpoint Annotations -- Pass endpointAnnotations on #[Asynchronous] to selectively disable transactions, message collectors, or inject custom configuration for specific handlers while keeping global defaults for the rest of the channel.
"We need production-grade RabbitMQ consumption"
Custom consumer scripts need manual connection handling, reconnection logic, and shutdown management.
Rabbit Consumer -- Set up RabbitMQ consumption with a single attribute. Built-in reconnection, graceful shutdown, and health checks out of the box.
Think of it like a bank account: instead of storing "balance = 500", you store every deposit and withdrawal. The balance is derived by replaying the history.
But your users don't want to replay history every time they load a page. They need a ready-to-query table. That's what Projections do.
What is a Projection?
A Projection reads events from an Event Stream (the append-only log) and builds a read-optimized view from them — a database table, a document, a cache entry. Think of it as a materialized view built from events.
Another analogy: the Event Stream is like your Git history — every commit ever made. The Projection is like your working directory — the current state of the files, derived from that history.
The views built by Projections are called Read Models. They exist only for reading and can be rebuilt at any time from the Event Stream.
Events stored in the Event Stream
From these events, we want to build a list of all tickets with their current status:
Read Model: list of tickets with current status
Building Your First Projection
Let's say we have a Ticket Event Sourced Aggregate that produces two events — TicketWasRegistered and TicketWasClosed. We want to build a read model table showing all in-progress tickets.
That's all you need. Let's break down what each part does:
#[ProjectionV2('ticket_list')] — marks this class as a Projection with name ticket_list
#[FromAggregateStream(Ticket::class)] — tells the Projection to read events from the Ticket aggregate's stream
#[ProjectionInitialization] — called when the Projection is first set up (creates the table)
#[EventHandler] — subscribes to specific event types. Ecotone routes events by the type-hint.
#[ProjectionDelete] and #[ProjectionReset] — called when the projection is deleted or reset
There is no additional configuration needed. Ecotone takes care of delivering events, initializing, and triggering the Projection.
Position Tracking
Each Projection remembers where it left off in the Event Stream — like a bookmark in a book. When a new event triggers the Projection, it fetches only the events after its last position.
This means:
New Projections start from the beginning of the stream and catch up to the present
Existing Projections only process new events they haven't seen yet
After a failure, the Projection resumes from its last successfully committed position
This is what makes it possible to deploy a new Projection at any point in time and have it automatically build up from the full event history.
Feature Overview
Ecotone Projections come in two editions. The open-source edition covers the full projection lifecycle for globally tracked projections. Enterprise adds scaling, advanced operations, and deployment strategies.
— keep state between events without external storage
— notify after the projection is up to date
— populate with historical data
— transactions, rollback, self-healing
— how Ecotone guarantees no events are lost
— partitioned, streaming, polling (Enterprise)
— zero-downtime projection changes (Enterprise)
#[ProjectionV2('ticket_list')]
#[FromAggregateStream(Ticket::class)]
class TicketListProjection
{
public function __construct(private Connection $connection) {}
#[ProjectionInitialization]
public function init(): void
{
$this->connection->executeStatement(<<<SQL
CREATE TABLE IF NOT EXISTS ticket_list (
ticket_id VARCHAR(36) PRIMARY KEY,
ticket_type VARCHAR(25),
status VARCHAR(25)
)
SQL);
}
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
$this->connection->insert('ticket_list', [
'ticket_id' => $event->ticketId,
'ticket_type' => $event->type,
'status' => 'open',
]);
}
#[EventHandler]
public function onTicketClosed(TicketWasClosed $event): void
{
$this->connection->update(
'ticket_list',
['status' => 'closed'],
['ticket_id' => $event->ticketId]
);
}
#[ProjectionDelete]
public function delete(): void
{
$this->connection->executeStatement('DROP TABLE IF EXISTS ticket_list');
}
#[ProjectionReset]
public function reset(): void
{
$this->connection->executeStatement('DELETE FROM ticket_list');
}
}
Lesson 4: Metadata and Method Invocation
PHP Metadata and Method Invocation
Not having code for Lesson 4?
git checkout lesson-4
Metadata
A Message can contain Metadata. Metadata is additional information stored alongside the Message's payload. It may contain things like currentUser, timestamp, contentType, messageId.
In Ecotone, headers and metadata mean the same thing. The terms will be used interchangeably.
To test out Metadata, let's assume we just got a new requirement for our Products in the Shopping System:
The user who registered the product should be able to change its price.
Let's start by adding ChangePriceCommand
We will handle this Command in a minute. Let's first add user information for registering the product.
We will do it using Metadata. Let's get back to our testing class EcotoneQuickstart and add a 4th argument to our CommandBus call.
sendWithRouting accepts a 4th argument, which is an associative array. Whatever we place in it will be available to us during message handling - this is our Metadata. Passing new Headers is as simple as adding another key to the array.
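As a sketch, the call could look like this (the payload values are illustrative; the 4th argument is the metadata array):

```php
// Routing key, JSON payload, payload media type, and metadata with the executing user
$this->commandBus->sendWithRouting(
    "product.register",
    \json_encode(["productId" => 1, "cost" => 100]),
    "application/json",
    ["userId" => 1]
);
```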
Now we can change our Product aggregate:
We have added a second parameter, $metadata, to our Command Handler. Ecotone reads the parameters and evaluates what should be injected. We will soon see how to take control of this process.
We can now add a changePrice method to our Aggregate:
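A minimal sketch of how that method could look, assuming the Aggregate stores the id of the user who registered it (the property name, routing key, and exception are illustrative):

```php
#[CommandHandler("product.changePrice")]
public function changePrice(ChangePriceCommand $command, array $metadata): void
{
    // Only the user who registered the product may change its price
    if ($metadata["userId"] !== $this->registeringUserId) {
        throw new \InvalidArgumentException("You are not allowed to change the price of this product");
    }

    $this->cost = $command->getCost();
}
```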
And let's call it with an incorrect userId to see if we get the exception.
Let's run our testing command:
Method Invocation
We have just been informed that customers are registering new products in our system, which should not be the case. Therefore our next requirement is:
Only an administrator should be allowed to register a new Product
Let's create a simple UserService which will tell us if a specific user is an administrator.
In our testing scenario we will assume that only the user with id 1 is an administrator.
Now we need to think about where we should call our UserService.
A good place would make it impossible to invoke the product.register command without being an administrator; otherwise our constraint could be bypassed.
Ecotone allows autowire-like injection for endpoints. All services registered in the Dependency Container are available.
Great, there is no way to bypass the constraint now. The isAdmin check must be satisfied in order to register a new product.
Let's correct our testing class.
Let's run our testing command:
Injecting arguments
Ecotone injects arguments based on Parameter Converters.
Parameter Converters tell Ecotone how to resolve a specific parameter and what kind of argument it expects. The one used for injecting services like UserService is the Reference parameter converter.
Let's see how we could use it in our product.register Command Handler.
Let's suppose UserService is registered under user-service in the Dependency Container. Then we would need to set up the CommandHandler like below.
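A sketch of that configuration (the handler body is illustrative; user-service is the service id stated above):

```php
use Ecotone\Messaging\Attribute\Parameter\Reference;

#[CommandHandler("product.register")]
public static function register(
    RegisterProductCommand $command,
    array $metadata,
    #[Reference("user-service")] UserService $userService  // resolved from the container by the given id
): self
{
    // $userService can now be used, e.g. to verify the executing user
    return new self($command->getProductId(), $command->getCost());
}
```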
Reference - injects a service from the Dependency Container. If referenceName (the name of the service in the container) is not given, the class name is used as the default.
Payload - injects the payload of the Message. In our case it will be the command itself.
Headers - injects all headers as an array.
Header - injects a single header from the Message.
There is more to be said about this, but for now it is enough to know that such a possibility exists.
You may read a more detailed description in the dedicated chapter.
Default Converters
If parameter converters are not passed, Ecotone provides default converters.
The first parameter is always the Payload.
For the second parameter: if it is an array, the Headers converter is used; otherwise, if a class type hint is provided, the Reference converter is picked.
If we wanted to manually configure parameters for the product.register Command Handler, it would look like this:
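A sketch with every converter written out explicitly (the method mirrors the product.register handler from this lesson):

```php
use Ecotone\Messaging\Attribute\Parameter\Payload;
use Ecotone\Messaging\Attribute\Parameter\Headers;
use Ecotone\Messaging\Attribute\Parameter\Reference;

#[CommandHandler("product.register")]
public static function register(
    #[Payload] RegisterProductCommand $command,  // the Message payload
    #[Headers] array $metadata,                  // all headers as an array
    #[Reference] UserService $userService        // service resolved by class name
): self
{
    // same body as before
    return new self($command->getProductId(), $command->getCost());
}
```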
We could also inject a specific header and let Ecotone convert it directly to a specific object (if we have a Converter registered):
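For example (the ExecutorId value object is illustrative, assuming a Converter from the scalar header value to ExecutorId is registered):

```php
use Ecotone\Messaging\Attribute\Parameter\Header;

#[CommandHandler("product.changePrice")]
public function changePrice(
    ChangePriceCommand $command,
    #[Header("userId")] ExecutorId $executorId  // converted from the raw header value
): void
{
    // ...
}
```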
Great, we have just finished Lesson 4!
In this Lesson we learned about using Metadata to provide extra information to our Message.
Besides we took a look on how arguments are injected into endpoint and how we can make use of it.
Now we will learn about powerful Interceptors, which can be described as Middlewares on steroids.
CQRS Introduction - Commands
Commands CQRS PHP
In this section, we will look at how to use Commands, Events, and Queries.
This will help you understand the basics of Ecotone’s CQRS support and how to build a message-driven application.
Command Handlers are methods where we typically place our business logic, so we’ll start by exploring how to use them.
Handling Commands
Any service available in your Dependency Container can become a Command Handler.
Command Handlers are responsible for performing business actions in your system.
In Ecotone-based applications, you register a Command Handler by adding the CommandHandler attribute to the specific method that should handle the command:
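As a minimal sketch (the class name and method body are illustrative):

```php
class TicketService
{
    #[CommandHandler]
    public function createTicket(CreateTicketCommand $command): void
    {
        // perform the business action, e.g. create and store the Ticket
    }
}
```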
In the example above, the #[CommandHandler] attribute tells Ecotone that the "createTicket" method should handle the CreateTicketCommand.
The first parameter of a Command Handler method determines which command type it handles — in this case, it is CreateTicketCommand.
In Ecotone, the class itself is not a Command Handler — only the specific method is.
This means you can place multiple Command Handlers inside the same class, making correlated actions available under the same API class.
If you are using autowiring, all your classes are registered in the container under their class names.
This means Ecotone can automatically resolve them without any extra configuration.
If your service is registered under a different name in the Dependency Container, you can use ClassReference to point Ecotone to the correct service:
Sending Commands
We send a Command using the Command Bus. After installing Ecotone, all Buses are automatically available in the Dependency Container, so we can start using them right away.
Before we can send a Command, we first need to define it:
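For example, a command can be a simple data holder (the properties are illustrative):

```php
class CreateTicketCommand
{
    public function __construct(
        public readonly string $ticketId,
        public readonly string $description,
    ) {}
}
```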
All Messages (Commands, Queries, and Events), as well as Message Handlers, are just plain PHP objects.
They don’t need to extend or implement any Ecotone-specific classes.
This keeps your business code clean, simple, and easy to understand.
To send a command, we use the send method on the CommandBus.
The command gets automatically routed to its corresponding Command Handler.
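A sketch of sending a command, e.g. from a Controller (constructor injection of the CommandBus and the command's properties are illustrative):

```php
use Ecotone\Modelling\CommandBus;

final class TicketController
{
    public function __construct(private CommandBus $commandBus) {}

    public function create(): void
    {
        // send() routes the command to its #[CommandHandler] by class name
        $this->commandBus->send(new CreateTicketCommand("t-1", "Printer is broken"));
    }
}
```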
Sending Commands with Metadata
We can send commands with metadata (also called Message Headers) through the Command Bus.
This lets us include additional context that doesn't belong in the command itself, or share information across multiple Command Handlers without duplicating it in each command class.
And then, to access the given metadata, we use the Header attribute:
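A sketch of both sides (the command and the executorId value are illustrative; the metadata is the second argument of send):

```php
use Ecotone\Messaging\Attribute\Parameter\Header;

// Sending side: metadata travels alongside the command
$this->commandBus->send(
    new CloseTicketCommand($ticketId),
    metadata: ["executorId" => "1234"]
);

// Handling side: #[Header] fetches the value stored under the given key
#[CommandHandler]
public function closeTicket(CloseTicketCommand $command, #[Header("executorId")] string $executorId): void
{
    // ...
}
```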
The #[Header] attribute tells Ecotone to fetch a specific piece of metadata using the key executorId. This way, Ecotone knows exactly which metadata value to pass into our Command Handler.
If we use an asynchronous Command Handler, Ecotone will ensure our metadata is serialized and deserialized correctly.
Injecting Services into Command Handler
If we need additional services from the Dependency Container to handle our business logic, we can inject them into our Command Handler using the #[Reference] attribute:
In case the Service is defined under a custom id in the DI container, we may pass the reference name to the attribute:
Sending Commands via Routing
In Ecotone we may register Command Handlers under routing instead of a class name.
This is especially useful when we register a Converter that tells Ecotone how to deserialize the given Command. This way we may simplify higher-level code like Controllers or Console Commands by avoiding transformation logic.
Ecotone uses message routing for communication between applications. This way applications can stay decoupled from each other, as there is no need to share classes between them.
Routing without Command Classes
There may be cases where creating Command classes is unnecessary boilerplate. In those situations we may simplify the code and use scalars, arrays, or non-command classes directly.
Ecotone provides the flexibility to create Command classes only when they are actually needed. In other cases we may use the routing functionality together with simple types to fulfill our business logic.
Returning Data from Command Handler
Sometimes we need to return a value immediately after handling a command. This is useful for scenarios that require instant feedback—for example, when processing a payment, we might need to return a redirect URL to guide the user to the payment gateway.
Ecotone allows returning data from a Command Handler; it will be available as a result from your CommandBus:
The returned data will be available as the result of the Command Bus.
Keep in mind that return values only work with synchronous Command Handlers. For asynchronous handlers, we can't return values directly because the command is processed in the background—instead, we'd use events or callbacks to communicate results back to the user when processing completes.
Sending Commands with deserialization
When a Converter mechanism is configured (for example, the JMS Converter), we can let Ecotone do the deserialization on the fly, so we don't need to bother with custom transformations in the Controller:
The difference between Events and Commands lies in the intention. Commands are meant to trigger a given action, while Events are information that the given action was performed successfully.
Handling Events
To register an Event Handler, we use the EventHandler attribute. By marking a given method as an Event Handler, we state that it should subscribe to a specific Event class:
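Such a subscription could look like this (the class name and body are illustrative):

```php
class NotificationService
{
    #[EventHandler]
    public function when(TicketWasCreated $event): void
    {
        // e.g. notify the assignee that a new ticket was created
    }
}
```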
In the above scenario we subscribe to the TicketWasCreated Event, so whenever this Event is published, this method will be automatically invoked.
Events are Plain Old PHP Objects:
In the case of Command Handlers, there may be only a single Handler for a given Command class. This is not the case for Event Handlers: multiple Event Handlers may subscribe to the same Event class.
Publishing Events
To publish Events, we will be using EventBus.
EventBus is available in your Dependency Container by default, just like Command and Query buses.
You may use Ecotone's invocation control to inject the Event Bus directly into your Command Handler:
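A sketch of publishing an Event from a Command Handler (the event's constructor argument is illustrative):

```php
use Ecotone\Modelling\EventBus;

#[CommandHandler]
public function createTicket(CreateTicketCommand $command, EventBus $eventBus): void
{
    // ... perform the business action first

    // then publish the fact that it happened
    $eventBus->publish(new TicketWasCreated($command->ticketId));
}
```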
You may inject any other Service available in your Dependency Container, into your Message Handler methods.
Multiple Subscriptions
Unlike Commands, which are routed to a single Command Handler, Events can have multiple subscribing Event Handlers.
Each Event Handler can be defined as asynchronous. If multiple Event Handlers are marked for asynchronous processing, each of them is handled in isolation. This ensures that in case of failure we can safely retry, as only the failed Event Handler will be executed again.
Subscribe to Interface or Abstract Class
If your Event Handler is interested in all Events around a specific business concept, you may subscribe to an Interface or Abstract Class.
And then, instead of subscribing to TicketWasCreated or TicketWasCancelled, we will subscribe to TicketEvent.
Subscribing by Union Classes
We can also subscribe to different Events using a union type hint. This way we can ensure that only a given set of events will be delivered to our Event Handler.
Subscribing to All Events
We may subscribe to all Events published within the application. To do so, we type hint for a generic object.
Subscribing to Events by Routing
Events can also be subscribed to by Routing.
And then the Event is published with a routing key:
Ecotone uses message routing for communication between applications. This way applications can stay decoupled from each other, as there is no need to share the classes between them.
Subscribing to Events by Routing and Class Name
There may be situations when we want to subscribe a given method by either routing key or class name.
Ecotone keeps those subscriptions separate to protect against unintended wiring, therefore to handle this case we can simply add another Event Handler attribute which is not based on the routing key.
This way we explicitly state that we want to subscribe both by class name and by routing key.
Sending Events with Metadata
Just like with Command Bus, we may pass metadata to the Event Bus:
If you make your Event Handler asynchronous, Ecotone will ensure your metadata is serialized and deserialized correctly.
Metadata Propagation
By default, Ecotone ensures that your Metadata is propagated.
This way you can simplify your code by avoiding passing Headers around, accessing them only in places where they matter for your business logic.
To better understand this, let's consider an example in which we pass metadata to the Command.
In order to perform the ticket-closing logic, information about the executorId is not needed, so we don't access it there.
However, Ecotone will ensure that your metadata is propagated from Handler to Handler.
This means that the context is preserved and you will be able to access executorId in your Event Handler.
Scaling and Advanced
PHP Event Sourcing Projection Scaling
The Problem
Your projection processes events for 100,000 aggregates through a single global stream and it can't keep up. Or you need to consume events from Kafka instead of the database event store. How do you scale projections horizontally?
The features described on this page are available as part of Ecotone Enterprise.
namespace App\Domain\Product;
class ChangePriceCommand
{
private int $productId;
private Cost $cost;
public function getProductId() : int
{
return $this->productId;
}
public function getCost() : Cost
{
return $this->cost;
}
}
#[Aggregate]
class Product
{
use WithAggregateEvents;
#[Identifier]
private int $productId;
private Cost $cost;
private int $userId;
private function __construct(int $productId, Cost $cost, int $userId)
{
$this->productId = $productId;
$this->cost = $cost;
$this->userId = $userId;
$this->recordThat(new ProductWasRegisteredEvent($productId));
}
#[CommandHandler("product.register")]
public static function register(RegisterProductCommand $command, array $metadata) : self
{
return new self(
$command->getProductId(),
$command->getCost(),
// all metadata is available for us.
// Ecotone automatically injects it, if the second param is an array
$metadata["userId"]
);
}
#[CommandHandler("product.changePrice")]
public function changePrice(ChangePriceCommand $command, array $metadata) : void
{
if ($metadata["userId"] !== $this->userId) {
throw new \InvalidArgumentException("You are not allowed to change the cost of this product");
}
$this->cost = $command->getCost();
}
}
bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
InvalidArgumentException
You are not allowed to change the cost of this product
namespace App\Domain\Product;
class UserService
{
public function isAdmin(int $userId) : bool
{
return $userId === 1;
}
}
#[CommandHandler("product.register")]
public static function register(
RegisterProductCommand $command,
array $metadata,
// Any argument that is not the Command or its metadata will be considered a DI Service to inject
UserService $userService
) : self
{
$userId = $metadata["userId"];
if (!$userService->isAdmin($userId)) {
throw new \InvalidArgumentException("You need to be administrator in order to register new product");
}
return new self($command->getProductId(), $command->getCost(), $userId);
}
#[CommandHandler("product.register")]
public static function register(
#[Payload] RegisterProductCommand $command,
// injecting specific header and doing the conversion string to UserId
#[Header("userId")] UserId $metadata,
#[Reference] UserService $userService
) : self
{
// ...
}
class TicketService
{
#[CommandHandler]
public function createTicket(CreateTicketCommand $command) : void
{
// handle create ticket command
}
}
#[ClassReference("ticketService")]
class TicketService
readonly class CreateTicketCommand
{
public function __construct(
public string $priority,
public string $description
){}
}
class TicketController
{
// Command Bus will be auto registered in Dependency Container.
public function __construct(private CommandBus $commandBus) {}
public function createTicketAction(Request $request) : Response
{
$this->commandBus->send(
new CreateTicketCommand(
$request->get("priority"),
$request->get("description"),
)
);
return new Response();
}
}
$messagingSystem->getCommandBus()->send(
new CreateTicketCommand(
$priority,
$description,
)
);
class TicketController
{
public function __construct(private CommandBus $commandBus) {}
public function closeTicketAction(Request $request, Security $security) : Response
{
$this->commandBus->send(
new CloseTicketCommand($request->get("ticketId")),
["executorId" => $security->getUser()->getId()]
);
}
}
$messagingSystem->getCommandBus()->send(
new CloseTicketCommand($ticketId),
["executorId" => $executorId]
);
class TicketService
{
#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
// by adding Header attribute we state what metadata we want to fetch
#[Header("executorId")] string $executorId
): void
{
// handle closing ticket with executor from metadata
}
}
class TicketService
{
#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
#[Reference] AuthorizationService $authorizationService
): void
{
// handle closing ticket with executor from metadata
}
}
class TicketController
{
public function __construct(private CommandBus $commandBus) {}
public function createTicketAction(Request $request) : Response
{
$this->commandBus->sendWithRouting(
"createTicket",
$request->getContent(),
"application/json" // we tell what format is used in the request content
);
return new Response();
}
}
class TicketService
{
// Ecotone will do deserialization for the Command
#[CommandHandler("createTicket")]
public function createTicket(CreateTicketCommand $command): void
{
// handle creating ticket
}
}
class TicketController
{
private CommandBus $commandBus;
public function __construct(CommandBus $commandBus)
{
$this->commandBus = $commandBus;
}
public function closeTicketAction(Request $request) : Response
{
$this->commandBus->sendWithRouting(
"closeTicket",
Uuid::fromString($request->get("ticketId"))
);
return new Response();
}
}
class TicketService
{
#[CommandHandler("closeTicket")]
public function closeTicket(UuidInterface $ticketId): void
{
// handle closing ticket
}
}
class PaymentService
{
#[CommandHandler]
public function makePayment(MakePayment $command): Url
{
// handle making payment
return $paymentUrl;
}
}
$redirectUrl = $this->commandBus->send($command);
public function createTicketAction(Request $request) : Response
{
$ticketId = $this->commandBus->sendWithRouting(
routingKey: 'createTicket',
command: $request->getContent(), // Ecotone will deserialize the Command on the fly
commandMediaType: 'application/json',
);
return new Response([
'ticketId' => $ticketId
]);
}
class TicketService
{
#[EventHandler]
public function when(TicketWasCreated $event): void
{
// handle event
}
}
readonly class TicketWasCreated
{
public function __construct(
public string $ticketId
) {}
}
class TicketService
{
#[CommandHandler]
public function createTicket(
CreateTicketCommand $command,
EventBus $eventBus
) : void
{
// handle create ticket command
$eventBus->publish(new TicketWasCreated($ticketId));
}
}
class TicketService
{
#[EventHandler]
public function when(TicketWasCreated $event): void
{
// handle event
}
}
class NotificationService
{
#[EventHandler]
public function sendNotificationX(TicketWasCreated $event): void
{
// handle event
}
#[EventHandler]
public function sendNotificationY(TicketWasCreated $event): void
{
// handle event
}
}
interface TicketEvent
{
}
readonly class TicketWasCreated implements TicketEvent
{
public function __construct(
public string $ticketId
) {}
}
readonly class TicketWasCancelled implements TicketEvent
{
public function __construct(
public string $ticketId
) {}
}
#[EventHandler]
public function notify(TicketEvent $event) : void
{
// do something with $event
}
#[EventHandler]
public function notify(TicketWasCreated|TicketWasCancelled $event) : void
{
// do something with $event
}
#[EventHandler]
public function log(object $event) : void
{
// do something with $event
}
class TicketService
{
#[EventHandler("ticket.was_created")]
public function when(TicketWasCreated $event): void
{
// handle event
}
}
class TicketService
{
#[CommandHandler]
public function createTicket(
CreateTicketCommand $command,
EventBus $eventBus
) : void
{
// handle create ticket command
$eventBus->publishWithRouting(
"ticket.was_created",
new TicketWasCreated($ticketId)
);
}
}
class TicketService
{
#[EventHandler]
#[EventHandler("ticket.was_created")]
public function when(TicketWasCreated $event): void
{
// handle event
}
}
class TicketService
{
#[CommandHandler]
public function createTicket(
CreateTicketCommand $command,
EventBus $eventBus
) : void
{
// handle create ticket command
$eventBus->publish(
new TicketWasCreated($ticketId),
metadata: [
"executorId" => $command->executorId()
]
);
}
}
class TicketService
{
#[EventHandler]
public function when(
TicketWasCreated $event,
// access metadata with given name
#[Header("executorId")] string $executorId
): void
{
// handle event
}
}
class TicketController
{
public function __construct(private CommandBus $commandBus) {}
public function closeTicketAction(Request $request, Security $security) : Response
{
$this->commandBus->send(
new CloseTicketCommand($request->get("ticketId")),
["executorId" => $security->getUser()->getId()]
);
}
}
$messagingSystem->getCommandBus()->send(
new CloseTicketCommand($ticketId),
["executorId" => $executorId]
);
class TicketService
{
#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
EventBus $eventBus
)
{
// close the ticket
// we are simply publishing an Event, we don't pass any metadata here
$eventBus->publish(new TicketWasClosed($ticketId));
}
}
class AuditService
{
#[EventHandler]
public function log(
TicketWasClosed $event,
// access metadata with the given name
#[Header("executorId")] string $executorId
): void
{
// handle event
}
}
Comparing Projection Types

                    | Global                 | Partitioned            | Streaming
Event source        | Database Event Store   | Database Event Store   | Message Broker (Kafka, RabbitMQ)
Position tracking   | Single global position | Position per partition | —
For production systems with growing event volumes, partitioned projections are the recommended choice. They are faster (indexed event loading), more resilient (failure isolation per aggregate), and scale horizontally (parallel workers).
Transactional Scope: Why Global Projections Can't Scale
To understand why partitioned projections are necessary for scaling, it helps to see how the transactional scope differs between the two types.
Globally tracked projections have a single position tracker for the entire projection. When one process is projecting events, it holds a lock on that position. Any other process that wants to project must wait until the first one finishes and releases the lock. This is by design — the global stream must be processed in order, so only one consumer can advance the position at a time.
This means globally tracked projections are not scalable by nature. Adding more workers doesn't help — they queue up behind each other. Global projections are designed for building read models that need to aggregate data across the entire stream (e.g., a dashboard counting all tickets regardless of which aggregate produced them).
Partitioned projections have a separate position tracker per aggregate. The transactional scope is per projected aggregate, not per projection. This means Ticket-A and Ticket-B can project at the same time without blocking each other — each holds a lock only on its own partition state.
This is why partitioned projections are scalable by nature — adding more workers directly increases throughput.
In most cases, what you want to project is the state of a given aggregate — for this, partitioned projections are the right choice. Global projections are meant for the less common case where you need to build a read model across the entire stream (e.g., cross-aggregate reporting).
Migrating from Global to Partitioned
The upgrade path from a global projection to a partitioned one is simple:
Deploy a second version of your projection with #[Partitioned] alongside the existing global one
Both projections are backed by the same Event Store — no data migration needed
Ecotone takes care of delivery and execution
You just choose the execution model (sync or async)
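The steps above can be sketched as follows. Class and projection names are illustrative; the attribute set mirrors the partitioned projection example shown later on this page:

```php
// Existing global projection keeps serving traffic during the transition
#[ProjectionV2('ticket_details')]
#[FromAggregateStream(Ticket::class)]
class TicketDetailsProjection { /* ... */ }

// New partitioned version deployed alongside,
// backed by the same Event Store — no data migration needed
#[ProjectionV2('ticket_details_v2')]
#[FromAggregateStream(Ticket::class)]
#[Partitioned]
class PartitionedTicketDetailsProjection { /* ... */ }
```

Once the partitioned projection has caught up, traffic can be switched over and the global one retired.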
You can use Blue-Green Deployments to make this transition with zero downtime — the old global projection continues serving traffic while the new partitioned one catches up.
Partitioned Projections
A partitioned projection creates one partition per aggregate. Each partition tracks its own position, processes its own events, and can fail independently.
The Difference in Practice: Sync and Async
This transactional scope difference affects both execution modes.
Synchronous example: Two users register tickets at the same time. With a global projection, one request must wait for the other's projection to finish before it can project — adding latency to the API response. With a partitioned projection, both requests project their own aggregate independently and return immediately.
Asynchronous example: With a global projection, it only makes sense to have a single worker running per projection — adding more workers doesn't help because they block each other waiting for the single position lock. With partitioned projections, each worker picks up a different aggregate's events. If you have 4 workers processing 4 different aggregates in parallel, throughput scales 4x.
Performance: Why Partitioned Is Faster
Beyond resilience and scalability, partitioned projections have a significant performance advantage in event loading.
A globally tracked projection must scan the entire event stream — even events it doesn't care about — because it tracks a single position across all aggregates. It cannot skip events, because skipping would create gaps that need to be tracked and resolved. Even if your projection only handles TicketWasRegistered, it still reads past millions of OrderWasPlaced events to advance its position and maintain gap awareness.
A partitioned projection tracks position per aggregate. Because event ordering within a single aggregate is guaranteed by the Event Store's optimistic locking (no gaps possible), Ecotone can skip directly to the events the projection is interested in — filtering by aggregate type at the database level using indexes. There is no need to read irrelevant events. On a high-volume event stream with millions of events across many aggregate types, this makes a massive difference in loading speed.
Streaming Projections
Streaming projections consume events from a message channel (such as Kafka or RabbitMQ Streams) instead of reading from the database event store directly:
When to use:
Cross-system integration — events produced by other services via Kafka or RabbitMQ
When you want to decouple event reading from the database Event Store
Real-time event consumption from external sources
Streaming projections don't need #[FromAggregateStream] — events come from the message channel directly.
Feeding a Streaming Channel from the Event Store
You don't need an external message broker to use streaming projections. Ecotone provides an Event Store Adapter that reads events from your database Event Store and forwards them to a streaming channel. This creates a bridge between the Event Store and the streaming projection:
Configure the adapter using EventStreamingChannelAdapter:
This creates a polling endpoint (product_stream_feeder) that continuously reads events from the product_stream in the Event Store and forwards them to the product_stream_channel streaming channel.
Then your streaming projection consumes from that channel:
Run both the feeder and the projection:
# Start the Event Store feeder (reads events, forwards to channel)
bin/console ecotone:run product_stream_feeder -vvv
# Start the streaming projection (consumes from channel)
bin/console ecotone:run product_catalog -vvv
You can filter which events the adapter forwards using glob patterns:
Only events matching the patterns will be forwarded to the channel. Events that don't match are skipped.
The Event Store Adapter is useful when you want streaming projection benefits (channel-based consumption, broker-level parallelism) but your events live in the database Event Store. It bridges the two worlds without requiring an external message broker.
Polling Projections
Polling projections run as a dedicated background process that periodically queries the event store:
When to use:
Heavy projections that need an isolated process
Projections that should run independently of the event-driven flow
Run the poller:
bin/console ecotone:run analytics_poller -vvv
artisan ecotone:run analytics_poller -vvv
$messagingSystem->run('analytics_poller');
Custom Extensions
For advanced use cases, you can provide custom implementations of the projection infrastructure:
#[StreamSource] — custom event source (alternative to the built-in Event Store reader)
#[StateStorage] — custom state persistence (alternative to the built-in DBAL storage)
#[PartitionProvider] — custom partition strategy (alternative to aggregate-based partitioning)
These are useful when integrating with non-standard event stores or storage backends.
Multi-Tenant Projections
Ecotone supports projections in multi-tenant environments where each tenant has its own database connection:
Events are isolated per tenant, and projections process each tenant's events against their own database. The tenant is identified via message metadata.
#[ProjectionV2('ticket_details')]
#[FromAggregateStream(Ticket::class)]
#[Partitioned]
class TicketDetailsProjection
{
public function __construct(private Connection $connection) {}
#[EventHandler]
public function onTicketRegistered(TicketWasRegistered $event): void
{
$this->connection->insert('ticket_details', [
'ticket_id' => $event->ticketId,
'type' => $event->type,
'status' => 'open',
]);
}
#[EventHandler]
public function onTicketClosed(TicketWasClosed $event): void
{
$this->connection->update(
'ticket_details',
['status' => 'closed'],
['ticket_id' => $event->ticketId]
);
}
#[ProjectionInitialization]
public function init(): void { /* CREATE TABLE */ }
#[ProjectionReset]
public function reset(): void { /* DELETE FROM */ }
}
#[ProjectionV2('external_orders')]
#[Streaming('orders_channel')]
class ExternalOrdersProjection
{
#[EventHandler]
public function onOrderReceived(OrderReceived $event): void
{
// Process events coming from the streaming channel
}
}
#[ServiceContext]
public function eventStoreFeeder(): EventStreamingChannelAdapter
{
return EventStreamingChannelAdapter::create(
streamChannelName: 'product_stream_channel',
endpointId: 'product_stream_feeder',
fromStream: 'product_stream',
);
}
#[ProjectionV2('product_catalog')]
#[Streaming('product_stream_channel')]
class ProductCatalogProjection
{
#[EventHandler]
public function onProductRegistered(ProductRegistered $event): void
{
// Process events forwarded from the Event Store
}
}
#[ProjectionV2('heavy_analytics')]
#[FromAggregateStream(Order::class)]
#[Polling('analytics_poller')]
class HeavyAnalyticsProjection
{
#[EventHandler]
public function onOrderPlaced(OrderWasPlaced $event): void
{
// Heavy processing — runs in dedicated process
}
}
Message Driven System with Domain Driven Design principles in PHP
Works with: Laravel, Symfony, and Standalone PHP
What Ecotone Gives You
Ecotone is a messaging framework that brings enterprise architecture patterns to PHP. It provides the infrastructure for CQRS, Event Sourcing, Sagas, Distributed Messaging, and Production Resilience — so you write business logic, not boilerplate.
Everything in Ecotone is built around Messages. Commands express intentions ("place this order"), Events express facts ("order was placed"), and Queries express questions ("what are this user's orders?"). This isn't just a naming convention — it's the architectural foundation that enables async processing, resilience, workflows, and distributed systems.
Built on Enterprise Integration Patterns
Ecotone is built on Enterprise Integration Patterns — the same foundation that powers Spring Integration (Java), NServiceBus (.NET), and Apache Camel. Communication between objects happens through Message Channels — pipes where one side sends messages and the other consumes them.
Communication between Objects using Messages
Because communication goes through Message Channels, switching from synchronous to asynchronous, or from one message broker to another, doesn't affect your business code. You change the channel configuration — your handlers stay the same.
Application level code
Ecotone provides different levels of abstraction, which we can choose from. Each abstraction is described in more detail in its related section. In this Introduction we will go over, at a high level, how things can be used, to show what Message-based communication is about.
Command Handlers
Let's discuss our example from the above screenshot, where we want to register a User and trigger the Notification Sender. In Ecotone we would introduce a Command Handler responsible for user registration:
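A minimal sketch of such a Command Handler — the class and message names (UserService, RegisterUser, UserWasRegistered) are assumptions for this example:

```php
class UserService
{
    #[CommandHandler]
    public function register(RegisterUser $command, EventBus $eventBus): void
    {
        // persist the user ...

        // publish the fact that registration happened
        $eventBus->publish(new UserWasRegistered($command->userId));
    }
}
```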
As you can see, we also inject Event Bus which will publish our Event Message of User Was Registered.
Commands and Events are the soul of higher-level Messaging. On the low level everything is a Message, yet each Message can be understood either as a Command (intention to do) or as an Event (fact that happened). This makes a clear distinction between what we want to happen vs what actually happened.
In our Controller we would inject Command Bus to send the Command Message:
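A sketch of such a Controller; the request fields and class names are illustrative assumptions:

```php
class UserController
{
    // Command Bus is auto-registered in the Dependency Container
    public function __construct(private CommandBus $commandBus) {}

    public function registerAction(Request $request): Response
    {
        $this->commandBus->send(
            new RegisterUser(
                $request->get("userId"),
                $request->get("email"),
            )
        );

        return new Response();
    }
}
```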
After sending the Command Message, our Command Handler will be executed. The Command and Event Bus are available in our Dependency Container out of the box after Ecotone installation.
What is important here is that Ecotone never forces us to implement or extend Framework-specific classes. This means that our Command and Event Messages are POPOs (clean PHP objects). In most scenarios we will simply mark a given method with an Attribute and Ecotone will glue things together for us.
We mentioned that the Notification Sender should be executed when the User Was Registered Event happens.
For this we follow the same convention of using Attributes:
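For example, a Notification Sender subscribing to the Event could look like this (class and event names are assumptions carried over from the registration example):

```php
class NotificationSender
{
    #[EventHandler]
    public function whenUserWasRegistered(UserWasRegistered $event): void
    {
        // send the welcome notification
    }
}
```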
This Event Handler will be automatically triggered when the related Event is published.
This way we can easily build decoupled flows which hook into existing business events.
Even though Commands and Events are Messages at the fundamental level, Ecotone distinguishes them because they carry different semantics. By design, Commands can have only a single related Command Handler, yet Events can have multiple subscribing Event Handlers.
This makes it easy for Developers to reason about the system and much easier to follow, as the difference between Messages is built into the architecture itself.
From here we could decide to make use of the Message routing functionality to decouple Controllers from constructing Command Messages.
With this in mind, we can now use the CommandBus with routing and even let Ecotone deserialize the Command, so our Controller does not even need to be aware of transformations:
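A sketch of what this could look like, assuming a "user.register" routing key and the message names from earlier examples:

```php
// Handler registered under a routing key
#[CommandHandler("user.register")]
public function register(RegisterUser $command): void
{
    // handle registration
}

// Controller passes the raw JSON through; Ecotone deserializes it
$this->commandBus->sendWithRouting(
    "user.register",
    $request->getContent(),
    "application/json"
);
```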
When controllers simply pass incoming data through to the Command Bus via routing, there is not much logic left in them. We could even have a single controller, if we were able to derive the routing key. It's really up to us what works best in the context of our system.
What we could decide to do is add so-called Interceptors (middlewares) to our Command Bus, to add additional data, perform validation, or run access checks.
The Pointcut defines the point at which this interceptor should trigger. In the above scenario it will trigger when the Command Bus is executed, before the Message is sent to the given Command Handler.
The Reference attribute states that the given parameter is a Service from the Dependency Container and that Ecotone should inject it.
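A sketch of such an Interceptor performing an access check — the UserService and its hasAccess method are hypothetical, and the exact attribute parameters should be checked against the Interceptors documentation:

```php
class AccessInterceptor
{
    // Pointcut: trigger whenever a Command is sent through the Command Bus
    #[Before(pointcut: CommandBus::class)]
    public function verifyAccess(
        object $command,
        #[Header("executorId")] string $executorId,
        #[Reference] UserService $userService
    ): object
    {
        if (!$userService->hasAccess($executorId)) {
            throw new \InvalidArgumentException("Not allowed to execute this action");
        }

        // return the payload unchanged to continue the flow
        return $command;
    }
}
```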
There are multiple different interceptors that can hook at different moments of the flow.
We could hook before Message is sent to Asynchronous Channel, or before executing Message Handler.
We could also state that we want to hook for all Command Handlers or Event Handlers.
And in each step we can decide what we want to do, like modify the messages, stop the flow, enforce security checks.
When we send a Command using the Command Bus, Ecotone under the hood constructs a Message.
A Message consists of two things: payload and metadata.
The payload is our Command, and the metadata is any additional information we would like to carry.
Metadata can then be easily accessed from our Command Handler or Interceptors.
Besides the metadata we provide ourselves, Ecotone supplies additional metadata we can use whenever needed, like Message Id, Correlation Id, Timestamp etc.
Ecotone takes care of automatic Metadata propagation, no matter whether execution is synchronous or asynchronous. Therefore we can easily access any given metadata in the targeted Message Handler, and also in any sub-flows like Event Handlers. This makes it really easy to carry additional information which can be used not only in the first executed Message Handler, but also in any flow triggered as a result of it.
As we mentioned at the beginning of this introduction, communication happens through Message Channels, and thanks to that it's really easy to switch code from synchronous to asynchronous execution. For that we simply state that a given Message Handler should be executed asynchronously:
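A sketch, assuming an asynchronous channel named "async" and the event names from earlier examples:

```php
class NotificationSender
{
    #[Asynchronous("async")]
    #[EventHandler(endpointId: "welcomeNotification")]
    public function whenUserWasRegistered(UserWasRegistered $event): void
    {
        // executed by the Worker process consuming the "async" channel
    }
}
```

The business code stays the same as in the synchronous version; only the attributes changed.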
Now, before this Event Handler is executed, it will first land in the Asynchronous Message Channel named "async", and from there it will be consumed asynchronously by a Message Consumer (Worker process).
There may be situations where multiple Asynchronous Event Handlers subscribe to the same Event.
We can easily imagine that one of them may fail, and things like retries become problematic (as they may trigger the already successful Event Handlers a second time).
That's why Ecotone introduces Message Handling isolation, which delivers a copy of the Message to each related Event Handler separately. As a result, each Asynchronous Event Handler handles its own Message in full isolation, and in case of failure only that Handler will be retried.
If we are using Eloquent, Doctrine ORM, or Models with a custom storage implementation, we could push our implementation even further and send the Command directly to our Model.
We are marking our model as an Aggregate. This is a concept from Domain-Driven Design which describes a model that encapsulates business logic.
Ecotone will take care of loading the Aggregate and storing it after the method is called.
Therefore all we need to do is to send a Command.
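A sketch of such a Model marked as an Aggregate — the class name, routing key, and UserStatus enum are assumptions for this example:

```php
#[Aggregate]
class User
{
    #[Identifier]
    private string $userId;
    private UserStatus $status;

    #[CommandHandler("user.block")]
    public function block(): void
    {
        // the intention is expressed by the method itself,
        // no Command object is needed
        $this->status = UserStatus::blocked;
    }
}
```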
As you can see, we also added a "block()" method, yet it does not take any Command as a parameter. In this scenario we don't even need a Command Message, because the logic is nicely encapsulated inside, and passing a status from the outside could actually allow for bugs (e.g. passing UserStatus::active). All we want to know is that there is an intention to block the user; the rest happens within the method.
To execute our block method we would call Command Bus this way:
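A sketch of the call, using routing together with the "aggregate.id" metadata (the routing key mirrors the aggregate example above):

```php
$this->commandBus->sendWithRouting(
    "user.block",
    metadata: ["aggregate.id" => $userId]
);
```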
There is one special metadata key here, "aggregate.id", which tells Ecotone which instance of User it should fetch from storage and execute this method on. There is no need to create a Command Class at all, because there is no data we need to pass.
This way we can build features with ease and protect internal state of our Models, so they are not modified in incorrect way.
One of the powers that Message-Driven Architecture brings is the ability to build sophisticated workflows with ease. This is possible because each Message Handler is considered an Endpoint able to connect to input and output Channels. This is often referred to as pipes-and-filters architecture, and in general it is a characteristic of true message-driven systems.
Let's suppose that our registered user can apply for a credit card, and for this we need to pass his application through a series of steps to verify whether it's safe to issue a credit card for him:
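A sketch of the first step — the routing key, channel name, and CardApplication class are illustrative assumptions:

```php
class CardApplicationProcess
{
    // outputChannelName indicates where the Message goes after handling
    #[CommandHandler("card.apply", outputChannelName: "card.verifyIdentity")]
    public function apply(CardApplication $application): CardApplication
    {
        // initial validation; pass the same object on to the next step
        return $application;
    }
}
```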
We are using outputChannelName here to indicate where to pass the Message after it's handled by our Command Handler. Here we could enrich our CardApplication with some additional data, or create a new object. However, it's fully fine to pass the same object to the next step if there is no need to modify it.
Ecotone provides the ability to pass the same object between workflow steps. This simplifies the flow a lot, as we are not forced to create custom objects just for the framework's needs; we stick to what is actually needed from the business perspective.
Let's now define the location where our Message will land next:
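A sketch of this step; the channel names and endpoint id are illustrative assumptions that simply match the previous step's output channel:

```php
// InternalHandler is not connected to any Bus; it is an internal workflow step
#[Asynchronous("async")]
#[InternalHandler(
    inputChannelName: "card.verifyIdentity",
    outputChannelName: "card.decision",
    endpointId: "cardIdentityVerification"
)]
public function verifyIdentity(CardApplication $application): CardApplication
{
    // verify the applicant's identity in the background
    return $application;
}
```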
We are using an InternalHandler here. Internal handlers are not connected to any Command or Event Buses, therefore we can use them as workflow steps which we don't want to expose to the outside.
It's really up to us whether we want to define Message Handlers in separate classes or not. In general, thanks to the declarative configuration in the form of Attributes, we could define the whole flow within a single class, e.g. "CardApplicationProcess".
Workflows can also be started from Command or Event Handlers, and also directly through Business Interfaces. This makes it easy to build and connect different flows, and even reuse steps when needed.
Our Internal Handler has an inputChannelName which points to the same channel as our Command Handler's outputChannelName. This way we bind Message Handlers together to create workflows. As you can see, we also added the Asynchronous attribute: as the process of identity verification can take a bit of time, we would like it to happen in the background.
Let's define our last step in Workflow:
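A sketch of the final, synchronous step — the input channel name is an illustrative assumption matching the previous step:

```php
// No Asynchronous attribute: this step runs synchronously,
// directly after the previous step completes
#[InternalHandler(inputChannelName: "card.decision")]
public function makeDecision(CardApplication $application): void
{
    // issue the credit card or reject the application
}
```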
We've made this step synchronous, which is the default when no Asynchronous attribute is defined. Therefore it will be called directly after identity verification.
Workflows in Ecotone are fully under our control and defined in PHP. There is no need to use a 3rd party tool, or to define the flows in XML or YAML. This makes for a really maintainable solution which we can change, modify and test easily, as we fully own the process from within the code.
It's worth mentioning that workflows are in general stateless, as they pass Messages from one pipe to another. However, if we wanted to introduce a stateful Workflow, we could do that using Ecotone's Sagas.
Ecotone handles failures at the architecture level to make Application clear of those concerns.
As Messages are the main component of communication between Applications, Modules or even Classes in Ecotone, they create space for recoverability in all parts of the Application: Messages can be retried instantly or with a delay, without blocking other processes from continuing their work.
Message failed and will be retried with delay
Since Messages are basically data records that carry an intention, it becomes possible to store that intention when an unrecoverable failure happens. When delayed retries are pointless, because we encountered an unrecoverable error, we can move the Message into a persistent store. This way we don't lose the information, and once the bug is fixed we can simply replay the Message to resume the flow from the point where it failed.
Storing Message for later review, and replaying when bug is fixed
There are of course more resiliency patterns that are part of Ecotone, like:
Automatic retries to send Messages to Asynchronous Message Channels
Reconnection of Message Consumers (Workers) if they lose the connection to the Broker
Inbuilt functionalities like the Message Outbox, Error Channels with Dead Letter, and deduplication of Messages to avoid double processing,
and many, many more.
Because Ecotone's flow is based on Messages, the application can handle failures at the architecture level. Communicating via Messages allows the application to self-heal without our intervention, and in case of unrecoverable failures it makes the system robust enough not to lose any information and to recover quickly from the point of failure once the bug is fixed.
Ecotone shifts the focus from technical details to the actual business processes, using Resilient Messaging as the foundation on which everything else is built. It provides seamless communication using Messages between Applications, Modules or even different Classes.
Together with that, we use Declarative Configuration with attributes to avoid writing and maintaining configuration files. We state the intention of what we want to achieve instead of wiring things ourselves; as a result, we regain a huge amount of time which can be invested in more important parts of the System.
On top of that, we can use higher-level Building Blocks like Command and Event Handlers, Aggregates, and Sagas, which connect to the messaging seamlessly and help encapsulate our business logic.
All of the above serves as the pillars for creating so-called Business Oriented Architecture:
When all three pillars are solved by Ecotone, what is left to write is business-oriented code.
Resilient Messaging - At the heart of Ecotone lies a resilient messaging system that enables loose coupling, fault tolerance, and self-healing capabilities.
Declarative Configuration - Introduces declarative programming with Attributes. It simplifies development, reduces boilerplate code, and promotes code readability. It empowers developers to express their intent clearly, resulting in more maintainable and expressive codebases.
Building Blocks - Building blocks like Message Handlers, Aggregates, Sagas, facilitate the implementation of the business logic. By making it possible to bind Building Blocks with Resilient Messaging, Ecotone makes it easy to build and connect even the most complex business workflows.
With this foundational knowledge and a high-level understanding of how Ecotone works, it's a good moment to dive into the Tutorial section, which provides hands-on experience for a deeper understanding.
Materials
The Ecotone blog provides articles which describe Ecotone's architecture and related features in more detail. If you want to find out more, follow the links below:
class UserService
{
#[CommandHandler]
public function register(RegisterUser $command, EventBus $eventBus): void
{
// store user
$eventBus->publish(new UserWasRegistered($userId));
}
public function registerAction(Request $request, CommandBus $commandBus): Response
{
$command = // construct command
$commandBus->send($command);
}
class NotificationSender
{
#[EventHandler]
public function when(UserWasRegistered $event): void
{
// send notification
}
}
#[CommandHandler(routingKey: "user.register")]
public function register(RegisterUser $command, EventBus $eventBus): void
public function registerAction(Request $request, CommandBus $commandBus): Response
{
$commandBus->sendWithRouting(
routingKey: "user.register",
command: $request->getContent(),
commandMediaType: "application/json"
);
}
#[Before(pointcut: CommandBus::class)]
public function validateAccess(
RegisterUser $command,
#[Reference] AuthorizationService $authorizationService
): void
{
if (!$authorizationService->isAdmin()) {
throw new AccessDenied();
}
}
#[CommandHandler(routingKey: "user.register")]
public function register(
RegisterUser $command,
EventBus $eventBus,
#[Header("executorId")] string $executorId,
): void
#[Asynchronous("async")]
#[EventHandler]
public function when(UserWasRegistered $event): void
#[Aggregate]
class User
{
use WithEvents;
#[Identifier]
private UserId $userId;
private UserStatus $status;
#[CommandHandler(routingKey: "user.register")]
public static function register(RegisterUser $command): self
{
$user = //create user
$user->recordThat(new UserWasRegistered($userId));
return $user;
}
#[CommandHandler(routingKey: "user.block")]
public function block(): void
{
$this->status = UserStatus::blocked;
}
}
public function blockAction(Request $request, CommandBus $commandBus): Response
{
$commandBus->sendWithRouting(
routingKey: "user.block",
metadata: [
"aggregate.id" => $request->get('userId'),
]
);
}
class CreditCardApplicationProcess
{
#[CommandHandler(
routingKey: "apply_for_card",
outputChannelName: "application.verify_identity"
)]
public function apply(CardApplication $application): CardApplication
{
// store card application
return $application;
}
}
#[Asynchronous("async")]
#[InternalHandler(
inputChannelName: "application.verify_identity",
outputChannelName: "application.send_result"
)]
public function verifyIdentity(CardApplication $application): ApplicationResult
{
// do the verification
return new ApplicationResult($result);
}
#[InternalHandler(
inputChannelName: "application.send_result"
)]
public function sendResult(ApplicationResult $application): void
{
// send result
}
Event fetching: fetches only relevant events per aggregate (indexed), or pushed by the broker.
Processing model: parallel processing; sequential with a single consumer; each partition independent with multiple workers; broker-level parallelism.
Best for: simple projections with low volume; production workloads with high volume; cross-system integration and external events.
Licence: Open source; Enterprise; Enterprise.
Lesson 3: Converters
PHP Conversion
Not having code for Lesson 3? git checkout lesson-3
Conversion
Commands, queries, and events are not always objects. When they travel via different asynchronous channels, they are converted to a simplified format, like JSON or XML.
At the application level, however, we want to deal with PHP formats such as objects or arrays.
Moving from one format to another requires conversion. Ecotone provides extension points in which we can integrate different Converters to perform this type of conversion.
First Media Type Converter
Let's build our first converter from JSON to our PHP format. In order to do that, we will need to implement the Converter interface and mark it with the MediaTypeConverter attribute.
TypeDescriptor - Describes type in PHP format. This can be class, scalar (int, string), array etc.
MediaType - Describes Media type format. This can be application/json, application/xml etc.
$source - is the actual data to be converted.
Let's start by implementing the matches method, which tells us whether this converter can convert from one type to another.
This will tell Ecotone that in case source media type is JSON and target media type is PHP, then it should use this converter.
Now we can implement the convert method. We will go with a pretty naive solution, just as a proof of concept.
Normally you would inject into the Converter class some kind of serializer used within your application, for example JMS Serializer or Symfony Serializer, to perform the conversion.
And let's add a fromArray method to RegisterProductCommand and GetProductPriceQuery.
Let's run our testing command:
If we call our testing command now, everything works fine, but we still send PHP objects instead of JSON, so there was no need for conversion.
In order to start sending Commands and Queries in a different format, we need to provide our handlers with a routing key. This is because we no longer deal with objects, so we can't route based on their class.
You may think of the routing key as a message name used to route the message to a specific handler. This is a very powerful concept which allows for a high level of decoupling.
Let's change our Testing class, so we call buses with JSON format.
We now make use of a different method: sendWithRouting.
It takes the routing key, to which we want to send the message, as the first argument.
The second argument is the data to send itself, in this case the command formatted as JSON.
The third argument describes the format of the message we send.
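To make the flow concrete, here is a minimal sketch in plain PHP of what such a routing bus does (the NaiveCommandBus class is hypothetical and this is not Ecotone's actual implementation): the routing key selects the handler, and the media type tells the bus how to convert the raw payload before invoking it.

```php
<?php

// Hypothetical, simplified bus - for illustration only, not Ecotone internals.
final class NaiveCommandBus
{
    /** @var array<string, callable> map of routing key => handler */
    private array $handlers = [];

    public function register(string $routingKey, callable $handler): void
    {
        $this->handlers[$routingKey] = $handler;
    }

    public function sendWithRouting(string $routingKey, string $data, string $mediaType): mixed
    {
        // In Ecotone, registered Converters perform this step; here we handle only JSON.
        $payload = $mediaType === 'application/json'
            ? json_decode($data, true, 512, JSON_THROW_ON_ERROR)
            : $data;

        // Route by key instead of by payload class.
        return ($this->handlers[$routingKey])($payload);
    }
}

$bus = new NaiveCommandBus();
$bus->register('product.register', fn (array $command) => "Product {$command['productId']} registered");

echo $bus->sendWithRouting('product.register', '{"productId": 1}', 'application/json');
// Product 1 registered
```

Because the handler is looked up by name, the sender never needs to know the handler's class, which is exactly where the decoupling comes from.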
Let's run our testing command:
Ecotone JMS Converter
Normally we don't want to deal with serialization and deserialization ourselves, or we at least want to keep the configuration minimal. Those are time-consuming tasks which more often than not end up as copy/paste code that we need to maintain.
Ecotone comes with an integration with JMS Serializer to solve this problem.
It introduces a way to reuse Converters and to write them only when that's actually needed.
Therefore let's replace our own written Converter with JMS one.
Let's download the Converter using Composer.
composer require ecotone/jms-converter
Let's remove the __construct and fromArray methods from RegisterProductCommand and GetProductPriceQuery, and remove the JsonToPHPConverter class completely, as we won't need them anymore.
JMS creates a cache to speed up the serialization process. In case of problems running this test command, try removing your cache.
Let's run our testing command:
Do you wonder how we just deserialized our Command and Query classes without any additional code?
The JMS Module reads the properties and deserializes them according to the type hint, or the docblock in the case of arrays.
It's pretty straightforward and logical:
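The idea can be pictured with a naive reflection-based hydrator (an illustrative sketch of the concept only; the NaiveHydrator and ProductSnapshot classes are hypothetical, and this is not the JMS module's actual code):

```php
<?php

// Illustrative sketch: hydrate an object by matching array keys to typed properties.
final class NaiveHydrator
{
    public function hydrate(string $class, array $data): object
    {
        $reflection = new ReflectionClass($class);
        // Bypass the constructor - exactly why no fromArray/constructor code is needed.
        $object = $reflection->newInstanceWithoutConstructor();

        foreach ($reflection->getProperties() as $property) {
            if (array_key_exists($property->getName(), $data)) {
                $property->setAccessible(true);
                $property->setValue($object, $data[$property->getName()]);
            }
        }

        return $object;
    }
}

// Hypothetical class with typed properties, mirroring the tutorial's command shape.
class ProductSnapshot
{
    private int $productId;
    private int $cost;

    public function getCost(): int
    {
        return $this->cost;
    }
}

$product = (new NaiveHydrator())->hydrate(ProductSnapshot::class, ['productId' => 1, 'cost' => 100]);
echo $product->getCost();
// 100
```

The real JMS integration is considerably smarter (nested objects, arrays via docblocks, custom handlers), but the property-driven principle is the same.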
Let's imagine we found out that we have a bug in our software: users have registered products with a negative price, which as a result lowered the bill.
Product should be registered only with positive cost
We could put a constraint in Product, validating the cost amount, but that would guarantee the constraint only in that one place. Instead we want to be sure that the cost is correct wherever we make use of it, so we can avoid potential future bugs. This way, whenever we deal with a Cost object, we will know it's correct.
To achieve that, we will create a Value Object named Cost that handles the validation during construction.
Great, but where do we convert the integer to the Cost class? We really don't want to burden our business logic with conversions. Ecotone JMS provides extension points, so we can tell it how to convert specific classes.
Normally you will want to delegate conversion to Converters, as we want our domain classes converted as early as possible. The business logic should stay clean, so it can focus on domain problems, not technical ones.
Let's create the class App\Infrastructure\Converter\CostConverter. We will put it in a different namespace to separate it from the domain.
We mark the methods with the Converter attribute, so Ecotone can read the parameter type and return type in order to know how to convert from a scalar/array to a specific class and vice versa.
Let's change our command and aggregate classes so they can use Cost directly.
The $cost class property will be automatically converted from integer to Cost by JMS Module.
Let's run our testing command:
In this Lesson we learned how to make use of Converters.
The command which we send from outside (to the Command Bus) is still the same as before. We changed the internals of the domain without affecting the consumers of our API.
In the next Lesson we will learn about Method Invocation and Metadata.
Great, we just finished Lesson 3!
Interceptors (Middlewares)
PHP Interceptors Middlewares
Ecotone provides the possibility to handle cross-cutting concerns via Interceptors.
An Interceptor intercepts the process of handling a message, which means we can perform actions like:
Calling some shared functionality or adding additional behavior
This can all be done without modifying the code itself, as we hook into the existing flows.
If you are familiar with Aspect Oriented Programming, you will find a lot of similarities.
Precedence
Precedence defines the ordering of called interceptors. The lower the value, the earlier the interceptor is called. It's safe to stay within the range of -1000 to 1000, as numbers below -1000 and above 1000 are used by Ecotone internally.
The precedence applies within a specific interceptor type.
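Conceptually, the rule boils down to an ascending sort by precedence within each interceptor type. A tiny illustrative sketch (not Ecotone's internal scheduler; the interceptor names are just examples):

```php
<?php

// Two hypothetical Before interceptors competing for ordering.
$interceptors = [
    ['name' => 'RequireAdministrator', 'precedence' => 1],
    ['name' => 'AddUserId',            'precedence' => 0],
];

// Lower precedence value => called earlier.
usort($interceptors, fn (array $a, array $b) => $a['precedence'] <=> $b['precedence']);

echo implode(' -> ', array_column($interceptors, 'name'));
// AddUserId -> RequireAdministrator
```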
Pointcut
Every interceptor has a pointcut, which describes which endpoints that interceptor should intercept.
CLASS_NAME - intercepts a specific class or interface, or any class containing the given attribute at the method or class level
CLASS_NAME::METHOD_NAME - intercepts a specific method of a class
NAMESPACE* - intercepts everything starting with the given namespace prefix, e.g. App\Domain\*
Interceptor Types
There are four types of interceptors. Each type has its own role and capabilities.
Interceptors are called in the following order:
Presend
Before
Around
After
Before Interceptor
The Before Interceptor is called after the message is sent to the channel, but before the execution of the endpoint.
- Exceptional Interceptor
The Before interceptor is called before the endpoint is executed.
Before interceptors can be used to stop the flow, throw an exception, or enrich the Message.
To understand it better, let's follow an example where we will intercept a Command Handler with a verification that the executor is an administrator.
Let's start by creating an Attribute called RequireAdministrator in a new namespace.
Let's create our first Before Interceptor.
We are using a pointcut here which looks for the #[RequireAdministrator] attribute on each of the registered endpoints.
The void return type is expected here. It tells Ecotone that this Before Interceptor does not modify the Message, so the message will be passed through. The message flow can, however, be interrupted by throwing an exception.
Now we need to annotate our Command Handler:
Whenever we call our command handler now, it will be intercepted by AdminVerificator.
Our Command Handler uses the ChangePriceCommand class, while our AdminVerificator interceptor uses array $payload. They both reference the payload of the Message; if we define a class as the type hint, Ecotone will do the conversion for us.
- Payload Enriching Interceptor
If the return type is not void, a new Message will be created from the returned value.
Let's follow an example where we enrich the payload with a timestamp.
- Header Enriching Interceptor
Suppose we want to add an executor id; as this is not part of our Command, we want to add it to the Headers.
If the return type is not void, a new Message will be created based on the previous Message and the returned value. If we additionally add changeHeaders: true, it tells Ecotone that we want to modify the Message headers instead of the payload.
- Message Filter Interceptor
Use Message Filter, to eliminate undesired messages based on a set of criteria.
This can be done by returning null from the interceptor; if the flow should proceed, the payload should be returned.
Around Interceptor
The Around Interceptor has access to the actual Method Invocation. This allows starting an action before the method invocation and finishing it afterwards.
The Around interceptor is a good place for handling actions like database transactions, or logic that needs access to the invoked object.
As we used the Command Bus interface as the pointcut, we told Ecotone that it should intercept the Command Bus Gateway. Now, whenever we call any method on the Command Bus, it will be wrapped in a transaction.
The other powerful use case for the Around Interceptor is intercepting an Aggregate.
Suppose we want to verify whether the executing user has access to the Aggregate.
We have placed the #[IsOwnedByExecutor] attribute at the top of the class. For the interceptor pointcut this means that each endpoint defined in this class will be intercepted; there is no need to add it on each Command Handler.
We've passed the executed Aggregate instance, Person, to our Interceptor. This way we get the context of the executed object in order to perform specific logic.
After Interceptor
The After interceptor is called after the endpoint execution has finished.
It works exactly the same as the Before Interceptor.
The After interceptor can be used, for example, to enrich a Query Handler's result.
We will intercept all endpoints within the Order\ReadModel namespace, wrapping the result coming from each endpoint under a result key.
Presend Interceptor
The Presend Interceptor is called before the Message is actually sent to the channel.
For a synchronous channel there is no difference between Before and Presend.
The difference is seen when the channel is asynchronous.
The Presend Interceptor can be used to verify that data is correct before sending it to an asynchronous channel, or to check whether the user has enough permissions for the given action.
This keeps our asynchronous channel free of incorrect messages.
Presend can't intercept Gateways like the (Command/Event/Query) buses; however, in the context of Gateways, a Before Interceptor leads to the same behaviour and can therefore be used instead.
<?php
namespace App\Domain\Product;
use Ecotone\Messaging\Attribute\MediaTypeConverter;
use Ecotone\Messaging\Conversion\Converter;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\TypeDescriptor;
#[MediaTypeConverter]
class JsonToPHPConverter implements Converter
{
public function matches(TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType): bool
{
}
public function convert($source, TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType)
{
}
}
public function matches(TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType): bool
{
return $sourceMediaType->isCompatibleWith(MediaType::createApplicationJson()) // if source media type is JSON
&& $targetMediaType->isCompatibleWith(MediaType::createApplicationXPHP()); // and target media type is PHP
}
public function convert($source, TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType)
{
$data = \json_decode($source, true, 512, JSON_THROW_ON_ERROR);
// $targetType hold the class, which we will convert to
switch ($targetType->getTypeHint()) {
case RegisterProductCommand::class: {
return RegisterProductCommand::fromArray($data);
}
case GetProductPriceQuery::class: {
return GetProductPriceQuery::fromArray($data);
}
default: {
throw new \InvalidArgumentException("Unknown conversion type");
}
}
}
class GetProductPriceQuery
{
private int $productId;
public function __construct(int $productId)
{
$this->productId = $productId;
}
public static function fromArray(array $data) : self
{
return new self($data['productId']);
}
}
class RegisterProductCommand
{
private int $productId;
private int $cost;
public function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
}
public static function fromArray(array $data) : self
{
return new self($data['productId'], $data['cost']);
}
}
bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!
#[CommandHandler("product.register")]
public static function register(RegisterProductCommand $command) : self
{
return new self($command->getProductId(), $command->getCost());
}
#[QueryHandler("product.getCost")]
public function getCost(GetProductPriceQuery $query) : int
{
return $this->cost;
}
namespace App\Domain\Product;
class Cost
{
private int $amount;
public function __construct(int $amount)
{
if ($amount <= 0) {
throw new \InvalidArgumentException("The cost cannot be negative or zero, {$amount} given.");
}
$this->amount = $amount;
}
public function getAmount() : int
{
return $this->amount;
}
public function __toString()
{
return (string)$this->amount;
}
}
namespace App\Infrastructure\Converter;
use App\Domain\Product\Cost;
use Ecotone\Messaging\Attribute\Converter;
class CostConverter
{
#[Converter]
public function convertFrom(Cost $cost) : int
{
return $cost->getAmount();
}
#[Converter]
public function convertTo(int $amount) : Cost
{
return new Cost($amount);
}
}
class RegisterProductCommand
{
private int $productId;
private Cost $cost;
public function getProductId() : int
{
return $this->productId;
}
public function getCost() : Cost
{
return $this->cost;
}
}
class Product
{
use WithAggregateEvents;
#[Identifier]
private int $productId;
private Cost $cost;
private function __construct(int $productId, Cost $cost)
{
$this->productId = $productId;
$this->cost = $cost;
$this->recordThat(new ProductWasRegisteredEvent($productId));
}
#[CommandHandler("product.register")]
public static function register(RegisterProductCommand $command) : self
{
return new self($command->getProductId(), $command->getCost());
}
#[QueryHandler("product.getCost")]
public function getCost(GetProductPriceQuery $query) : Cost
{
return $this->cost;
}
}
bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!
NAMESPACE* - indicating all starting with a namespace prefix, e.g. App\Domain\*
expression || expression - indicating one expression or another, e.g. Product\*||Order\*
expression && expression - indicating one expression and another, e.g. App\Domain\* && App\Attribute\RequireAdministrator
class AdminVerificator
{
#[Before(precedence: 0, pointcut: "Order\Domain\*")]
public function isAdmin(array $payload, array $headers) : void
{
if ($headers["executorId"] != 1) {
throw new \InvalidArgumentException("You need to be administrator in order to register new product");
}
}
}
#[\Attribute]
class RequireAdministrator {}
class AdminVerificator
{
#[Before(pointcut: RequireAdministrator::class)]
public function isAdmin(array $payload, array $headers) : void
{
if ($headers["executorId"] != 1) {
throw new \InvalidArgumentException("You need to be administrator in order to register new product");
}
}
}
#[CommandHandler]
#[RequireAdministrator] // Our Application level defined Attribute
public function changePrice(ChangePriceCommand $command) : void
{
// do something with $command
}
#[\Attribute]
class AddTimestamp {}
class TimestampService
{
#[Before(pointcut: AddTimestamp::class)]
public function add(array $payload) : array
{
return array_merge($payload, ["timestamp" => time()]);
}
}
class ChangePriceCommand
{
private int $productId;
private int $timestamp;
}
#[CommandHandler]
#[AddTimestamp]
public function changePrice(ChangePriceCommand $command) : void
{
// do something with $command and timestamp
}
#[\Attribute]
class AddExecutor {}
class TimestampService
{
#[Before(pointcut: AddExecutor::class, changeHeaders: true)]
public function add() : array
{
return ["executorId" => 1];
}
}
#[CommandHandler]
#[AddExecutor]
public function changePrice(ChangePriceCommand $command, array $metadata) : void
{
// do something with $command and executor id $metadata["executorId"]
}
#[\Attribute]
class SendNotificationOnlyIfInterested {}
class NotificationFilter
{
#[Before(pointcut: SendNotificationOnlyIfInterested::class)]
public function filter(PriceWasChanged $event) : ?PriceWasChanged
{
if ($this->isInterested($event)) {
return $event; // flow proceeds
}
return null; // message is eliminated, flow stops.
}
}
#[EventHandler]
#[SendNotificationOnlyIfInterested]
public function sendNewPriceNotification(PriceWasChanged $event) : void
{
// do something with $event
}
class TransactionInterceptor
{
#[Around(pointcut: Ecotone\Modelling\CommandBus::class)]
public function transactional(MethodInvocation $methodInvocation)
{
$this->connection->beginTransaction();
try {
$result = $methodInvocation->proceed();
$this->connection->commit();
}catch (\Throwable $exception) {
$this->connection->rollBack();
throw $exception;
}
return $result;
}
}
#[Aggregate]
#[IsOwnedByExecutor]
class Person
{
private string $personId;
#[CommandHandler]
public function changeAddress(ChangeAddress $command) : void
{
// change address
}
public function hasPersonId(string $personId) : bool
{
return $this->personId === $personId;
}
}
#[\Attribute]
class IsOwnedByExecutor {}
class IsOwnerVerificator
{
#[Around(pointcut: IsOwnedByExecutor::class)]
public function isOwner(MethodInvocation $methodInvocation, Person $person, #[Headers] array $metadata)
{
if (!$person->hasPersonId($metadata["executorId"])) {
throw new \InvalidArgumentException("No access to do this action!");
}
return $methodInvocation->proceed();
}
}
namespace Order\ReadModel;
class OrderService
{
#[QueryHandler]
public function getOrderDetails(GetOrderDetailsQuery $query) : array
{
return ["orderId" => $query->getOrderId()];
}
}
class AddResultSet
{
#[After(pointcut: "Order\ReadModel\*")]
public function add(array $payload) : array
{
return ["result" => $payload];
}
}
class VerifyIfAuthenticated
{
#[Presend(pointcut: Ecotone\Modelling\Attribute\CommandHandler::class)]
public function verify(#[Header("executorId")] ?string $executorId) : void
{
if (!$executorId) {
throw new \InvalidArgumentException("User must be logged");
}
}
}
class IsEventAlreadyHandled
{
private Storage $storage;
#[Presend(pointcut: Ecotone\Modelling\Attribute\EventHandler::class)]
public function verify($payload, #[Header("messageId")] string $messageId)
{
if ($this->storage->isHandled($messageId)) {
return null;
}
return $payload;
}
}
Ecotone provides us with the possibility to handle cross-cutting concerns via Interceptors.
An Interceptor, as the name suggests, intercepts the process of handling a message.
You may enrich the Message, stop or modify the usual processing cycle, call some shared functionality, or add additional behavior to existing code without modifying the code itself.
If you are familiar with Aspect Oriented Programming or the Middleware pattern, you may find some similarities.
Before & After Interceptor
After one of our administrators went on holiday, the others found out that they couldn't change the cost of a product, and this became really problematic for them.
Administrators should be able to change the cost of a product
We could copy-paste the logic from product.register to product.changePrice, but we want to avoid code duplication, especially for logic that may be needed more often. Let's intercept our Command Handlers instead.
Let's start by creating an Attribute called RequireAdministrator in a new namespace, App\Infrastructure\RequireAdministrator.
Let's create our first Before Interceptor. Start by removing the old UserService and creating a new one in the App\Infrastructure\RequireAdministrator namespace. Remember to mark the return type as void; we will soon see why this is important.
Before - marks the method as an Interceptor, so it can be found by Ecotone.
Pointcut - describes what should be intercepted.
CLASS_NAME - indicates what should be intercepted, using a specific class name or an attribute name placed at the method or class level
NAMESPACE* - indicates everything starting with the given namespace, e.g. App\Domain\Product\*
Now we need to annotate our Command Handlers:
We told the Before Interceptor that it should intercept all endpoints annotated with RequireAdministrator.
Now, whenever we call our command handlers, they will be intercepted by UserService.
You can try it out, by providing different userId.
Enrich Message
Before and After interceptors depend on the return type to decide whether they should modify the Message or pass it through.
If the return type is different from void, the Message payload or headers can be enriched with data.
If the return type is void, the message is passed through, and the message flow can only be interrupted by throwing an exception.
Instead of providing the userId when calling the CommandBus, we will enrich the Message with it, using an Interceptor, before it is handled by the Command Handler.
Let's change our testing class to remove metadata and add the Interceptor.
changeHeaders - Tells Ecotone if this Interceptor modifies payload or headers. The default is payload.
If changeHeaders=true, then the headers are picked and an associative array must be returned. The returned value is merged with the current headers.
If changeHeaders=false, then the payload is picked and the current payload is replaced by the returned value; the headers stay the same.
You may of course inject the current payload and headers into the method if needed, as with a usual endpoint.
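The two modes can be sketched in plain PHP (an illustration of the merge/replace semantics described above, not Ecotone's internals; the header and payload values are made up):

```php
<?php

$currentHeaders = ['messageId' => 'abc-123'];
$currentPayload = ['productId' => 1];

// changeHeaders=true: the returned associative array is merged into the current headers.
$newHeaders = array_merge($currentHeaders, ['userId' => 1]);

// changeHeaders=false (default): the returned value replaces the payload; headers stay intact.
$newPayload = array_merge($currentPayload, ['timestamp' => 1700000000]);

echo json_encode($newHeaders), "\n", json_encode($newPayload), "\n";
// {"messageId":"abc-123","userId":1}
// {"productId":1,"timestamp":1700000000}
```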
precedence - Tells Ecotone in what order interceptors should be called. The lower the value, the earlier the interceptor is called. The order applies within each interceptor type: before/around/after. We want to call the AddUserId Interceptor before the RequireAdministrator Interceptor, as the verification requires the userId to exist. AddUserIdService has the default precedence of 0, so UserService must have a precedence of at least 1.
Let's annotate the Product aggregate.
If we annotate the aggregate at the class level, it works as if each of its methods were annotated with the specific annotation, in this case AddUserId.
Let's run our testing command:
Breaking the flow
If during Before or Around you decide to break the flow, return null. Null indicates that there is no message, and the current flow ends.
Null cannot be returned from a header-changing interceptor; it works only for payload-changing interceptors.
Around Interceptor
The Around Interceptor is closest to the actual endpoint's method call. Thanks to that, it has access to the Method Invocation. This allows starting some procedure and ending it after the invocation is done.
We will add a real database to our example using SQLite. If you do not have the extension installed, you will need to install it first. If you are using the Quickstart's Docker container, then you are ready to go.
Let's start by implementing a repository that can handle any aggregate by storing it in an SQLite database.
Before we do that, we need to remove our in-memory implementation, App\Domain\Product\InMemoryProductRepository; we will replace it with our new implementation.
We will create it under a new namespace: App\Infrastructure\Persistence.
Besides, we are going to use Doctrine DBAL, as it is a really helpful abstraction over PDO.
And the repository implementation:
Connection to the SQLite database using the DBAL library
The Serializer is registered by Ecotone.
The Serializer handles serialization using the registered Converters.
In this case it will know how to serialize the Cost class, as we already registered a Converter for it.
The Serializer gives us access to conversion from a PHP type to a specific Media Type, or from a specific Media Type to a PHP type. We will use it to easily serialize our Product model into JSON and store it in the database.
You do not need to focus too much on the repository implementation; this is just an example.
In your application, you may implement it using your ORM or whatever fits you best.
This implementation will override the aggregate on registerProduct if one already exists: it will insert, or update if the aggregate exists.
We want to intercept the Command Bus Gateway with a transaction, so whenever we call it, it will invoke our Command Handler within a transaction.
pointcut="Ecotone\Modelling\CommandBus"
This pointcut will intercept CommandBus.
Let's run our testing command:
We do have two transactions started, because we call the Command Bus twice.
Parameter Converters for Interceptors
Each interceptor can inject the attribute which was used for the pointcut; just type hint it in the method declaration.
Around interceptors can also inject the intercepted class instance; in the above example it would be the Command Bus.
In the case of the Command Bus this may seem unnecessary, but if we intercept an Aggregate it becomes really useful, as you may, for example, verify whether the executing user has access to it.
You may read more about interceptors in the Interceptors section of the documentation.
Great, we have just finished Lesson 5!
Interceptors are a very powerful concept. Without extending any classes or interfaces from Ecotone, we can build up authorization and transactions, delegate duplicated logic, call external services, or add logging and tracing before invoking an endpoint; the amount of possibilities is endless.
In the next chapter, we will learn about scheduling and polling endpoints.
expression||expression - indicates one expression or the other, e.g. Product*||Order*
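For instance, a single interceptor could target handlers from two namespaces at once. This is a hypothetical sketch (AuditService and the namespaces are illustrative, and the Before import path is assumed):

```php
<?php

use Ecotone\Messaging\Attribute\Interceptor\Before;

class AuditService
{
    // The || pointcut matches either expression: any class under
    // App\Domain\Product or under App\Domain\Order.
    #[Before(pointcut: 'App\Domain\Product\*||App\Domain\Order\*')]
    public function audit(): void
    {
        echo "Handler execution audited\n";
    }
}
```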
Precedence tells Ecotone in what order interceptors should be called: the lower the value, the earlier the interceptor is called. The order applies within each interceptor type: before/around/after.
We want to call the AddUserId interceptor before the RequireAdministrator interceptor, as the latter requires userId to exist in order to verify it. AddUserIdService has a precedence of 0 by default, so UserService must have a precedence of at least 1.
The Serializer can also convert a specific Media Type back to a PHP type. We will use it to easily serialize our Product model into JSON and store it in the database.
This creates the database table if needed: a simple structure containing the id of the aggregate, the class type, and the serialized data as JSON. Take a look at createSharedTableIfNeeded if you want more details.
namespace App\Infrastructure\RequireAdministrator;
#[\Attribute]
class RequireAdministrator {}
namespace App\Infrastructure\RequireAdministrator;
use Ecotone\Messaging\Attribute\Interceptor\Before;
use Ecotone\Messaging\Attribute\Parameter\Header;
class UserService
{
#[Before(pointcut: RequireAdministrator::class)]
public function isAdmin(#[Header("userId")] ?string $userId) : void
{
if ($userId != 1) {
throw new \InvalidArgumentException("You need to be administrator to perform this action");
}
}
}
pointcut="App\Domain\Product\Product"
use App\Infrastructure\RequireAdministrator\RequireAdministrator;
(...)
#[CommandHandler("product.register")]
#[RequireAdministrator]
public static function register(RegisterProductCommand $command, array $metadata) : self
{
return new self($command->getProductId(), $command->getCost(), $metadata["userId"]);
}
#[CommandHandler("product.changePrice")]
#[RequireAdministrator]
public function changePrice(ChangePriceCommand $command) : void
{
$this->cost = $command->getCost();
}
namespace App\Infrastructure\AddUserId;
#[\Attribute]
class AddUserId {}
namespace App\Infrastructure\AddUserId;
class AddUserIdService
{
#[Before(precedence: 0, pointcut: AddUserId::class, changeHeaders: true)]
public function add() : array
{
return ["userId" => 1];
}
}
class UserService
{
#[Before(precedence: 1, pointcut: RequireAdministrator::class)]
public function isAdmin(#[Header("userId")] ?string $userId) : void
{
if ($userId != 1) {
throw new \InvalidArgumentException("You need to be administrator in order to register new product");
}
}
}
use App\Infrastructure\AddUserId\AddUserId;
#[Aggregate]
#[AddUserId]
class Product
{
bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
110
Good job, scenario ran with success!
composer require doctrine/dbal
namespace App\Infrastructure\Persistence;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use Ecotone\Messaging\Gateway\Converter\Serializer;
use Ecotone\Modelling\Attribute\Repository;
use Ecotone\Modelling\StandardRepository;
#[Repository]
class DbalRepository implements StandardRepository
{
const TABLE_NAME = "aggregate";
const CONNECTION_DSN = 'sqlite:////tmp/db.sqlite';
private Connection $connection; // 1
private Serializer $serializer; // 2
public function __construct(Serializer $serializer)
{
$this->connection = DriverManager::getConnection(array('url' => self::CONNECTION_DSN));
$this->serializer = $serializer;
}
public function canHandle(string $aggregateClassName): bool
{
return true;
}
public function findBy(string $aggregateClassName, array $identifiers): ?object
{
$this->createSharedTableIfNeeded(); // 3
$record = $this->connection->executeQuery(<<<SQL
SELECT * FROM aggregate WHERE id = :id AND class = :class
SQL, ["id" => $this->getFirstId($identifiers), "class" => $aggregateClassName])->fetch(\PDO::FETCH_ASSOC);
if (!$record) {
return null;
}
// 4
return $this->serializer->convertToPHP($record["data"], "application/json", $aggregateClassName);
}
public function save(array $identifiers, object $aggregate, array $metadata, ?int $expectedVersion): void
{
$this->createSharedTableIfNeeded();
$aggregateClass = get_class($aggregate);
// 5
$data = $this->serializer->convertFromPHP($aggregate, "application/json");
if ($this->findBy($aggregateClass, $identifiers)) {
$this->connection->update(self::TABLE_NAME,
["data" => $data],
["id" => $this->getFirstId($identifiers), "class" => $aggregateClass]
);
return;
}
$this->connection->insert(self::TABLE_NAME, [
"id" => $this->getFirstId($identifiers),
"class" => $aggregateClass,
"data" => $data
]);
}
private function createSharedTableIfNeeded(): void
{
$hasTable = $this->connection->executeQuery(<<<SQL
SELECT name FROM sqlite_master WHERE name=:tableName
SQL, ["tableName" => self::TABLE_NAME])->fetchColumn();
if (!$hasTable) {
$this->connection->executeStatement(<<<SQL
CREATE TABLE aggregate (
id VARCHAR(255),
class VARCHAR(255),
data TEXT,
PRIMARY KEY (id, class)
)
SQL
);
}
}
/**
* @param array $identifiers
* @return mixed
*/
private function getFirstId(array $identifiers)
{
return array_values($identifiers)[0];
}
}
namespace App\Infrastructure\Persistence;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use Ecotone\Messaging\Attribute\Interceptor\Around;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
use Ecotone\Modelling\CommandBus;
class TransactionInterceptor
{
private Connection $connection;
public function __construct()
{
$this->connection = DriverManager::getConnection(array('url' => DbalRepository::CONNECTION_DSN));
}
#[Around(pointcut: CommandBus::class)]
public function transactional(MethodInvocation $methodInvocation)
{
echo "Start transaction\n";
$this->connection->beginTransaction();
try {
$result = $methodInvocation->proceed();
$this->connection->commit();
echo "Commit transaction\n";
}catch (\Throwable $exception) {
$this->connection->rollBack();
echo "Rollback transaction\n";
throw $exception;
}
return $result;
}
}
bin/console ecotone:quickstart
Start transaction
Product with id 1 was registered!
Commit transaction
Start transaction
Commit transaction
110
Good job, scenario ran with success!
Lesson 1: Messaging Concepts
PHP Messages
Not having code for Lesson 1?
git checkout lesson-1
Key concepts / background
Ecotone, from the ground up, is built around messaging to provide a simple model that allows connecting components, modules, or even different applications together in a seamless and easy way.
To achieve that, the fundamental messaging blocks are implemented following Enterprise Integration Patterns. On top of that we get support for higher-level patterns like CQRS, Events, and DDD, which help us build systems that keep the business logic explicit and maintainable, even in the long term.
In this first lesson, we will learn the fundamental blocks of a messaging architecture, and we will start building the back-end for a Shopping System using CQRS.
Before we dive into the implementation, let's briefly go over the main concepts behind Ecotone.
Message
A Message is a data record consisting of a Payload and Message Headers (Metadata).
The Payload can be of any PHP type (scalar, object, compound), and the Headers hold commonly required information such as the ID, the timestamp, and framework-specific details.
Developers can also store arbitrary key-value pairs in the headers to pass additional meta information.
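Conceptually, a message can be pictured like this (an illustrative value object, not Ecotone's actual Message class; all header values are made up):

```php
<?php

// Illustrative only: a Message pairs a payload of any PHP type
// with a key-value map of headers (metadata).
final class Message
{
    public function __construct(
        public readonly mixed $payload,
        public readonly array $headers = [],
    ) {}
}

$message = new Message(
    payload: ["productId" => 1, "cost" => 100],
    headers: ["id" => "123e4567", "timestamp" => 1700000000, "userId" => 1],
);
```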
Message Channel
A Message Channel abstracts the communication between components. It allows for sending and receiving messages, decoupling components from knowledge about the transport layer, which is encapsulated within the Message Channel.
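A minimal in-memory sketch of the idea (not Ecotone's API; a real channel might be synchronous or backed by a broker such as RabbitMQ):

```php
<?php

// Illustrative only: components talk to a channel, not to each other.
// The sender never knows how, or when, the receiver consumes the message.
class PointToPointChannel
{
    private array $queue = [];

    public function send(mixed $message): void
    {
        $this->queue[] = $message;
    }

    public function receive(): mixed
    {
        // Returns null when the channel is empty.
        return array_shift($this->queue);
    }
}

$channel = new PointToPointChannel();
$channel->send("order.place payload");
```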
Message Endpoint
Message Endpoints are the consumers and producers of messages. Consumers are not necessarily asynchronous, as you may build a synchronous flow composed of multiple endpoints.
If you are familiar with Symfony Messenger or SimpleBus, for now you can think of an Endpoint as a Message Handler that can be connected to an asynchronous or synchronous transport.
Messaging Gateway
The Messaging Gateway encapsulates messaging-specific code (the code required to send or receive a Message) and separates it from the rest of the application code.
It takes your domain-specific objects and converts them into Messages that are sent via Message Channels.
To avoid a dependency on the messaging framework, Ecotone provides the Gateway as an interface and generates a proxy class for it.
Command/Query/Event buses are implemented using Messaging Gateway.
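Beyond the built-in buses, you can define your own Gateway as a plain interface and let Ecotone generate the proxy. A hedged sketch (the OrderNotificationGateway name and routing key are illustrative, and the MessageGateway attribute import path is assumed):

```php
<?php

use Ecotone\Messaging\Attribute\MessageGateway;

// Illustrative custom gateway: Ecotone generates the proxy implementation,
// so application code depends only on this interface.
interface OrderNotificationGateway
{
    #[MessageGateway("order.notify")]
    public function notify(string $orderId): void;
}
```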
Business Logic
You will not have to implement Messages, Message Channels or Message Endpoints directly, as those are lower-level concepts. Instead, you will be able to focus on your specific domain logic, with an implementation based on plain PHP objects. By providing declarative configuration, we will be able to connect the domain-specific code to the messaging system.
Great, now that we know the fundamental blocks of Ecotone and messaging architecture, we can start implementing our Shopping System!
If you did not understand something, do not worry, we will see how it applies in practice in the next steps.
To The Code!
Do you remember this command from the installation step?
If this command returns the above output, then we are ready to go.
This method will be run whenever we execute ecotone:quickstart.
This class is auto-registered using the auto-wire system; both Symfony and Laravel provide this great feature. For Lite, the clean and easy to use PHP-DI is taken.
Thanks to that, we will avoid writing configuration files for service registrations during this tutorial, and we will be able to fully focus on what Ecotone provides to us.
Command Handler - Endpoint
We will start by creating a Command Handler.
The Command Handler is the place where we will put our business logic.
Let's create the namespace App\Domain\Product and inside it RegisterProductCommand, a command for registering a new product:
Describing the types will help us in later lessons with automatic conversion. For now, just remember that it's worth keeping the types defined.
Let's register a Command Handler now by creating the class App\Domain\Product\ProductService.
The first thing worth noticing is #[CommandHandler].
This marks our register method in ProductService as an Endpoint; from that moment it can be found by Ecotone.
Ecotone will read the method declaration and, based on the type hint of the first parameter, will know that this Command Handler is responsible for handling RegisterProductCommand.
Ecotone makes use of PHP attributes to provide declarative configuration. In most scenarios we state "what" we want to achieve with attributes, and Ecotone takes care of "how". This way our application logic stays decoupled from the technical concerns.
#[ClassReference] is a special attribute; it informs Ecotone how this service is registered in the Dependency Container. By default it takes the class name, which is compatible with the auto-wiring system.
If ProductService were registered in the Dependency Container as "productService", we would use the attribute this way:
Query Handler - Endpoint
We also need the possibility to query ProductService for registered products, and this is the role of Query Handlers. Let's start with the GetProductPriceQuery class. This query will tell us the price of a specific product.
We also need a Handler for this query. Let's add a Query Handler to the ProductService.
Some CQRS frameworks expect Handlers to be defined as a class, not a method. This is somewhat limiting and produces a lot of boilerplate. Ecotone allows full flexibility: if you want to have only one handler per class, so be it; otherwise, just annotate the next methods.
Command and Query Bus - Gateways
It's time to call our Endpoints. You may remember that Endpoints need to be connected using Message Channels, and we have not done anything like that yet. Thankfully, Ecotone creates synchronous channels for us, so we don't need to bother with it.
Synchronous channels are created automatically for our Message Handlers.
We will learn how easily they can be replaced with asynchronous channels in the next lessons.
We need to create a Message and send it to the correct Message Channel.
In order to send the Message, we will use a Messaging Gateway.
Messaging Gateways are responsible for creating a Message from the given parameters and sending it to the correct channel.
Special types of Gateways are Command and Query Buses:
- For sending Commands we will use Command Bus.
- For sending Queries we will use Query Bus.
Let's inject the Command and Query Buses into the EcotoneQuickstart class and call them.
Gateways are auto-registered in the Dependency Container and available for auto-wiring.
Ecotone comes with a few Gateways out of the box, like the Command and Query Buses.
We are sending command RegisterProductCommand to the CommandHandler we registered before.
Same as above, but in this case we are sending the query GetProductPriceQuery.
As you can see, we have not defined any Message Channels, Messages or Gateways, yet they are all being used in this scenario. This can happen because Ecotone uses high-level abstractions, so our daily development is focused on the business side of the code, while under the hood powerful messaging capabilities are at work.
If you run our testing command now, you should see the result.
Event Handling
We want to send a notification whenever a new product is registered in the system.
In order to do that, we will make use of the Event Bus Gateway, which can publish events.
Let's start by creating ProductWasRegisteredEvent.
As you can see, Ecotone does not really care what class a Command/Query/Event is. It does not require implementing any interfaces, nor prefixing or suffixing the class name.
In fact, commands, queries and events can be of any type, as we will see in the next lessons.
In this tutorial, however, we use the Command/Query/Event suffixes to clarify the distinction.
Let's inject the EventBus into our Command Handler in order to publish ProductWasRegisteredEvent after the product is registered.
Ecotone controls the method invocation for Endpoints; if you type hint for a specific class, the framework will look for that service in the Dependency Container in order to inject it automatically.
In this scenario it injects the Event Bus for us. If you want to know more, check the chapter about method invocation in the documentation.
Now that our event is published whenever a new product is registered, we want to subscribe to that event and send a notification. Let's create a new class and annotate a method with EventHandler.
EventHandler tells Ecotone to handle a specific event based on the declared type hint, just like with CommandHandler.
Commands target a single Handler; Events, on the other hand, can have multiple Handlers subscribed to them.
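To illustrate, another class could subscribe to the very same event. ProductStatistics is a hypothetical second subscriber, not part of the lesson; the event class is repeated here so the sketch is self-contained:

```php
<?php

use Ecotone\Modelling\Attribute\EventHandler;

// Event class as defined earlier in this lesson.
class ProductWasRegisteredEvent
{
    public function __construct(private int $productId) {}

    public function getProductId(): int
    {
        return $this->productId;
    }
}

// Hypothetical second subscriber: both this handler and
// ProductNotifier::notifyAbout() would be invoked for the same event.
class ProductStatistics
{
    private int $registeredCount = 0;

    #[EventHandler]
    public function whenProductWasRegistered(ProductWasRegisteredEvent $event): void
    {
        $this->registeredCount++;
    }

    public function getRegisteredCount(): int
    {
        return $this->registeredCount;
    }
}
```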
If you run our testing command now, you should see the result.
Great, we have just finished Lesson 1.
In this lesson we have learned the basics of Messaging and CQRS.
That was the longest lesson, as it had to introduce new concepts. The upcoming lessons will be much shorter :)
We are ready for Lesson 2!
Lesson 6: Asynchronous Handling
Asynchronous PHP Workers
Not having code for Lesson 6? git checkout lesson-6
Ecotone provides abstractions for asynchronous execution.
Asynchronous
We got a new requirement:
User should be able to place an order for different products.
We will need to build an Order aggregate.
Let's start by creating PlaceOrderCommand with the ordered product ids.
We will need an OrderedProduct value object, which describes the cost and identifier of an ordered product.
And our Order aggregate
placeOrder - the place order method makes use of the QueryBus to retrieve the cost of each ordered product.
You may notice that we are not using application/json for the product.getCost query; ecotone/jms-converter can handle the array transformation, so we do not need to use JSON.
You could inject a service into placeOrder that hides the QueryBus implementation from the domain, or you may get this data from the data store directly. We do not want to complicate the solution now, so we will use the QueryBus directly.
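The alternative mentioned above could be sketched as a small domain-facing port wrapping the QueryBus. The ProductCostResolver interface and its implementation are hypothetical names, not part of the lesson:

```php
<?php

use Ecotone\Modelling\QueryBus;

// Hypothetical domain-facing port: the aggregate could depend on this
// interface instead of on the QueryBus directly.
interface ProductCostResolver
{
    public function costOf(int $productId): int;
}

// Infrastructure adapter hiding the QueryBus behind the port.
class QueryBusProductCostResolver implements ProductCostResolver
{
    public function __construct(private QueryBus $queryBus) {}

    public function costOf(int $productId): int
    {
        // Same routed query that placeOrder() currently sends directly.
        return $this->queryBus
            ->sendWithRouting("product.getCost", ["productId" => $productId])
            ->getAmount();
    }
}
```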
We do not need to change or add a new Repository, as our existing one can handle any new aggregate arriving in our system.
Let's change our testing class and run it!
We want to be sure that we do not lose any orders, so we will now register our order.place Command Handler to run asynchronously using RabbitMQ.
Let's start by adding the ecotone/amqp extension, which can handle RabbitMQ:
We also need to add our ConnectionFactory to our Dependency Container.
We register our AmqpConnectionFactory under the class name Enqueue\AmqpLib\AmqpConnectionFactory. This will help Ecotone resolve it automatically, without any additional configuration.
Let's add our first AMQP-backed channel (a RabbitMQ channel). In order to do that, we need to create our first Application Context.
An Application Context is a class without constructor parameters, responsible for extending Ecotone with extra configuration that tells the framework how to act in a specific way. Here we want to tell Ecotone about an AMQP channel with a specific name.
Let's create a new class, App\Infrastructure\MessagingConfiguration.
ServiceContext - tells Ecotone that this method returns configuration. It can return an array of objects or a single object.
Now we need to tell our order.place Command Handler that it should run asynchronously using our new orders channel.
We do that by adding the Asynchronous attribute with the channelName used for the asynchronous endpoint.
Endpoints using Asynchronous are required to have an endpointId defined; the name can be anything, as long as it is not the same as the routing key (order.place).
You may mark Event Handlers as asynchronous the same way.
Let's run the command that tells us what asynchronous endpoints are defined in our system: ecotone:list
We have a new asynchronous endpoint available, orders. The name comes from the message channel name.
You may wonder why it is not place_order_endpoint: a single asynchronous channel can handle multiple endpoints, if needed. This is explained further in the documentation.
Let's change the orderId in our testing command, so we can place a new order.
After running our testing command bin/console ecotone:quickstart, we should get an exception:
That's fine, we have registered the order.place Command Handler to run asynchronously, so we need to run our asynchronous endpoint in order to handle the Command Message. If you did not receive an exception, it's probably because the orderId was not changed and such an order was already registered.
Let's run our asynchronous endpoint:
As we can see, it ran our Command Handler and placed the order.
We can change our testing command to run only the Query Handler and check whether the order really exists now.
There is one more thing we can change.
In an asynchronous scenario we may not have access to the context of the executor to enrich the message, so we can change our AddUserIdService interceptor to perform the action before the message is sent to the asynchronous channel.
This interceptor is registered as a Before interceptor, which runs just before the execution of our Command Handler; what we want instead is to call it before the message is sent to the asynchronous channel. For this, the Presend interceptor is available.
Change the Before attribute to the Presend attribute and we are done.
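After the change, the interceptor could look like this (a sketch: the Presend import path is assumed, and Presend is assumed to accept the same parameters as Before):

```php
<?php

use App\Infrastructure\AddUserId\AddUserId;
use Ecotone\Messaging\Attribute\Interceptor\Presend;

class AddUserIdService
{
    // Presend runs before the Message is sent to the asynchronous
    // channel, so the header is enriched on the publishing side.
    #[Presend(precedence: 0, pointcut: AddUserId::class, changeHeaders: true)]
    public function add(): array
    {
        return ["userId" => 1];
    }
}
```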
Ecotone will do its best to handle the serialization and deserialization of your headers.
Now, if a non-administrator tries to execute this, an exception will be thrown before the Message is put on the asynchronous channel. Thanks to the Presend interceptor, we can validate messages before they go asynchronous, preventing incorrect messages from being sent.
The final code is available as lesson-7:
git checkout lesson-7
We made it through, Congratulations!
We have successfully registered an asynchronous Command Handler and safely placed the order.
We have finished the last lesson. You may now apply the knowledge in a real project, or explore more advanced usage in the documentation.
namespace App\Domain\Order;
class PlaceOrderCommand
{
private int $orderId;
/**
* @var int[]
*/
private array $productIds;
/**
* @return int[]
*/
public function getProductIds(): array
{
return $this->productIds;
}
public function getOrderId() : int
{
return $this->orderId;
}
}
namespace App\Domain\Order;
class OrderedProduct
{
private int $productId;
private int $cost;
public function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
}
public function getCost(): int
{
return $this->cost;
}
}
namespace App\Domain\Order;
use App\Infrastructure\AddUserId\AddUserId;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\Aggregate;
use Ecotone\Modelling\Attribute\AggregateIdentifier;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Modelling\QueryBus;
#[Aggregate]
#[AddUserId]
class Order
{
#[AggregateIdentifier]
private int $orderId;
private int $buyerId;
/**
* @var OrderedProduct[]
*/
private array $orderedProducts;
private function __construct(int $orderId, int $buyerId, array $orderedProducts)
{
$this->orderId = $orderId;
$this->buyerId = $buyerId;
$this->orderedProducts = $orderedProducts;
}
#[CommandHandler("order.place")]
public static function placeOrder(PlaceOrderCommand $command, array $metadata, QueryBus $queryBus) : self
{
$orderedProducts = [];
foreach ($command->getProductIds() as $productId) {
$productCost = $queryBus->sendWithRouting("product.getCost", ["productId" => $productId]);
$orderedProducts[] = new OrderedProduct($productId, $productCost->getAmount());
}
return new self($command->getOrderId(), $metadata["userId"], $orderedProducts);
}
#[QueryHandler("order.getTotalPrice")]
public function getTotalPrice() : int
{
$totalPrice = 0;
foreach ($this->orderedProducts as $orderedProduct) {
$totalPrice += $orderedProduct->getCost();
}
return $totalPrice;
}
}
bin/console ecotone:quickstart
Running example...
Start transaction
Product with id 1 was registered!
Commit transaction
Start transaction
Product with id 2 was registered!
Commit transaction
Start transaction
Commit transaction
400
Good job, scenario ran with success!
composer require ecotone/amqp
# Add AmqpConnectionFactory in config/services.yaml
services:
_defaults:
autowire: true
autoconfigure: true
App\:
resource: '../src/*'
exclude: '../src/{Kernel.php}'
Bootstrap\:
resource: '../bootstrap/*'
exclude: '../bootstrap/{Kernel.php}'
# You need to have RabbitMQ instance running on your localhost, or change DSN
Enqueue\AmqpExt\AmqpConnectionFactory:
class: Enqueue\AmqpExt\AmqpConnectionFactory
arguments:
- "amqp+lib://guest:guest@localhost:5672//"
# Add AmqpConnectionFactory in config/services.yaml
services:
_defaults:
autowire: true
autoconfigure: true
App\:
resource: '../src/*'
exclude: '../src/{Kernel.php}'
Bootstrap\:
resource: '../bootstrap/*'
exclude: '../bootstrap/{Kernel.php}'
# docker-compose.yml has RabbitMQ instance defined. It will be working without
# addtional configuration
Enqueue\AmqpExt\AmqpConnectionFactory:
class: Enqueue\AmqpExt\AmqpConnectionFactory
arguments:
- "amqp+lib://guest:guest@rabbitmq:5672//"
# Add AmqpConnectionFactory in bootstrap/QuickStartProvider.php
namespace Bootstrap;
use Illuminate\Support\ServiceProvider;
use Enqueue\AmqpExt\AmqpConnectionFactory;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(AmqpConnectionFactory::class, function () {
return new AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//");
});
}
(...)
# Add AmqpConnectionFactory in bootstrap/QuickStartProvider.php
namespace Bootstrap;
use Illuminate\Support\ServiceProvider;
use Enqueue\AmqpExt\AmqpConnectionFactory;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(AmqpConnectionFactory::class, function () {
return new AmqpConnectionFactory("amqp+lib://guest:guest@rabbitmq:5672//");
});
}
(...)
# Add AmqpConnectionFactory in bin/console.php
// add additional service in container
public function __construct()
{
$this->container = new Container();
$this->container->set(Enqueue\AmqpExt\AmqpConnectionFactory::class, new Enqueue\AmqpExt\AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//"));
}
# Add AmqpConnectionFactory in bin/console.php
// add additional service in container
public function __construct()
{
$this->container = new Container();
$this->container->set(Enqueue\AmqpExt\AmqpConnectionFactory::class, new Enqueue\AmqpExt\AmqpConnectionFactory("amqp+lib://guest:guest@rabbitmq:5672//"));
}
namespace App\Infrastructure;
use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;
class MessagingConfiguration
{
#[ServiceContext]
public function orderChannel()
{
return [
AmqpBackedMessageChannelBuilder::create("orders")
];
}
}
use Ecotone\Messaging\Attribute\Asynchronous;
(...)
#[Asynchronous("orders")]
#[CommandHandler("order.place", endpointId: "place_order_endpoint")]
public static function placeOrder(PlaceOrderCommand $command, array $metadata, QueryBus $queryBus) : self
{
$orderedProducts = [];
foreach ($command->getProductIds() as $productId) {
$productCost = $queryBus->sendWithRouting("product.getCost", ["productId" => $productId]);
$orderedProducts[] = new OrderedProduct($productId, $productCost->getAmount());
}
return new self($command->getOrderId(), $metadata["userId"], $orderedProducts);
}
Go to "src/EcotoneQuickstart.php"
# This class is autoregistered using PHP-DI
<?php
namespace App;
class EcotoneQuickstart
{
public function run() : void
{
echo "Hello World";
}
}
<?php
namespace App\Domain\Product;
class RegisterProductCommand
{
private int $productId;
private int $cost;
public function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
}
public function getProductId() : int
{
return $this->productId;
}
public function getCost() : int
{
return $this->cost;
}
}
<?php
namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\CommandHandler;
class ProductService
{
private array $registeredProducts = [];
#[CommandHandler]
public function register(RegisterProductCommand $command) : void
{
$this->registeredProducts[$command->getProductId()] = $command->getCost();
}
}
#[ClassReference("productService")]
class ProductService
<?php
namespace App\Domain\Product;
class GetProductPriceQuery
{
private int $productId;
public function __construct(int $productId)
{
$this->productId = $productId;
}
public function getProductId() : int
{
return $this->productId;
}
}
<?php
namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
class ProductService
{
private array $registeredProducts = [];
#[CommandHandler]
public function register(RegisterProductCommand $command) : void
{
$this->registeredProducts[$command->getProductId()] = $command->getCost();
}
#[QueryHandler]
public function getPrice(GetProductPriceQuery $query) : int
{
return $this->registeredProducts[$query->getProductId()];
}
}
# By default, Laravel's auto-wiring creates a new service instance each time
# a service is requested from the Dependency Container. For our examples
# we want to register ProductService as a singleton.
# Go to bootstrap/QuickStartProvider.php and register our ProductService
namespace Bootstrap;
use App\Domain\Product\ProductService;
use Illuminate\Support\ServiceProvider;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(ProductService::class, function(){
return new ProductService();
});
}
(...)
Everything is set up by the framework, please continue...
Everything is set up, please continue...
<?php
namespace App;
use App\Domain\Product\GetProductPriceQuery;
use App\Domain\Product\RegisterProductCommand;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\QueryBus;
class EcotoneQuickstart
{
private CommandBus $commandBus;
private QueryBus $queryBus;
// 1
public function __construct(CommandBus $commandBus, QueryBus $queryBus)
{
$this->commandBus = $commandBus;
$this->queryBus = $queryBus;
}
public function run() : void
{
// 2
$this->commandBus->send(new RegisterProductCommand(1, 100));
// 3
echo $this->queryBus->send(new GetProductPriceQuery(1));
}
}
bin/console ecotone:quickstart
Running example...
100
Good job, scenario ran with success!
<?php
namespace App\Domain\Product;
class ProductWasRegisteredEvent
{
private int $productId;
public function __construct(int $productId)
{
$this->productId = $productId;
}
public function getProductId() : int
{
return $this->productId;
}
}
use Ecotone\Modelling\EventBus;
#[CommandHandler]
public function register(RegisterProductCommand $command, EventBus $eventBus) : void
{
$this->registeredProducts[$command->getProductId()] = $command->getCost();
$eventBus->publish(new ProductWasRegisteredEvent($command->getProductId()));
}
<?php
namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\EventHandler;
class ProductNotifier
{
#[EventHandler] // 1
public function notifyAbout(ProductWasRegisteredEvent $event) : void
{
echo "Product with id {$event->getProductId()} was registered!\n";
}
}
bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!