Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Stop wrestling with complex infrastructure and endless boilerplate. Ecotone brings simplicity back to PHP development by letting you focus on what matters: your business logic.
Write code that reads like your requirements "When a user registers, send a welcome email." That's not pseudocode—it's how you'll actually write it. Your code becomes self-documenting.
Start simple, scale when ready Begin with straightforward, synchronous code. Need async processing later? Add a single attribute. No architecture overhaul required.
Resilience built-in Reliable Workflows, automatic retries, error handling, and self-healing capabilities come standard. Fewer production fires, more peaceful nights.
Framework agnostic You will never be forced to extend or implement any framework related classes. Ecotone seamless integrate with your application using declarative configuration using attributes. And this works beautifully with Symfony, Laravel, or any other framework using Ecotone Lite.
Whether you're building a fresh startup idea or modernizing legacy code, Ecotone grows with you. From simple CRUD to event-sourced microservices, the same familiar patterns guide your way.
Read more in
Feel invited to join , and ask questions there.
Check how to Ecotone for Symfony, Laravel or Lite.
Ecotone works out of the box with popular PHP frameworks like , and can be run stand alone or with any other framework (e.g. Laminas, CodeIgniter, Magento) using .
Domain Driven Design Command Query Responsibility Segregation PHP
Demo Laravel and Symfony Application - You can test Ecotone in real-life example, by using our demo application. The demo application shows how to use Ecotone with Laravel and Symfony frameworks.
Quickstart Examples - Provides great way to check specific Ecotone features. Whether you use Laravel or Symfony or Lite (no external framework), all examples will be able to work in your Application.
Ask question to AI - Ecotone provides AI support, to help you find the answers quicker. You may ask any Ecotone related questions, and it will provide more details on the topic and links where more information can be found.
Have a Workshop or Consultancy - To quickly get whole Team or Organisation up and running with Ecotone, we provide workshops. Workshops will not only teach you how to use Ecotone, but also the concepts and reasoning behind it.
Join Community Channel - Ecotone has a community channel, where you can ask questions, discuss with other users and get help. It is also a great place to share your experiences, and to meet other developers using Ecotone.
Subscribe to Mailing list - Join mailing list to stay up to date with Ecotone changes and latest articles and features.
Some demos and quick-start examples are done using specific framework integration. However Ecotone does not bind given set of features to specific solution. Whether you use Laravel, Symfony or Lite (no external framework), all features will work in the same way. Therefore feel encouraged to test out examples, even if they are not in framework of your choice.

A properly configured PHP project with PSR-4 autoloading
Step 1: Install the Ecotone Symfony Bundle using Composer
composer require ecotone/symfony-bundle
Step 2: Verify Bundle Registration
If you're using Symfony Flex (recommended), the bundle will auto-configure. If auto-configuration didn't work, manually register the bundle in config/bundles.php:
Step 3: Verify Installation
Run this command to check if Ecotone is properly installed:
By default Ecotone will look for Attributes in default Symfony catalog "src". If you do follow different structure, you can use "namespaces" configuration to tell Ecotone, where to look for.
Step 1: Install the Ecotone Laravel Package
composer require ecotone/laravel
Step 2: Verify Provider Registration
The service provider should be automatically registered via Laravel's package discovery. If auto-registration didn't work, manually add the provider to config/app.php:
Step 3: Verify Installation
Run this command to check if Ecotone is properly installed:
By default Ecotone will look for Attributes in default Laravel catalog "app". If you do follow different structure, you can use "namespaces" configuration to tell Ecotone, where to look for.
If you're using no framework or framework different than Symfony or Laravel, then you may use Ecotone Lite to bootstrap Ecotone.
composer require ecotone/ecotone
If you already have Dependency Container configured, then:
By default Ecotone will look for Attributes only in Classes provided under "classesToResolve". If we want to look for Attributes in given set of Namespaces, we can pass it to the configuration.
You may actually run Ecotone without any Dependency Container. That may be useful for small applications, testing or when we want to run some small Ecotone's script.
You may use out of the box Ecotone Lite Application, which provide you with Dependency Container.
composer require ecotone/lite-application
Problem: Ecotone can't find your classes with attributes. Solution: Make sure your classes are in the correct namespace and directory structure matches your PSR-4 autoloading configuration.
Problem: Ecotone commands are not available. Solution:
For Symfony: Check that the bundle is listed in config/bundles.php
For Laravel: Check that the provider is in config/app.php or that package discovery is enabled
Problem: Cache directory is not writable. Solution: Ensure your web server has write permissions to the cache directory (usually var/cache for Symfony or storage/framework/cache for Laravel).
Ecotone\SymfonyBundle\EcotoneSymfonyBundle::class => ['all' => true]php bin/console ecotone:list'providers' => [
\Ecotone\Laravel\EcotoneProvider::class
],php artisan ecotone:list$ecotoneLite = EcotoneLite::bootstrap(
classesToResolve: [User::class, UserRepository::class, UserService::class],
containerOrAvailableServices: $container
);$ecotoneLite = EcotoneLite::bootstrap(
classesToResolve: [User::class, UserRepository::class, UserService::class],
containerOrAvailableServices: $container,
configuration: ServiceConfiguration::createWithDefaults()->withNamespaces(['App'])
);$ecotoneLite = EcotoneLite::bootstrap(
classesToResolve: [User::class, UserRepository::class, UserService::class],
containerOrAvailableServices: [new UserRepository(), new UserService()]
);$ecotoneLite = EcotoneLiteApplication::bootstrap();
$commandBus = $ecotoneLite->getCommandBus();
$queryBus = $ecotoneLite->getQueryBus();PHP Message Bus, CQRS, Command Event Query Handlers
Process Manager Saga PHP
Sagas are part of the Ecotone's Workflow support. To read more refer to .
Starting with Event Sourcing in PHP [Article]
DDD PHP
Read Aggregate Introduction sections first to get more details about Aggregates.
Aggregate actions are defined using public method (non-static). Ecotone will ensure loading in order to execute the query method.
#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private string $assignedTo;
#[QueryHandler("ticket.get_assigned_person")]
public function getAssignedTo(): string
{
return $this->assignedTo;
}
}And then we call it from Query Bus:
$this->commandBus->sendWithRouting(
"ticket.get_assigned_person",
// We provide instance of Ticket aggregate using metadata
metadata: ["aggregate.id" => $ticketId]
)You may of course use of Query class or metadata in case of need, which will be passed to your aggregate's method.
In some cases you may want to access Event Store directly.
Event Store is auto registered in your Dependency Container, so you can fetch it like any other service or inject it directly to any Handler.
use Ecotone\EventSourcing\EventStore;
#[QueryHandler(self::GET_CURRENT_BALANCE_QUERY)]
public function getCurrentBalance(#[Reference] EventStore $eventStore): array
{
$streamName = "wallet";
if (!$eventStore->hasStream($streamName)) {
return [];
}
/** @var Event[] $event */
$events = $eventStore->load($streamName, count: 10);
return $events;
}Let's create Event Order was placed.
And Event Handler that will be listening to the OrderWasPlaced.
class OrderWasPlaced
{
private string $orderId;
private string $productName;
public function __construct(string $orderId, string $productName)
{
$this->orderId = $orderId;
$this->productName = $productName;
}
public function getOrderId(): string
{
return $this->orderId;
}
public function getProductName(): string
{
return $this->productName;
}
}Going into CQRS with PHP [Article]
[Article]
Ecotone comes with two plans:
Ecotone Free comes with Apache License Version 2.0 for open features. It allows to build message-driven system in PHP, which solves resiliency and scalability at the architecture level. This covers all the features, which are not marked as Enterprise. \
Ecotone Enterprise is based Enterprise licence. It does provides more advanced set of features aim for Enterprise usage. It does bring to the table custom features, additional integrations, and ability to optimization resource usages.
Each Enterprise feature is marked with hint on the documentation page. Enterprise features can only be run with licence key. To evaluate Enterprise features, email us at "[email protected]" to receive trial key. Production license keys are available at .
- Enables integration with Kafka (Event Streaming Platform) to send, receive from Messages from topics, and to use Kafka in form of Message Channel abstraction for seamless integration into the System.
- Provides ability to set up whole consumption process with single Attribute, and to extend it with resiliency patterns like: instant-retry, dead letter or final failure strategy.
- Provides ability to simplify deployment strategy, adjusting asynchronous processing to business scenarios, and configure processing per Client dynamically (which is especially useful in Multi-Tenant and SAAS environments).
[Article]
Repository PHP
Read Aggregate Introduction sections first to get more details about Aggregates.
Repositories are used for retrieving and saving the aggregate to persistent storage. Typical flow for calling aggregate method would looks like below:
class AssignWorkerHandler
{
private TicketRepository $ticketRepository;
#[CommandHandler]
public function handle(AssignWorkerCommand $command) : void
{
// fetch the aggregate from repository
$ticket = $this->ticketRepository->findBy($command->getTicketId());
// call action method
$ticket->assignWorker($command);
// store the aggregate in repository
$this->ticketRepository->save($ticket);
}
}$this->commandBus->send(
new AssignWorkerCommand(
$ticketId, $workerId,
)
);By setting up Repository we provide Ecotone with functionality to fetch and store the Aggregate , so we don't need to write the above orchestration code anymore.
If our class is defined as Aggregate, Ecotone will use Repository in order fetch and store it, whenever the Command is sent via Command Bus.
Now when we will send the Command, Ecotone will use ticketId from the Command to fetch related Ticket Aggregate, and will called assignWorker passing the Command. After this is completed it will use the repository to store changed Aggregate instance.
Therefore from high level nothing changes:
This way we don't need to write orchestration level code ourselves.
Ecotone PHP Framework
The best way to get started with Ecotone is to actually build something realistic. Therefore we will build a small back-end for Shopping System during this tutorial. The techniques we will learn in the tutorial are fundamental to building any application using Ecotone.
Found something to improve in the docs? Create Pull Request in .
The tutorial is divided into several lessons:
, we will learn the fundamentals of Ecotone: Endpoints, Messages, Channels, and Command Query Responsibility Segregation (CQRS)
, we will learn Tactical Domain Driven Design (DDD): Aggregates, Repositories and also Event Handlers
, we will learn how to use Converters, therefore how to handle serialization and deserialization
You don’t have to complete all of the lessons at once to get the value out of this tutorial. You will start benefit from the tutorial even if it’s one or two lessons.
As mentioned earlier, Events are stored in form of a Event Stream. Event Stream is audit of Events, which happened in the past. However to protect our business invariants, we may want to work with current state of the Aggregate to know, if given action is possible or not (business invariants).
Business Invariants in short are our simple "if" statements inside the Command Handler in the Aggregate. Those protect our Aggregate from moving into incorrect state. With State-Stored Aggregates, we always have current state of the Aggregate, therefore we can check the invariants right away. With Event-Sourcing Aggregates, we store them in form of an Events, therefore we need to rebuild our Aggregate, in order to protect the invariants.
Suppose we have Ticket Event Sourcing Aggregate.
#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
(...)
#[CommandHandler]
public function assign(AssignPerson $command) : array
{
return [new PersonWasAssigned($this->ticketId, $command->personId)];
}
}For this Ticket we do allow for assigning an Person to handle the Ticket. Let's suppose however, that Business asked us to allow only one Person to be assigned to the Ticket at time. With current code we could assign unlimited people to the Ticket, therefore we need to protect this invariant.
To check if whatever Ticket was already assigned to a Person, our Aggregate need to have state applied which will tell him whatever the Ticket was already assigned. To do this we use EventSourcingHandler attribute passing as first argument given Event class. This method will be called on reconstruction of this Aggregate. So when this Aggregate will be loaded, if given Event was recorded in the Event Stream, method will be called:
Then this state, can be used in the Command Handler to decide whatever we can trigger an action or not:
As you can see, it make sense to only assign to the state attributes that protect our invariants. This way the Aggregate stays readable and clean of unused information.
Read previous section to find out more about Interceptors.
We may aswell intercept Asynchronous Endpoints pretty easily. We do it by using pointing to AsynchronousRunningEndpoint class.
class TransactionInterceptor
{
#[Around(pointcut: AsynchronousRunningEndpoint::class)]
public function transactional(MethodInvocation $methodInvocation)
{
$this->connection->beginTransaction();
try {
$result = $methodInvocation->proceed();
$this->connection->commit();
}catch (\Throwable $exception) {
$this->connection->rollBack();
throw $exception;
}
return $result;
}
}As part of around intercepting, if we need Message Payload to make the decision we can simply inject that into our interceptor:
We can also inject Message Headers into our interceptor. We could for example inject Message Consumer name in order to decide whatever to start the transaction or not:
Ecotone comes with full automation for setting up Event Sourcing for us. This we can we really easily roll out new features with Event Sourcing with just minimal or none setup at all.
Before we will start, let's first install Event Sourcing module, which will provide us with all required components:
composer require ecotone/pdo-event-sourcingWe need to configure DBAL Support in order to make use of it.
Ecotone PDO Event Sourcing does provide support for three databases:
PostgreSQL
MySQL
MariaDB
Ecotone provides inbuilt functionality to serialize your Events, which can be customized in case of need. This makes Ecotone take care of Event Serialization/Deserialization, and allows us to focus on the business side of the code.
We can take over this process and set up our own , however can fully do it for us, so we can simply focus on the business side of the code. To make it happen all we need to do, is to install JMS Package and we are ready to go:
Ecotone comes with inbuilt repositories, so we don't need to configure Repositories ourselves. It often happen that those are similar between projects, therefore it may be that there is no need to roll out your own.
Ecotone provides inbuilt repositories to get you started quicker. This way you can enable given repository and start implementing higher level code without worrying about infrastructure part.
This provides integration with . To enable it read more in .
This provides integration with . Eloquent support is available out of the box after installing .
This provides integration using relational databases. It will serialize your aggregate to json and deserialize on load using . To enable it read in .
Ecotone provides inbuilt Event Sourcing Repository, which will set up Event Store and Event Streams. To enable it read .
Concurrency exceptions when multiple processes or threads access and modify shared resources simultaneously. These exceptions happen because two or more operations conflict try to change same piece of data. Ecotone provides built-in support for concurrency handling.
In order to solve concurrent access, Ecotone implements Optimistic Locking.
Each Event Sourcing Aggregate or Event Sourcing Saga has a version property that represents the current version of the resource. When modifications are made, the version is incremented. If two concurrent processes attempt to modify the same resource with different versions, a concurrency exception is raised. This is default behaviour, if we are using inbuilt Event Sourcing support.
In case of Custom Repositories, we may use Ecotone support for optimistic locking to raise the exception in the Repository.
Version will be passed to the repository, based on #[AggregateVersion] property inside the Aggregate/Saga.
We don't need to deal with increasing those on each action. Ecotone will increase it in our Saga/Aggregate automatically. We may also use inbuilt trait to avoid adding property manually.
To handle concurrency exceptions and ensure the system can self-heal, Ecotone offers retry mechanisms.
In synchronous scenarios, like Command Handler being called via HTTP, can be used to recover. If a concurrency exception occurs, the Command Message will be retried immediately, minimizing any impact on the end user. This immediate retry ensures that the Message Handler can self-heal and continue processing without affecting the user experience. &#xNAN;In asynchronous scenarios, you can use still use instant retries, yet you may also provide . This means that when concurrency exception will occur, the Message will be retried after a certain delay. This as a result free the system resources from continues retries and allows for recovering after given period of delay.
Defines how to handle failures when processing messages. This is final failure strategy as it's used in case, when there is no other way to handle the failure. For example, when there is no retry policy, or when the retry policy has reached its maximum number of attempts. Also, when the destination of Error Channel is not defined, or sending to Error Channel fails.
Ecotone provides three final strategies:
RELEASE - Message is released back to the Channel for another attempt. This way order will be preserved, yet it can result in processing being blocked if the message keeps failing.
RESEND - Message is resend back to the Channel for another attempt, as a result Message Consumer will be unblock and will be able to continue on next Messages. This way next messages can be consumed without system being stuck.
IGNORE - Message is discarded, processing continues. Can be used for non critical message, to simply ignore failed messages.
STOP - Consumer stops, message is preserved. This strategy can be applied when our system depends heavily on the order of the Messages to work correctly. In that case we can stop the Consumer, resulting in Message still awaiting to be consumed.
This can be configured on Message Channel level:
Default for Message Channels is resend strategy.
This can also be configured at the Message Consumer level
Default for Message Consumers is stop strategy.
We may define Time to Live for Messages. This way if Message will not be handled within specific amount of time, it will be automatically discarded. This is useful in scenarios like sending notifications, where given notification like One Time Password may actually have meaning only for 5 minutes.
You may delay handling given asynchronous message by adding #[Delayed] attribute.
#[TimeToLive(new TimeSpan(seconds: 50))]
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
{
// handle welcome notification
}To dynamically calculate expected TTL, we can use expression language.
payload variable in expression language will hold Command/Event object. headers variable will hold all related Mesage Headers.
We could also access any object from our Dependency Container, in order to calculate the delay and pass there our Command:
We may send an Message and tell Ecotone to delay it using deliveryDelay Message Header:
We may extend Gateways functionality with asynchronocity. This way we can pass any Message via given Message Channel first.
Asynchronous Gateways are available as part of Ecotone Enterprise.
To make Gateway Asynchronous we will use Asynchronous Attribute, just like with Asynchronous Message Handlers. We may can extend any types of Gateways: Command/Event/Query Buses, or custom .
To build for example a CommandBus which will send Messages over async channel, we will simply , and add our method.
then we all Commands that will be triggered via AsynchronousCommandBus will go over async channel.
It's enough to extend given CommandBus with custom interface to register new abstraction in Gateway in Dependency Container.
Having asynchronous CommandBus is especially useful, if given Message Handler is not meant be executed asynchronous by default.
then when using standard CommandBus above Command Handler will be executed synchronous, when using AsynchronousCommandBus it will be done asynchronously.
It's easy to build an Outbox pattern using this Asynchronous Gateways. Just make use of to push Messages over Database Channel.
and then register dbal channel
Then whenever we will send Events within Command Handler (which is wrapped in transaction by default while using Dbal Module). Messages will be commited as part of same transaction.
To keep the system reliable and resilient it's important to handle errors with grace. Ecotone provides solutions to handle failures within the system that helps in:
Self-healing the application (, , )
Ensuring Data Consistency (, , )
Using Event Sourcing in PHP
Before diving into this section be sure to understand how Aggregates works in Ecotone based on .
Ecotone provides higher level abstraction to work with Event Sourcing, which is based on Event Sourced Aggregates. Event Sourced Aggregate just like normal Aggregates protect our business rules, the difference is in how they are stored.
Changes in the Application will happen. After some time we may want to refactor namespaces, change the name of Aggregate or an Event. However those kind of changes may break our system, if we already have production data which references to any of those. Therefore to make our Application to immune to future changes we need a way to decouple the code from the data in the storage, and this is what Ecotone provides.
Our Event Stream name by default is based on the Aggregate Class name. Therefore to make it immune to changes we may provide custom Stream Name. To do it we will apply Stream attribute to the aggregate:
Then tell the projection to make use of it:
Distribution Bus is just like CommandBus or EventBus. It creates smooth and elegant way for explicit communication between Applications (Services) without introducing any hassle and requirements for doing configurations, bindings and mappings. It make it easy for Developers to build integrations and maintain them in the long-term.
Read more in given Module section.
class NotificationService
{
#[EventHandler]
public function notifyAboutNewOrder(OrderWasPlaced $event) : void
{
echo $event->getProductName() . "\n";
}
}$eventBus->publish(new OrderWasPlaced(1, "Milk"));Asynchronous Message Buses - This grants ability to build customized Command/Event Buses where Message will first go over given Asynchronous Channel. This can be used to build for example Outbox Command Bus.
Distributed Bus with Service Map - Provides way to communicate between Services (Applications) with ease and in explicit and decoupled way. Make it possible to use all available Message Channels providers (RabbitMQ, Amazon SQS, Redis, Dbal, Kafka, Symfony Message Channels, Laravel Queues).
Command Bus Instant Retries - Provides ability to roll out new Command Bus with custom retry configuration. Allows to help in self-healing from scenarios like during HTTP request - external service went down, or our database connection was interrupted.
Command Bus Error Channel - Provides ability to configure Error Channel for Command Bus. This way we can handle with grace synchronous scenarios like failure on receiving webhook, by pushing the Message to Error Channel.
Instant Aggregate Fetch - Provides ability to fetch Aggregates directly without the need to access Repositories. This way we can keep the code focused on the business logic instead of orchestration level code.
Orchestrators - Perfect for building predefined and dynamic workflows where the workflow definition is separate from the individual steps.
Details about more features coming soon...
Lesson 4, we will learn about Metadata and Method Invocation - How we can execute Message Handlers in a way not available in any other PHP Framework
Lesson 5, we will learn about Interceptors, Ecotone's powerful Middlewares
Lesson 6, we we will learn about Asynchronous Endpoints, so how to process our Messages asynchronously.
To find out more, read section related to specific implementation of Distributed Bus:
Distributed Bus with Service Map - Works with (RabbitMQ, Amazon SQS, Redis, Dbal, Kafka, Symfony Message Channels, Laravel Queues)
RabbitMQ Distributed Bus - Works with RabbitMQ only
Simple demo using Ecotone Lite.
Advanced demo using Ecotone Lite.
Symfony and Laravel application integration.
Starting with Microservices in PHP [Article]
Loosely coupled Microservices in PHP [Article]
Unexpected error with integration github-files: Integration is not installed on this space
#[Aggregate]
class Ticket
{
#[Identifier]
private string $ticketId;
#[CommandHandler]
public function assignWorker(AssignWorkerCommand $command)
{
// do something with assignation
}
}$this->commandBus->send(
new AssignWorkerCommand(
$ticketId, $workerId,
)
);#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
private bool $isAssigned;
#[CommandHandler]
public function assign(AssignPerson $command) : array
{
if ($this-isAssigned) {
throw new \InvalidArgumentException("Ticket already assigned");
}
return [new PersonWasAssigned($this->ticketId, $command->personId)];
}
#[EventSourcingHandler]
public function applyPersonWasAssigned(PersonWasAssigned $event) : void
{
$this->isAssigned = true;
}
}#[Around(pointcut: AsynchronousRunningEndpoint::class)]
public function transactional(
MethodInvocation $methodInvocation,
#[Payload] string $command
)#[Around(pointcut: AsynchronousRunningEndpoint::class)]
public function transactional(
MethodInvocation $methodInvocation,
#[Header('polledChannelName')] string $consumerName
)composer require ecotone/jms-converterpublic function save(
array $identifiers, object $aggregate, array $metadata,
// Version to verify before storing
?int $versionBeforeHandling
): void#[TimeToLive(expression: 'headers["expirationTime"]']
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
{
// handle welcome notification
}#[\Attribute]
class Cache
{
public string $cacheKey;
public int $timeToLive;
public function __construct(string $cacheKey, int $timeToLive)
{
$this->cacheKey = $cacheKey;
$this->timeToLive = $timeToLive;
}
}class ProductsService
{
#[QueryHandler]
#[Cache("hotestProducts", 120)]
public function getHotestProducts(GetOrderDetailsQuery $query) : array
{
return ["orderId" => $query->getOrderId()]
}
} class NotificationFilter
{
#[After]
public function filter($result, Cache $cache) : ?array
{
$this->cachingSystem($cache->cacheKey, $result, $cache->timeToLive);
}
}By default events in the stream will hold Aggregate Class name as AggregateType.
You may customize this by applying AggregateType attribute to your Aggregate.
To avoid storing class names of Events in the Event Store we may mark them with name:
This way Ecotone will do the mapping before storing an Event and when retrieving the Event in order to deserialize it to correct class.
It's worth to remember that if we want test storing Events using provided Event Named, we need to add them under recognized classes, so Ecotone knows that should scan those classes for Attributes:
The minimum needed for enabling Distributed Bus with Service Map and start consuming is to tell Ecotone, that we do use Service Map within the Service
and then we would define Message Channel, which we would use for for incoming messages:
Register Distributed Bus with given Service Map:
and define implementation of the distributed Message Channel:
For concrete use case, read Main Section or Custom Features section.
if ($this-isAssigned) {
throw new \InvalidArgumentException("Ticket already assigned");
}#[Saga]
class OrderProcess
{
use WithAggregateVersioning;
(...)AmqpBackedMessageChannelBuilder::create(channelName: 'async')
->withFinalFailureStrategy(FinalFailureStrategy::STOP)#[Delayed(expression: 'reference("delayService").calculate(payload)']
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
{
}$commandBus->sendWithRouting(
"sendOneTimePassword",
"userId",
metadata: ["timeToLive" => new TimeSpan(minutes: 5)]
);#[Asynchronous("async")]
interface AsynchronousCommandBus extends CommandBus
{
}#[CommandHandler]
public function placeOrder(PlaceOrderCommand $command) : void
{
// do something with $command
}#[Asynchronous("outbox")]
interface OutboxEventBus extends EventBus {}#[ServiceContext]
public function orderChannel()
{
return DbalBackedMessageChannelBuilder::create("orders");
}#[CommandHandler]
public function placeOrder(PlaceOrderCommand $command, OutboxEventBus $eventBus) : void
{
// do something with $command
$eventBus->publish(new OrderWasPlaced());
}#[Stream("basket_stream")]
class Basket#[Projection(self::PROJECTION_NAME, "basket_stream")]
class BasketList#[AggregateType("basket")]
class Basket#[NamedEvent("basket.was_created")]
class BasketWasCreated
{
public const EVENT_NAME = "basket.was_created";
private string $id;
public function __construct(string $id)
{
$this->id = $id;
}
public function getId(): string
{
return $this->id;
}
}$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[Basket::class, BaskeWasCreated::class],
);#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
return DistributedServiceMap::initialize();
}#[ServiceContext]
public function channels()
{
return SqsBackedMessageChannelBuilder::create("distributed_ticket_service")
}#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
return DistributedServiceMap::initialize()
->withServiceMapping(
serviceName: "ticketService",
channelName: "distributed_ticket_service"
)
}#[ServiceContext]
public function channels()
{
return SqsBackedMessageChannelBuilder::create("distributed_ticket_service")
}Use git to download starting point in order to start tutorial
git clone [email protected]:ecotoneframework/symfony-tutorial.git
# Go to symfony-tutorial cataloggit clone [email protected]:ecotoneframework/laravel-tutorial.git
# Go to laravel-tutorial catalog
# Normally you will use "php artisan" for running console commands
# To reduce number of difference during the tutorial
# "artisan" is changed to "bin/console"git clone [email protected]:ecotoneframework/lite-tutorial.git
# Go to lite-tutorial catalog2. Run command line application to verify if everything is ready.
There are two options in which we run the tutorial:
Local Environment
Docker (preferred)
/** Ecotone Quickstart ships with docker-compose with preinstalled PHP 8.0 */
1. Run "docker-compose up -d"
2. Enter container "docker exec -it ecotone-quickstart /bin/bash"
3. Run starting command "composer install"
4. Run starting command "bin/console ecotone:quickstart"
5. You should see:
"Running example...
Hello World
/** You need to have atleast PHP 8.0 and Composer installed */
1. Run "composer install"
2. Run starting command "bin/console ecotone:quickstart"
3. You should see:
"Running example...
Hello World
Good job, scenario ran with success!"If you can see "Hello World", then we are ready to go. Time for Lesson 1!
To find out more about different use-cases, read related section about Handling Failures in Workflows.
Resilient Messaging with Laravel [Article]
[Article]
Normal Aggregates are stored based on their current state:
Yet if we change the state, then our previous history is lost:
Having only the current state may be fine in a lot of cases and in those situation it's perfectly fine to make use of State-Stored Aggregates. This is most easy way of dealing with changes, we change and we forget the history, as we are interested only in current state.
When we actually need to know what was the history of changes, then State-Stored Aggregates are not right path for this. If we will try to adjust them so they are aware of history we will most likely complicate our business code. This is not necessary as there is better solution - Event Sourced Aggregates.
When we are interested in history of changes, then Event Sourced Aggregate will help us. Event Sourced Aggregates are stored in forms of Events. This way we preserve all the history of given Aggregate:
When we change the state the previous Event is preserved, yet we add another one to the audit trail (Event Stream).
This way all changes are preserved and we are able to know what was the historic changes of the Product.
The audit trail of all the Events that happened for given Aggregate is called Event Stream. Event Stream contains of all historic Events for all instance of specific Aggregate type, for example all Events for Product Aggregate
Let's now dive a bit more into Event Streams, and what they actually are.
To use Ecotone's Aggregate functionality, we need a registered repository. Ecotone comes with built-in support for popular persistence options like Doctrine ORM, Eloquent, and document stores, so there's a good chance we can use what we already have without extra work. If our storage solution isn't supported yet, or if we have specific requirements, we can easily register our own custom repository by following the steps in this section. This flexibility means we're not locked into any particular database or ORM—we can use whatever fits our project best.
State-Stored Aggregate are normal Aggregates, which are stored using Standard Repositories.
Therefore to configure Repository for your Aggregate, create a class that extends StandardRepository interface:
canHandle method informs, which Aggregate Classes can be handled with this Repository. Return true, if saving specific aggregate is possible, false otherwise.
findBy method returns if found, existing Aggregate instance, otherwise null.
save method is responsible for storing given Aggregate instance.
$identifiers are array of #[Identifier] defined within aggregate.
$aggregate is instance of aggregate
$metadata is array of extra information, that can be passed with Command
When your implementation is ready simply mark it with #[Repository] attribute:
This is example implementation of Standard Repository using Doctrine ORM.
Repository:
By default Ecotone when we have only one Standard and Event Sourcing Repository registered, Ecotone will use them for our Aggregate by default. This comes from simplification, as if there is only one Repository of given type, then there is nothing else to be choose from. However, if we register multiple Repositories, then we need to take over the process and tell which Repository will be used for which Aggregate.
In case of we do it using canHandle method.
In case of inbuilt Repositories, we should follow configuration section for given type
Custom repository for Event Sourced Aggregates is described in more details under .
Be sure to read CQRS Introduction before diving in this chapter.
Business Interface aims to reduce boierplate code and make your domain actions explicit. In Application we describe an Interface, which executes Business methods. Ecotone will deliver implementation for this interface, which will bind the interface with specific actions. This way we can get rid of delegation level code and focus on the things we want to achieve. For example, if we don't want to trigger action via Command/Query Bus, we can do it directly using our business interface and skip all the Middlewares that would normally trigger during Bus execution. There are different types of Business Interfaces and in this chapter we will discuss the basics of build our own Business Interface, in next sections we will dive into specific types of business interfaces: Repositories and Database Layers.
Let's take as an example creating new Ticket
We may define interface, that will call this Command Handler whenever will be executed.
This way we don't need to use Command Bus and we can bypass all Bus related interceptors.
The attribute #[BusinessMethod] tells Ecotone that given Interface is meant to be used as entrypoint to Messaging and which Message Handler it should send the Message to. Ecotone will provide implementation of this interface directly in our Dependency Container.
From lower level API Business Method is actually a .
We may also execute given Aggregate directly using Business Interface.
Then we define interface:
We may of course pass Command class if we need to pass more data to our Aggregate's Command Handler.
Defining Query Interface works exactly the same as Command Interface and we may also use it with Aggregates.
Then we may call this Query Handler using Interface
If we have registered then we let Ecotone do conversion to Message Handler specific format:
Then we may call this Query Handler using Interface
Ecotone will use defined Converter to convert array to TicketDTO.
Such conversion are useful in order to work with objects and to avoid writing transformation code in our business code. We can build generic queries, and transform them to different classes using different business methods.
If we have registered then we let Ecotone do conversion to Message Handler specific format:
Then we may call this Query Handler using Interface
Ecotone will use defined Converter to convert array to CreateTicket command class.
This type of conversion is especially useful, when we receive data from external source and we simply want to push it to given Message Handler. We avoid doing transformations ourselves, as we simply push what we receive as array.
DDD PHP
There may be a scenario where the creation of an Aggregate is conditioned by the current state of another Aggregate.
Ecotone provides a possibility for that and lets you focus more on domain modeling rather than technical nuances you may face trying to implement an actual use case.
This case is supported by both Event Sourcing and State-based Aggregates.
It is possible to send a command to an Aggregate and expect a State-based Aggregate to be returned.
It is also possible to send a command to an Aggregate and expect the Event Sourcing Aggregate to be returned.
Both of the Aggregates (called and result) can still record their Events using an Internal Recorder. Recorded Events will be published after the operation is persisted in the database.
In the case of an Event Sourcing Aggregate recording an event indicates a state change of that Aggregate.
Also, when calling a State-based Aggregate its state may be changed before returning the newly created Aggregate. E.g. you want to save a reference to the newly created Aggregate.
Ecotone will try to persist both called and returned Aggregates.
When splitting your aggregates into the smallest, independent parts of the domain you have to be aware of transaction boundaries which Aggregate has to protect. In the case where the creation of an Aggregate is the transaction boundary of another Aggregate, it may require a state change of the one that protects that boundary.
This is a very specific scenario where two aggregates will persist at the same time within the same transaction which is covered by Ecotone.
In rich business domains we will want to work with higher level objects than associate arrays. Suppose we have PersonNameDTO and defined Ecotone's Converter for it.
Ecotone will read the Docblock and based on that will deserialize Result Set from database to list of PersonNameDTO.
Using combination of First Row Fetch Mode, we can get first row and then use it for conversion to PersonNameDTO.
For big result set we may want to avoid fetching everything at once, as it may consume a lot of memory. In those situations we may use Iterator Fetch Mode, to fetch one by one. If we want to convert each result to given Class, we may define docblock describing the result:
Each returned row will be automatically convertered to PersonNameDTO.
We may return the result in specific format directly. This is useful when Business Method is used on the edges of our application and we want to return the result directly.
In this example, result will be returned in application/json.\
PHP Event Sourcing Snapshoting
In general having streams in need for snapshots may indicate that our model needs revisiting. We may cut the stream on some specific event and begin new one, like at the end of month from all the transactions we generate invoice and we start new stream for next month. However if cutting the stream off is not an option for any reason, we can use snapshots to avoid loading all events history for given Aggregate. Every given set of events snapshot will be taken, stored and retrieved on next calls, to fetch only events that happened after that.
EventSourcingConfiguration provides the following interface to set up snapshots.
class EventSourcingConfiguration
{
public const DEFAULT_SNAPSHOT_TRIGGER_THRESHOLD = 100;
public function withSnapshotsFor(
string $aggregateClassToSnapshot, // 1.
int $thresholdTrigger = self::DEFAULT_SNAPSHOT_TRIGGER_THRESHOLD, // 2.
string $documentStore = DocumentStore::class // 3.
): static
}$aggregateClassToSnapshot - class name of an aggregate you want Ecotone to save snapshots of
$thresholdTrigger - amount of events for interval of taking a snapshot
$documentStore - reference to document store which will be used for saving/retrieving snapshots
To set up snapshots we will define configuration.
Threshold states at which interval snapshots should be done. Therefore with below configuration:
snapshots will be done every 500 events. Then when snapshot will be loaded, it will start loading the events from event number 501 for given Aggregate instance.
Ecotone provides ability to extend any Messaging Gateway using Interceptors. We can hook into the flow and add additional behavior.
For better understanding, please read Interceptors section before going through this chapter.
Suppose we want to add custom logging, whenever any Command is executed. We know that CommandBus is a interface for sending Commands, therefore we need to hook into that Gateway.
Intercepting Gateways, does not differ from intercepting Message Handlers.
We may also want to have different types of Message Buses for given Message Type. For example we could have EventBus with audit which we would use in specific cases. Therefore we want to keep the original EventBus untouched, as for other scenarios we would simply keep using it.
To do this, we will introduce our new EventBus:
That's basically enough to register our new interface. This new Gateway will be automatically registered in our DI container, so we will be able to inject it and use.
It's enough to extend given Gateway with custom interface to register new abstraction in Gateway in Dependency Container. In above example AuditableEventBus will be automatically available in Dependency Container to use, as Ecotone will deliver implementation.
Now as this is separate interface, we can point interceptor specifically on this
We could of course intercept by attributes, if we would like to make audit functionality reusable
and then we pointcut based on the attribute
Gateways can also be extended with asynchronous functionality on which you can read more in .
In order to optimize projections or to avoid using external storage, we may use of Projection's State.
State is data that is kept between executions and can be passed to Projection's Event Handler.
In order to pass the state to Projection's Event Handlers we need to mark method parameter with #[ProjectionState].
Ecotone will resolve this parameter and pass the state. The returned state from the Event Handler will becomes new state for next execution. We may pass the state between all Event Handlers in given Projection.
The state can be simple array or a class. Whatever you pick, Ecotone will automatically serialize and deserialize it for you.
You may want to fetch the state from outside to return it to the end user.
In that case Ecotone brings ProjectionStateGateway.
The first parameter of the attribute is the projection name, so Ecotone can know, which state it should look for. This will automatically convert the state to your defined return type.
Gateways are automatically registered in your Dependency Container, so you can fetch it like any other service.
Projections are about deriving current state from the stream of events. Projections can be added in any moment of the application lifetime and be built up from existing history, till the current time. This is powerful concept as it allow for building views quickly and discarding them without pain, when they are not longer needed.
#[Projection("inProgressTicketList", Ticket::class] // 1
class InProgressTicketList
{
public function __construct(private Connection $connection) {}
#[EventHandler] // 2
public function addTicket(TicketWasRegistered $event, array $metadata) : void
{
$result = $this->connection->executeStatement(<<<SQL
INSERT INTO in_progress_tickets VALUES (?,?)
SQL, [$event->getTicketId(), $event->getTicketType()]);
}
#[QueryHandler("getInProgressTickets")] // 3
public function getTickets() : array
{
return $this->connection->executeQuery(<<<SQL
SELECT * FROM in_progress_tickets
SQL)->fetchAllAssociative();
}
}This tells Ecotone that specific class is Projection. The first parameter is the name of the projection and the second is name of the stream (the default is the name of the Aggregate) that this projection subscribes to.
Events that this projection subscribes to
Optional Query Handlers for this projection. They can be placed in different classes depending on preference.
is a great way to set up your projections. You can freely create DTO objects, or play with simple arrays and Ecotone will serialize/deserialize and store them for you.
Whenever message fails during asynchronous processing it will kept repeated till the moment it will succeed. However retry strategy with dead letter queue may be set up in order to retry message given amount of times and then move it to the storage for later review and manual retry.
This is where Ecotone Pulse kicks in, as instead of reviewing and replaying the message directly from the application's console, you may do it directly from the UI application. Besides you may connect multiple Ecotone's application to the Pulse Dashboard to have full overview of your whole system.
Ecotone Pulse provide way to control error messages for all your services from one place.
Enable in your service and with .
After this you may run docker image with Ecotone Pulse passing the configuration to your services and RabbitMQ connection.
Then run docker image with Ecotone Pulse passing environment variables:
Provide array of services with service name and .
DSN to your RabbitMQ instance, which services are connected with .
In the dashboard you may check all the connected services. For quick overview, you will find amount of errors within given service there.
To review error messages go to specific service. From there you can review the error message, stacktrace and replay it or delete.
You may check demo application, where Symfony and Laravel services are connected to Ecotone pulse in .
In its lifetime events may change. In order to track those changes Ecotone provides possibility of versioning events.
use Ecotone\Modelling\Attribute\Revision;
#[Revision(2)]
class MyEvent
{
public string $id;
}Value given with Revision attribute will be stored by Ecotone in events metadata. Attribute is used only when event is saved in event store. In order to read it, you can access events metadata, e.g. in event handler.
This feature is available as part of Ecotone Enterprise.
Depending on the version we may actually want to restore our Aggregate a bit differently. This is especially useful when we've changed the way Events are structured and introduced new version of the Event. For this we can use revision header to access the version on which given Event was stored.
We may inject any type of Header that was stored together with the Event. This means inbuilt not only headers like timestamp, id, correlationId are avaiable out of the box, but also custom headers provided by the application (e.g. userId).
Asynchronous PHP
You may find demo implementation here.
[Article]
[Article]
[Article]
[Documentation]
Just as with Standard Aggregate, ES Aggregates are called by Command Handlers, however what they return are Events and they do not change their internal state.
When this Aggregate will be called via Command Bus with CreateProduct Command, it will then return new ProductWasCreated Event.
All Events may contain additional Metadata. This is especially useful for storing information that are not required for Command to be handled, yet are important for auditing and projecting purposes.
In Ecotone any communication happens via Messages, and Messages contains of Payload and Headers (Metadata).
So far we've discussed only the Payload part, for example ProductWasCreated Event Class is actually an Payload.
One of the drawback of Event Sourcing is eventual consistency. Whenever event happens we want to do two things, update our projection and inform the end user about change. However user need to be informed about the change after projection is refreshed, as otherwise he will get stale view. In order to solve this drawback Ecotone brings possibility for emitting events directly from projection. So instead of subscribing to Domain Events (Aggregate State Changed), end user may subscribe to change in the projection.
In order to emit the events, we are using EventStreamEmitter.
Whenever we emit given events, they are stored in Projection's stream.
In case of Ecotone we don't delay whole Message, but specific Message Handler. This helps in scenarios when we have multiple Event Handler and we would like to configure the delay differently. For example may we have a case, where as a result of Order being placed, we would want to delay notification, yet to call Payment Service right away.
interface StandardRepository
{
1 public function canHandle(string $aggregateClassName): bool;
2 public function findBy(string $aggregateClassName, array $identifiers) : ?object;
3 public function save(array $identifiers, object $aggregate, array $metadata, ?int $expectedVersion): void;
}class PersonNameDTOConverter
{
#[Converter]
public function from(PersonNameDTO $personNameDTO): array
{
return [
"person_id" => $personNameDTO->getPersonId(),
"name" => $personNameDTO->getName()
];
}
#[Converter]
public function to(array $personNameDTO): PersonNameDTO
{
return new PersonNameDTO($personNameDTO['person_id'], $personNameDTO['name']);
}
}class LoggerInterceptor
{
#[Before(pointcut: CommandBus::class)]
public function log(object $command, array $metadata) : void
{
// log Command message
}
}use Ecotone\Messaging\MessageHeaders;
class MyEventHandler
{
#[EventHandler]
public function handle(MyEvent $event, array $metadata) : void
{
if ($metadata[MessageHeaders::REVISION] !== 2) {
return; // this is not the revision I'm looking for
}
// the force is strong with this one
}
}




$expectedVersion if version locking by #[Version] is enabled it will carry currently expected
->withSnapshotsFor(Ticket::class, 500)


After importing trait, Events will be automatically retrieved and published after handling Command in your Aggregate.
Using recordThat will delay sending an event till the moment your Aggregate is saved in the Repository. This way you ensure that no Event Handlers will be called before the state is actually stored.
Sometimes you may have situation, where Event from one Aggregate will actually change another Aggregate. In those situations you may actually subscribe to the Event directly from Aggregate, to avoid creating higher level boilerplate code.
In those situations however you need to ensure event contains of reference id, so Ecotone knows which Aggregate to load from the database.
For more sophisticated scenarios, where there is no direct identifier in corresponding event, you may use of identifier mapping. You can read about it more in Saga related section.
You may subscribe to Events by names, instead of the class itself. This is useful in cases where we want to decoupled the modules more, or we are not interested with the Event Payload at all.
For Events published from your Aggregate, it's enough to provide NamedEvent attribute with the name of your event.
And then you can subscribe to the Event using name
Command Handlers may return single events, multiple events or no events at all, if nothing is meant to be changed.
Aggregates under the hood make use of Partition persistence strategy (Refer to Working with Event Streams). This means that we need to know:
Aggregate Version
Aggregate Id
Aggregate Type
To find out about current version of Aggregate Ecotone will look for property marked with Version Attribute.
We don't to add this property directly, we can use trait instead:
Anyways, this is all we need to do, as Ecotone will take care of reading and writing to this property. This way we can focus on the business logic of the Aggregate, and Framework will take care of tracking the version.
We need to tell to Ecotone what is the Identifier of our Event Sourcing Aggregate. This is done by having property marked with Identifier in the Aggregate:
As Command Handlers are pure and do not change the state of our Event Sourcing Aggregate, this means we need a different way to mutate the state in order to assign the identifier. For changing the state we use EventSourcingHandler attribute, which tell Ecotone that if given Event happens, then trigger this method afterwards:
We will explore how applying Events works more in next section.
Aggregate Type will be the same as Aggregate Class. We can decouple the class from the Aggregate Type, more about this can be found in "Making Stream immune to changes" section.
So when this Command Handler happens:
What actually will happen under the hood is that this Event will be applied to the Event Stream:
As storing in Event Store is abstracted away, the code stays clean and contains only of the business part. We can customize the Stream Name, Aggregate Type and even Event Names when needed.

Events are stored in stream called project{projectionName}. In above case it will be project_wallet_balance.
After that you may subscribe to given events, just like to any other events.
All the events are stored in the streams, this means that in case of need we may create another projection that will subscribe to those events.
In some cases we may want to emit event to existing stream (for example to provide summary event), or to fresh new stream.
In order to do that we may use linkTo method on EventStreamEmitter.
LinkTo works from any place in the code, however emit as it stores in projection's stream works only inside projection.
When we rebuild the projection events could be republished and that would affect our end users, plus would link duplicated events to our stream. Luckily Ecotone will handle this scenario and will not republish or store any events that are emitted during reset phase.
This is the biggest difference between using EventBus versus EventStreamEmitter.
As EventBus would simple republish the events during rebuild phase.
When projection is deleted, Ecotone will automatically delete projection stream.
You may delay handling given asynchronous message by adding #[Delayed] attribute.
To dynamically calculate expected delay, we can use expression language.
payload variable in expression language will hold Command/Event object. headers variable will hold all related Mesage Headers.
We could also access any object from our Dependency Container, in order to calculate the delay and pass there our Command:
We may send an Message and tell Ecotone to delay it using deliveryDelay Message Header:
If Message Delay would be send for Event. Then all subscribing Event Handlers would be delayed. For customizing it on the single Handler level, use Message Handler delay.
We may also delay to given date time:
#[Repository]
class DoctrineRepository implements StandardRepository
{
// implemented methods
}final class EcotoneTicketRepository implements StandardRepository
{
public function __construct(private readonly EntityManagerInterface $entityManager)
{
}
public function canHandle(string $aggregateClassName): bool
{
return $aggregateClassName === Ticket::class;
}
public function findBy(string $aggregateClassName, array $identifiers): ?object
{
return $this->entityManager->getRepository(Ticket::class)
// Array of identifiers for given Aggregate
->find($identifiers['ticketId']);
}
public function save(array $identifiers, object $aggregate, array $metadata, ?int $versionBeforeHandling): void
{
$this->entityManager->persist($aggregate);
}
}class TicketService
{
#[CommandHandler("ticket.create")]
public function createTicket(CreateTicketCommand $command) : void
{
// handle create ticket command
}
}interface TicketApi
{
#[BusinessMethod('ticket.create')]
public function create(CreateTicketCommand $command): void;
}#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private bool $isClosed;
#[CommandHandler("ticket.close")]
public function close(): void
{
$this->isClosed = true;
}
}interface TicketApi
{
#[BusinessMethod('ticket.close')]
public function create(#[Identifier] Uuid $ticketId): void;
}class TicketService
{
#[QueryHandler("ticket.get_by_id")]
public function getTicket(GetTicketById $query) : array
{
//return ticket
}
}interface TicketApi
{
#[BusinessMethod("ticket.get_by_id")]
public function getTicket(GetTicketById $query): array;
}class TicketService
{
#[QueryHandler("ticket.get_by_id")]
public function getTicket(GetTicketById $query) : array
{
//return ticket as array
}
}interface TicketApi
{
// return ticket as Class
#[BusinessMethod("ticket.get_by_id")]
public function getTicket(GetTicketById $query): TicketDTO;
}class TicketService
{
#[CommandHandler("ticket.create")]
public function getTicket(CreateTicket $command) : void
{
}
}interface TicketApi
{
#[BusinessMethod("ticket.create")]
public function getTicket(array $query): void;
}#[Aggregate]
final class Calendar
{
/** @var array<string> */
private array $meetings = [];
public function __construct(#[Identifier] public string $calendarId)
{
}
#[CommandHandler]
public function scheduleMeeting(ScheduleMeeting $command): Meeting
{
// checking business rules
$this->meetings[] = $command->meetingId;
return new Meeting($command->meetingId);
}
}
#[Aggregate]
final class Meeting
{
public function __construct(#[Identifier] public string $meetingId)
{
}
}#[Aggregate]
final class Calendar
{
/** @var array<string> */
private array $meetings = [];
public function __construct(#[Identifier] public string $calendarId)
{
}
#[CommandHandler]
public function scheduleMeeting(ScheduleMeeting $command): Meeting
{
// checking business rules
$this->meetings[] = $command->meetingId;
return Meeting::create($command->meetingId);
}
}
#[EventSourcingAggregate(true)]
final class Meeting
{
use WithEvents;
use WithAggregateVersioning;
#[Identifier]
public string $meetingId;
public static function create(string $meetingId): self
{
$meeting = new self();
$meeting->recordThat(new MeetingCreated($meetingId));
return $meeting;
}
}/**
* @return PersonNameDTO[]
*/
#[DbalQuery('SELECT person_id, name FROM persons LIMIT :limit OFFSET :offset')]
public function getNameListDTO(int $limit, int $offset): array;#[DbalQuery(
'SELECT person_id, name FROM persons WHERE person_id = :personId',
fetchMode: FetchMode::FIRST_ROW
)]
public function getNameDTO(int $personId): PersonNameDTO;/**
* @return iterable<PersonNameDTO>
*/
#[DbalQuery(
'SELECT person_id, name FROM persons ORDER BY person_id ASC',
fetchMode: FetchMode::ITERATE
)]
public function getPersonIdsIterator(): iterable;#[DbalQuery(
'SELECT person_id, name FROM persons WHERE person_id = :personId',
fetchMode: FetchMode::FIRST_ROW,
replyContentType: 'application/json'
)]
public function getNameDTOInJson(int $personId): string;#[ServiceContext]
public function aggregateSnapshots()
{
return EventSourcingConfiguration::createWithDefaults()
->withSnapshotsFor(Ticket::class, 1000)
->withSnapshotsFor(Basket::class, 500, MyDocumentStore::class)
;
}interface AuditableEventBus extends EventBus {}#[CommandHandler]
public function placeOrder(PlaceOrder $command, AuditableEventBus $eventBus)
{
// place order
$eventBus->publish(new OrderWasPlaced());
}class Audit
{
#[Before(pointcut: AuditableEventBus::class)]
public function log(object $event, array $metadata) : void
{
// store audit
}
}#[Auditable]
interface AuditableEventBus extends EventBus {}class Audit
{
#[Before(pointcut: Auditable::class)]
public function log(object $event, array $metadata) : void
{
// store audit
}
}#[Asynchronous("async")]
interface OutboxCommandBus extends CommandBus
{
}#[Projection(self::NAME, Ticket::class)]
final class TicketCounterProjection
{
const NAME = "ticket_counter";
#[EventHandler]
public function when(TicketWasRegistered $event, #[ProjectionState] TicketCounterState $state, EventStreamEmitter $eventStreamEmitter): TicketCounterState
{
$state = $state->increase();
$eventStreamEmitter->emit([new TicketCounterWasChanged($state->count)]);
return $state;
}
}interface TicketCounterGateway
{
#[ProjectionStateGateway(TicketCounterProjection::NAME)]
public function getCounter(): TicketCounterState;
}#[Projection(self::NAME, Account::class)]
final class AvailableBalanceProjection
{
const NAME = "available_balance";
const QUERY = "getCurrentBalance";
public function __construct(private DocumentStore $documentStore) {}
#[EventHandler]
public function whenAccountSetup(AccountSetup $event): void
{
$this->documentStore->addDocument(
self::NAME,
$event->accountId,
["balance" => 0]
);
}
#[EventHandler]
public function whenPaymentMade(PaymentMade $event): void
{
$this->documentStore->updateDocument(
self::NAME,
$event->accountId,
["balance" => $this->getCurrentBalance($event->accountId) + $event->amount]
);
}
#[QueryHandler(self::QUERY)]
public function getCurrentBalance(UuidInterface $accountId): int
{
return $this->documentStore->getDocument(self::NAME, $accountId)["balance"];
}
}docker run -p 80:80 -e SERVICES='[{"name":"customer_service","databaseDsn":"mysql://user:pass@host/db_name"}]' -e AMQP_DSN='amqp://guest:guest@rabbitmq:5672//' -e APP_DEBUG=true ecotoneframework/ecotone-pulse:0.1.0SERVICES=[{"name":"customer_service","databaseDsn":"mysql://user:pass@host/db_name"}]AMQP_DSN='amqp://guest:guest@rabbitmq:5672//'#[EventSourcingAggregate]
class Product
{
#[Identifier]
private string $id;
private string $type;
(...)
#[EventSourcingHandler]
public function applyProductWasCreated(
ProductWasCreated $event,
// Accessing Metadata
#[Header("revision")] int $revision,
) : void
{
$this->id = $event->id;
if ($revision < 4) {
$this->type = 'standard';
}
$this->type = $event->type;
}
}#[Aggregate]
class Ticket
{
// Import trait with recordThat method
use WithEvents;
#[Identifier]
private Uuid $ticketId;
private string $description;
private string $assignedTo;
#[CommandHandler]
public function changeTicket(ChangeTicket $command): void
{
$this->description = $command->description;
$this->assignedTo = $command->assignedTo;
// Record the event
$this->recordThat(new TicketWasChanged($this->ticketId));
}
}#[Aggregate]
class Promotion
{
#[Identifier]
private Uuid $userId;
private bool $isActive;
#[EventHandler]
public function stop(AccountWasClosed $event): void
{
$this->isActive = false;
}
}class readonly AccountWasClosed
{
public function __construct(public Uuid $userId) {}
}#[NamedEvent('order.placed')]
final readonly class OrderWasPlaced
{
public function __construct(
public string $orderId,
public string $productId
) {}
}#[EventHandler(listenTo: "order.placed")]
public function notify(#[Header("executoId")] $executorId): void
{
// notify user that the Order was placed
}#[EventSourcingAggregate]
class Product
{
use WithAggregateVersioning;
#[Identifier]
private string $id;
#[CommandHandler]
public static function create(CreateProduct $command) : array
{
return [new ProductWasCreated($command->id, $command->name, $command->price)];
}
}#[Version]
private int $version = 0;#[EventSourcingAggregate]
class Product
{
use WithAggregateVersioning;#[Identifier]
private string $id;#[EventSourcingHandler]
public function applyProductWasCreated(ProductWasCreated $event) : void
{
$this->id = $event->id;
}#[CommandHandler]
public static function create(CreateProduct $command) : array
{
return [new ProductWasCreated($command->id, $command->name, $command->price)];
}$eventStore->appendTo(
Product::class, // Stream name
[
Event::create(
$event,
metadata: [
'_aggregate_id' => 1,
'_aggregate_version' => 1,
'_aggregate_type' => Product::class,
]
)
]
);#[Projection( "wallet_balance", Wallet::class)]
final class WalletBalanceProjection
{
#[EventHandler]
public function whenMoneyWasAdded(MoneyWasAddedToWallet $event, EventStreamEmitter $eventStreamEmitter): void
{
$wallet = $this->getWalletFor($event->walletId);
$wallet = $wallet->add($event->amount);
$this->saveWallet($wallet);
$eventStreamEmitter->emit([new WalletBalanceWasChanged($event->walletId, $wallet->currentBalance)]);
}
#[EventHandler]
public function whenMoneyWasSubtract(MoneyWasSubtractedFromWallet $event, EventStreamEmitter $eventStreamEmitter): void
{
$wallet = $this->getWalletFor($event->walletId);
$wallet = $wallet->subtract($event->amount);
$this->saveWallet($wallet);
$eventStreamEmitter->emit([new WalletBalanceWasChanged($event->walletId, $wallet->currentBalance)]);
}
(...)
}final class NotificationService
{
#[EventHandler]
public function when(WalletBalanceWasChanged $event): void
{
// sending websocket event
}
}$eventStreamEmitter->linkTo("wallet", [new WalletBalanceWasChanged($event->walletId, $wallet->currentBalance)]);#[Delayed(new TimeSpan(seconds: 50))]
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationlWhen(UserWasRegistered $event): void
{
// handle welcome notification
}#[Delayed(expression: 'payload.dueDate']
#[Asynchronous("orders")]
#[EventHandler(endpointId: "cancelOrder")]
public function cancelOrderIfExpired(OrderWasPlaced $event): void
{
// it will trigger at payload.dueDate, which is \DateTime object
}#[Delayed(expression: 'reference("delayService").calculate(payload)']
#[Asynchronous("orders")]
#[EventHandler(endpointId: "cancelOrder")]
public function cancelOrderIfExpired(OrderWasPlaced $event): void
{
}$commandBus->sendWithRouting(
"askForOrderReview",
"userId",
metadata: ["deliveryDelay" => new TimeSpan(days: 1)]
);$commandBus->sendWithRouting(
"askForOrderReview",
"userId",
metadata: ["deliveryDelay" => (new \DateTimeImmutable)->modify('+1 day')]
);Let's create Event Order was placed.
And Event Handler that will be listening to the OrderWasPlaced.
Let's Ecotone that we want to run this Event Handler Asynchronously using RabbitMQ
Ecotone Framework use the Metadata for Framework related details, which can be used for identifying messages, correlating, and targeting (which Aggregate it's related too). However we can also use the Metadata for additional information in our Application too.
Ecotone provides Metadata propagation, which take cares of passing Metadata between Command and Events without the need for us to do it manually. This way we can keep our business code clean, yet still be able to access the Metadata later.
Even so, the Metadata is not used in our Ticket Aggregate, when the Event will be stored in the Event Stream, it will be stored along side with our provided Metadata. Therefore we will be able to access it in any Event Handlers:
We can also manually add Metadata directly from Command Handler, by packing the our data into Event class.
and then access it from any subflows:
We may access metadata sent from Command Bus in Command Handler when needed:
This feature is available as part of Ecotone Enterprise.
We may also access Metadata inside our Event Sourcing Handler. This may be useful when we need to protect business invariants based on the data, that is stored as part of Metadata.

Let's create PlaceOrder Command that will place an order in our system.
And Command Handler that will handle this Command
Let's define GetOrder Query that will find our placed order.
And Query Handlerthat will handle this query
PHP Event Streams
The Projection is deriving from Event Stream.
There may be situations when we will want to derive the projection from multiple streams however.
Let's see what options do we have:
If we are interested in single stream, we can listen directly for specific aggregate
In here we are handling events from single Basket's Aggregate stream. It will contain all the events in relation to this aggregate.
There may be situations, when we will want to handle different streams together.
In case if using we will need to use categories to target.
If we would listen on Domain\Ticket stream using Stream Per Aggregate then we would not target any event, as the streams that are created are suffixed by the identifier Domain\Ticket-123.
In that case we can make use of categories in order to target Domain\Ticket-*.
If you want to avoid storing class names of your events in the Event Store you may mark them with name.
And tell the projection to make use of it
If projections are handling the events by names, then there is no need to deserialization of the event to the class and simple array can be used. In case of thousands of events during it will speed up the process.
DDD Aggregates PHP
This chapter will cover the basics on how to implement an Aggregate. We will be using Command Handlers in this section, so ensure reading External Command Handler section first, to understand how Command are sent and handled.
Working with Aggregate Command Handlers is the same as with External Command Handlers.
We mark given method with Command Handler attribute and Ecotone will register it as Command Handler.
In most common scenarios, Command Handlers are used as boilerplate code, which fetch the aggregate, execute it and then save it.
This is non-business code that is often duplicated wit each of the Command Handler we introduce. Ecotone wants to shift the developer focus on the business part of the system, this is why this is abstracted away in form of Aggregate.
By providing Identifier attribute on top of property in your Aggregate, we state that this is identifier of this Aggregate (Entity in Symfony/Doctrine world, Model in Laravel world).
This is then used by Ecotone to fetch your aggregate automatically.
However Aggregates need to be in order to be executed. When we will send an Command, Ecotone will use property with same name from the Command instance to fetch the Aggregate.
You may read more about Identifier Mapping and more advanced scenarios in .
When identifier is resolved, Ecotone use repository to fetch the aggregate and then call the method and then save it. So basically do all the boilerplate for you.
To implement repository reference to . You may use inbuilt repositories, so you don't need to implement your own. Ecotone provides , , integration with or .
An Aggregate is a regular object, which contains state and methods to alter that state. It can be described as Entity, which carry set of behaviours. When creating the Aggregate object, you are creating the Aggregate Root.
Aggregate tells Ecotone, that this class should be registered as Aggregate Root.
Identifier is the external reference point to Aggregate.
This field tells Ecotone to which Aggregate a given Command is targeted.
CommandHandler
PHP Event Sourcing Projections
By default Ecotone runs the projections synchronously with your aggregate changes.
This kind of running configuration can be used to avoid eventual consistency problems or for testing purposes.
However when you expect multiple accesses to your Aggregates at the same time, you may consider asynchronous projection to protect yourself from concurrency problems.
This projections are running within same transaction as your Event Store changes. This will ensure atomic consistency between your aggregate and projection.
You may run your projection in the background. It will query the database within constant time intervals, to look if new events have been registered. Each projection is running as separate process. To register Polling Projection make use of .
Then we can run:
You may pass your projections in event driven manner using .
The difference between Polling and Event Driven projection is the way they are triggered.
The Event Driven is only triggered when new event comes to the system. This avoid the pitfall of continues database access while using Polling Projection.
The second strength of Asynchronously Event Driven Projection is possibility of registering multiple projections under same channel (which is same consumer).
You may customize your Event Sourcing configuration with following configuration:
You may configure your projection custom configuration. Take under consideration that some configuration may have sense only in case of .
load_count //Default: null
Change load batch size in each run for single projection.
cache_size //Default: 1000
The cache size is how many stream names are cached in memory, the higher the number the less queries are executed and therefore the projection runs faster, but it consumes more memory.
sleep //Default: 100000
The sleep options tells the projection to sleep that many microseconds before querying the event store again when no events were found in the last trip. This reduces the number of cpu cycles without the projection doing any real work.
persist_block_size //Default: 1000
The persist block size tells the projector to persist its changes after a given number of operations. This increases the speed of the projection a lot. When you only persist every 1000 events compared to persist on every event, then 999 write operations are saved. The higher the number, the fewer write operations are made to your system, making the projections run faster. On the other side, in case of an error, you need to redo the last operations again. If you are publishing events to the outside world within a projection, you may think of a persist block size of 1 only.
lock_timeout_ms //Default: 1000
Indicates the time (in milliseconds) the projector is locked. During this time no other projector with the same name can be started. A running projector will update the lock timeout on every loop, except you configure an update lock threshold.
update_lock_threshold //Default: 0
If update lock threshold is set to a value greater than 0 the projection won't update lock timeout until number of milliseconds have passed. Let's say your projection has a sleep interval of 100 ms and a lock timeout of 1000 ms. By default the projector updates lock timeout after each run so basically every 100 ms the lock timeout is set to: now() + 1000 ms This causes a lot of extra work for your database and in case the database is replicated this can cause a lot of network traffic, too.
gap_detection //Default: new \Prooph\EventStore\Pdo\Projection\GapDetection()
Gap Detection makes projection to wait for upcoming events if any gap occurs in your event stream. To disable Gap Detection you can set value to null.
In default flow there is no need to fetch or store Aggregates, because this is done for us. We simply need to trigger an Command via CommandBus. However in some cases, you may want to retake orchestration flow and do it directly. For that cases Business Repository Interface or Instant Fetch Aggregate can help you.
Special type of is Repository. This Interface allows us to simply load and store our Aggregates directly. In situations when we call Command directly in our Aggregates we won't be in need to use it. However for some specific cases, where we need to load Aggregate and store it outside of Aggregate's Command Handler, this business interface becomes useful.
Ecotone will read type hint to understand which Aggregate you would like to fetch or save.
Implementation will be delivered by Ecotone. All you need to do is to define the interface and it will available in your Dependency Container
When using Pure Event Sourced Aggregate, instance of Aggregate does not hold recorded Events. Therefore passing aggregate instance would not contain any information about recorded events. For Pure Event Sourced Aggregates, we can use direct event passing to the repository:
In cases where we want to fetch Aggregate directly without introducing orchestration into our code base or depending on the Repositories, we can use Fetch Aggregate Attribute.
Instant Fetch Aggregate is available as part of Ecotone Enterprise.
To do instant fetch of Aggregate we will be using Fetch Attribute. Suppose we want PlaceOrder Command Handler, and we want to fetch User Aggregate:
Fetch using to evaluate the expression given inside the Attribute. For example having above "payload.userId" and following Command:
Ecotone will use userId from the Command to fetch User Aggregate instance. &#xNAN;"payload" is special variable within expression that points to our Command, therefore whatever is available within the Command is available for us to do the fetching. This provides quick way of accessing related Aggregates without the need to inject Repositories.
By default Ecotone will throw Exception if Aggregate is not found, we can change the behaviour simply by allowing nulls in our method declaration:
We can also use Message Headers to fetch our related Aggregate instance:
In some cases we may not have enough information to provide correct Identifier, for example that may require some mapping in order to get the Identifier. For this cases we can use "reference" function to access any Service from Depedency Container in order to do the mapping.
Describes how streams with events will be stored. Each Event Stream is separate Database Table, yet how those tables are created and what are constraints do they protect depends on the persistence strategy.
This is the basics Stream Strategy which involves no constraints. This means that we can append any Events to it without providing any additional metadata.
Now as this is free append involves no could re-run this code apply exactly the same Event. This can sounds silly, but it 's make it useful for particular cases. It make it super easy to append new Events. We basically could just add this action in our code and keep applying Events to the Event Stream, we don't need to know context of what happened before.
This is useful for scenarios where we just want to store information without putting any business logic around this. It could be used to continues stream of information like:
Temperature changes
Counting car passing by in traffic jam
Recording clicks and user views.
This the default persistence strategy. It does creates partitioning within Event Stream to ensure that we always maintain correct history within partition. This way we can be sure that each Event contains details on like Aggregate id it does relate to, on which version it was applied, to what Aggregate it references to.
The tricky part here is that we need to know Context in order to apply the Event, as besides the Aggregate Id, we need to provide Version. To know the version we need to be aware of last previous applied Event.
When this persistence strategy is used with Ecotone's Aggregate, Ecotone resolve metadata part on his own, therefore working with this Stream becomes easy. However when working directly with Event Store getting the context may involve extra work.
This Stream Strategy is great whenever business logic is involved that need to be protected. This solves for example the problem of concurrent access on the database level, as we for example can't store Event for same Aggregate Id and Version twice in the Event Stream. We would use it in most of business scenarios where knowing previous state in order to make the decision is needed, like:
Check if we can change Ticket based on status
Performing invocing from previous transactions
Decide if Order can be shipped
This is similar to Partition strategy, however each Partition is actually stored in separate Table, instead of Single One.
This can be used when amount of partitions is really low and volume of events within partition is huge.
You may provide your own Customer Persistence Strategy as long as it implements PersistenceStrategy.
To set given persistence strategy as default, we can use ServiceContext:
Once set, the persistence strategy will apply to all streams in your application. However, you may face a situation when you need to have a different strategy for one or more of your streams.
The above will make the Simple Stream Strategy as default however, for some_stream Event Store will use the Aggregate Stream Strategy.
Be aware that we won't be able to set Custom Strategy that way.
Ecotone allows to work with Database using DbalBusinessMethod. The goal is to create abstraction which significantly reduce the amount of boilerplate code required to implement data access layers. Thanks to Dbal based Business Methods we are able to avoid writing integration and transformation level code and focus on the Business part of the system. To make use of Dbal based Business Method, install Dbal Module first.
Let's consider scenario where we want to store new record in Persons table. To make it happen just like with Business Method we will create an Interface, yet this time we will mark it with DbalBusinessMethod.
The first parameter passed to DbalBusinessMethod is actual SQL, where we can provide set of named parameters. Ecotone will automatically bind parameters from method declaration to SQL ones by names.
Above example will use DbalConnectionFactory::class for database Connection, which is the default for . If you want to run Business Method on different connection, you can do it using connectionReferenceName parameter inside the Attribute.
We may bind parameter name explicitly by using DbalParameter attribute.
This can be used when we want to decouple interface parameter names from binded parameters or when name in database column is not explicit enough for being part of interface.
If we want to return amount of the records that have been changed, we can add int type hint to our Business Method:
We may want to fetch data from the database and for this we will be using DbalQueryBusinessMethod.
The above will return result as associative array with the columns provided in SELECT statement.
To format result differently we may use different fetch modes. The default fetch Mode is associative array.
This will extract the first column from each row, which allows us to return array of person Ids directly.
To get single variable out of Result Set we can use First Column of first row Mode.
This way we can provide simple interfaces for things Aggregate SQLs, like SUM or COUNT.
To fetch first Row of given Result Set, we can use First Row Mode.
This will return array containing person_id and name.
When using First Row Mode, we may end up having no returned row at all. In this situation Dbal will return false, however if Return Type will be nullable, then Ecotone will convert false to null.
For big result set we may want to avoid fetching everything at once, as it may consume a lot of memory. In those situations we may use Iterator Fetch Mode, to fetch one by one.
Each parameter may have different type and Ecotone will try to recognize specific type and set it up accordingly. If we want, we can take over and define the type explicitly.
Ecotone comes with inbuilt Event Sourcing repository after Event Sourcing package is installed. However you want to roll out your own storage for Events, or maybe you already use some event-sourcing framework and would like to integrate with it. For this you can take over the control by introducing your own Event Sourcing Repository.
Using Custom Event Sourcing Repository will not allow you to make use of inbuilt projection system. Therefore consider configuring your own Event Sourcing Repository only if you want to build your own projecting system.
We do start by implementing EventSourcingRepository interface:
canHandle - Tells whatever given Aggregate is handled by this Repository
findBy - Method returns previously created events for given aggregate. Which Ecotone will use to reconstruct the Aggregate.
save - Stores events recorded by Event Sourced Aggregate
and then we need to mark class which implements this interface as Repository
Ecotone provides enough information to decide how to store provided events.
Identifiers will hold array of identifiers related to given aggregate (e.g. ["orderId" ⇒ 123]). Events will be list of Ecotone's Event classes, which contains of payload and metadata, where payload is your Event class instance and metadata is specific to this event. Metadata as parameter is generic metadata available at the moment of Aggregate execution. Version before handling on other hand is the version of the Aggregate before any action was triggered on it. This can be used to protect from concurrency issues.
The structure of Events is as follows:
It's worth to mention about Ecotone's Events and especially about metadata part of the Event. Each metadata for given Event contains of three core Event attributes:
"_aggregate_id" - This provides aggregate identifier of related Aggregate
"_aggregate_version" - This provides version of the related Event (e.g. 1/2/3/4)
"_aggregate_type" - This provides type of the Aggregate being stored, which can be customized
If our repository stores multiple Aggregates is useful to have the information about the type of Aggregate we are storing. However keeping the class name is not best idea, as simply refactor would break our Event Stream. Therefore Ecotone provides a way to mark our Aggregate type using Attribute
This now will be passed together with Events under _aggregate_type metadata.
In Ecotone we can name the events to avoid storing class names in the Event Stream, to do so we use NamedEvent.
then when events will be passed to save method, they will automatically provide this name under eventName property.
With custom repository we still can use inbuilt . To use it for customized repository we will use BaseEventSourcingConfiguration.
Ecotone then after fetching snapshot, will load events only from this given moment using `fromAggregateVersion`.
If you want to test out your flow and storing with your custom Event Sourced Repository, you should disable default in memory repository
Scheduling PHP
Ecotone comes with support for running period tasks or cron jobs.
endpointId - it's name which identifies process to run
poller - Configuration how to execute this method .
Above configuration tells Ecotone to execute this method every second.
After setting up Scheduled endpoint we can run the endpoint:
You can run Scheduled for given Handler.
Right now method return which is send to given routing.
requestChannelName - The channel name to which should be send.
When the Message will arrive on the Command Handler it will be automatically converted to ExchangeCommand. If you want to understand how the conversion works, you may read about it in .
We can also set up cron and fixed rate using expression language. This gives us ability to set it up differently based on the environment we are currently in.
Timing will be evaluated once, and then preserved as timing configuration. The evaluation will happen when ecotone:run will be executed.
You may find demo implementation .
Ecotone provides easy way to pass Message Headers (Metadata) with your Message and use it in your Message Handlers or Interceptors. In case of asynchronous scenarios, Message Headers will be automatically mapped and passed to through your Message Broker.
Pass your metadata (headers), as second parameter.
Then you may access them directly in Message Handlers:
If you have defined for given type, then you may type hint for the object and Ecotone will do the conversion:
And then we can use Classes instead of scalar types for our Headers:
Ecotone provides a lot of support for Conversion, so we can work with higher level business class not scalars. Find out more in .
Ecotone by default propagate all Message Headers automatically. This as a result preserve context, which can be used on the later stage. For example we may provide executorId header and skip it in our Command Handler, however use it in resulting Event Handler.
This will execute Command Handler:
and then even so, we don't resend this Header when publishing Event, it will still be available for us:
When publishing Events from Aggregates or Sagas, metadata will be propagated automatically too.
When using Messaging we may want to be able to trace where given Message came from, who was the parent how it was correlated with other Messages. In Ecotone all Messages contains of Message Id, Correlation Id and Parent Id within Metadata. Those are automatically assigned and propagated by the Framework, so from application level code we don’t need to deal manage those Messaging level Concepts.
Using Message Id, Correlation Id are especially useful find out what have happened during the flow and if any part of the flow has failed. Using already propagated Headers, we may build our own tracing solution on top of what Ecotone provides or use inbuilt support for .
Each Message receives it's own unique Id, which is Uuid generated value. This is used by Ecotone to provide capabilities like , and Message identification for .
"parentId" header refers to Message that was direct ancestor of it. In our case that can be correlation of Command and Event. As a result of sending an Command, we publish an Event Message.
Parent id will always refer to the previous Message. What is important however is that, if we have multiple of the Message with same Id.
Correlation Id is useful for longer flows, which can span over multiple Message Handlers. In those situations we may be interested in how our Message flow have branched:
PHP rebuild and delete projections
As projection can be restarted, deleted and created differently. When the projection knows how to setup it itself, it's easy to rebuild it when change is needed.
And inside the projection we need to implement ProjectionInitialization to tell Ecotone what to do:
In order to restart the projection in case we want to provide incompatible change, we can simply reset the projection and it will build up from the beginning.
And inside the projection we need to implement ProjectionReset to tell Ecotone what to do:
If we want to delete the projection
And inside the projection we need to implement ProjectionDelete to tell Ecotone what to do:
If we want to manually trigger projection
There are two ways we can configure our Aggregate to record Events.
This way of handling events does allow for pure functions. This means that actions called on the Aggregate returns Events and are not changing internal state of Aggregate.
EventSourcingAggregate and Identifier .
Event Sourced Aggregate must provide version. You may leave it to Ecotone using WithAggregateVersioning or you can implement it yourself.
CommandHandlerfor event sourcing returns events generated by specific method. This will be passed to the
This way of handling events allow for similarity with . This convention requires changing internal state of Aggregate to record Events. Therefore Pure ES Aggregate is recommended as it's not require for any internal state changes in most of the scenarios. However ES Aggregate with Internal Recorder may be useful for projects migrating with other solutions, or when our team is heavily used to working this way.
In order to make use of alternative way of handling events, we need to provide trait WithEvents.
Command Handlers instead of returning events are acting the same as .
All events which will be published using recordThatwill be passed to the to be stored.
In case of Ecotone we don't prioritize whole Message, but specific Message Handler. This helps in scenarios when we have multiple Event Handlers, and we would like to configure each of them differently. For example may we have a case, where we want to prioritized one notification over another.
In Ecotone the higher the priority, the quicker given Event Handler will be called.
In case we publish an Event, we may want have multiple subscribing Event Handlers. In some situations we may want given action to happen before the other. This may happen for example for example, when one Event Handler updates an data, which the other is using. Therefore we may need to ensure that the Handler modifying data will be called before the one that make use of it.
There may be cases when we will use Synchronous Projections and then we would like to use this data in Event Handler. In that case, if standard Event Handler would be called first, we would lack the data. Therefore by default when Standard Event Handler has same priority as Projection Event Handler, Projection will be called first.
This also works for Aggregates to be prioritized before Standard Event Handlers. Therefore the ordering when same priority level is used, is as follows:
Projection Event Handlers
Aggregate/Sagas Event Handlers
Standard Event Handlers
Ecotone does allow to prioritize an handling of given Message before another one. This way we can handle quicker Messages that have been published to Asynchronous Message Channel later. For example we may send a lot of different notifications, however when Customer asks for One-Time Password we want to deliver it immediately. Therefore for this scenario we would setup higher priority for Authentication Token notification handling, than for other Notifications.
You may prioritize handling given asynchronous message by adding #[Priority] attribute.
We may send an Message and tell Ecotone to prioritize it using priority Message Header:
Ecotone comes with full support for managing full life cycle of a error message by using Dbal Module.
Store failed Message with all details about the exception
Allow for reviewing error Messages
Allow for deleting and replaying error Message back to the
To make use of Dead Letter, we need to have installed.
If we configure default error channel to point to "dbal_dead_letter" then all Error Messages will land there directly
config/packages/ecotone.yaml
config/ecotone.php
We may also want to try to recover before we consider Message to be stored in Dead Letter:
config/packages/ecotone.yaml
config/ecotone.php
and then we use inbuilt Retry Strategy:
Get more details about existing commands
Listing current error messages
Get more details about given error message
Replay error message. It will return to previous channel for consumer to pick it up and handle again.
Replaying all the error messages.
Delete given error message
The above solution requires running Console Line Commands. If we want however, we can manage all our Error Messages from one place using .
This is especially useful when we've multiple Applications, so we can go to single place and see if any Application have failed to process Message.
To ensure full level of data consistency, we may decide to store messages along side with data changes. This way we work only with single data storage, avoiding completely problem with persisting Message in two sources at once. To make it happen Ecotone implements so called Outbox pattern.
When loading Aggregates or Sagas we need to know what Identifier should be used for that. This depending on the business feature we work may require different approaches. In this section we will dive into different solutions which we can use.
Ecotone resolves the mapping automatically, when Identifier in the Aggregate/Saga is named the same as the property in the Command/Event.
then, if Message has productId, it will be used for Mapping:
Whenever we use more than one storage during single action, storing to first storage may end up with success, yet the second may not. This can happen when we store data in database and then send Messages to Message Broker. If failure happen it can be that we will send some Message to Broker, yet fail to store related data or vice versa. Ecotone provide you with tools to help solve this problem in order to make sending Messages to Message Broker resilient.
Ecotone by default enables Message Collector. Collector collect messages that are about to be send to asynchronous channels in order to send them just before the transaction is committed. This way it help avoids bellow pitfalls:
class OrderWasPlaced
{
private string $orderId;
private string $productName;
public function __construct(string $orderId, string $productName)
{
$this->orderId = $orderId;
$this->productName = $productName;
}
public function getOrderId(): string
{
return $this->orderId;
}
public function getProductName(): string
{
return $this->productName;
}
}class NotificationService
{
const ASYNCHRONOUS_MESSAGES = "asynchronous_messages";
#[Asynchronous("asynchronous_messages")]
#[EventHandler(endpointId:"notifyAboutNeworder")]
public function notifyAboutNewOrder(OrderWasPlaced $event) : void
{
echo "Handling asynchronously: " . $event->getProductName() . "\n";
}
}class Configuration
{
#[ServiceContext]
public function enableRabbitMQ()
{
return AmqpBackedMessageChannelBuilder::create(NotificationService::ASYNCHRONOUS_MESSAGES);
}
}$eventBus->publish(new OrderWasPlaced(1, "Milk"));
# Running asynchronous consumer
$messagingSystem->run("asynchronous_messages");$this->commandBus->send(new AssignPerson(1000, 12), metadata: [
'executorId' => '123
]);#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
(...)
#[CommandHandler]
public function assign(AssignPerson $command) : array
{
return [new PersonWasAssigned($this->ticketId, $command->personId)];
}
}public function handle(
PersonWasAssigned $event,
// Accessing Metadata
#[Header("executorId")] $executorId
): void
{
// do something with metadata
};#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
private string $type;
(...)
#[CommandHandler]
public function assign(AssignPerson $command) : array
{
return [
Event::create(
new PersonWasAssigned($this->ticketId, $command->personId),
[
'ticketType' => $this->ticketType
]
)
];
}
}public function handle(
PersonWasAssigned $event,
// Accessing Metadata
#[Header("ticketType")] $ticketType
): void
{
// do something with metadata
};#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
private string $ownerId;
(...)
#[CommandHandler]
public function change(
ChangeTicket $command,
// Accessing Metadata
#[Header("executorId")] $executorId
) : array
{
// do something with executorId
}#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
private string $ownerId;
(...)
#[CommandHandler]
public function change(ChangeTicket $command, #[Header] $executorId) : array
{
if ($this->ownerId !== $executorId) {
throw new \InvalidArgumentException("Only owner can change Ticket");
}
return new TicketChanged($this->ticketId, $command->type);
}
#[EventSourcingHandler]
public function applyTicketCreated(
TicketCreated $event,
// Accessing Metadata
#[Header("executorId")] $executorId,
) : void
{
$this->id = $event->id;
$this->ownerId = $executorId;
}
}class PlaceOrder
{
private string $orderId;
private string $productName;
public function __construct(string $orderId, string $productName)
{
$this->orderId = $orderId;
$this->productName = $productName;
}
public function getOrderId(): string
{
return $this->orderId;
}
public function getProductName(): string
{
return $this->productName;
}
}use Ecotone\Modelling\Attribute\CommandHandler;
class OrderService
{
private array $orders;
#[CommandHandler]
public function placeOrder(PlaceOrder $command) : void
{
$this->orders[$command->getOrderId()] = $command->getProductName();
}
}class GetOrder
{
private string $orderId;
public function __construct(string $orderId)
{
$this->orderId = $orderId;
}
public function getOrderId(): string
{
return $this->orderId;
}
}use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
class OrderService
{
private array $orders;
#[CommandHandler]
public function placeOrder(PlaceOrder $command) : void
{
$this->orders[$command->getOrderId()] = $command->getProductName();
}
#[QueryHandler]
public function getOrder(GetOrder $query) : string
{
if (!array_key_exists($query->getOrderId(), $this->orders)) {
throw new \InvalidArgumentException("Order was not found " . $query->getOrderId());
}
return $this->orders[$query->getOrderId()];
}
}$commandBus->send(new PlaceOrder(1, "Milk"));
echo $queryBus->send(new GetOrder(1));interface PersonApi
{
#[DbalWrite("INSERT INTO persons VALUES (:personId, :name)")]
public function register(int $personId, string $name): void;
}class NotificationService
{
#[Scheduled(endpointId: "notificationSender")]
#[Poller(fixedRateInMilliseconds: 1000)]
public function sendNotifications(): void
{
echo "Sending notifications...\n";
}
}$this->commandBus->send(
new CloseTicketCommand($request->get("ticketId")),
["executorUsername" => $security->getUser()->getUsername()]
);#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
#[Header("executorUsername")] string $executor
) {
// handle closing ticket
} final class UserRegisteredSubscriber
{
#[Priority(5)]
#[EventHandler]
public function updateSomeData(UserWasRegistered $event): void
{
// updating data will be called first
}
#[Priority(1)]
#[EventHandler]
public function sendNotification(UserWasRegistered $event): void
{
// sending notification will be called second
}
}

CommandHandler defined on non static class method is place where you would make changes to existing aggregate, fetched from repository.
bin/console ecotone:run basket_list -vvvartisan ecotone:run basket_list -vvv$messagingSystem->run("basket_list");console ecotone:list
+--------------------+
| Endpoint Names |
+--------------------+
| notificationSender |
+--------------------+artisan ecotone:list
+--------------------+
| Endpoint Names |
+--------------------+
| notificationSender |
+--------------------+$consumers = $messagingSystem->list()



artisan ecotone:es:initialize-projectionn {projectionName}$messagingSystem->runConsoleCommand("ecotone:es:initialize-projection", ["name" => $projectionName]);bin/console ecotone:es:initialize-projection {projectionName}EventSourcingHandler is method responsible for reconstructing Aggregate from previously created events. At least one event need to be handled in order to provide Identifier.


In order to use Outbox pattern we need to set up Dbal Module.
By sending asynchronous messages via database, we are storing them together with data changes. This thanks to default transactions for Command Handlers, commits them together.
After this all your messages will be go through your database as a message channel.
With Ecotone's Outbox pattern we set up given Channel to run via Database. This means that we can target specific channels, that are crucial to run under outbox pattern. In other cases where data consistency is not so important to us, we may actually use Message Broker Channels directly and skip the Outbox. As an example, registering payments and payouts may an crucial action in our system, so we use it with Outbox pattern. However sending an "Welcome" notification may be just fine to run directly with Message Broker.
One of the challenges of implementing Outbox pattern is way to scale it. When we start consume a lot of messages, we may need to run more consumers in order to handle the load.
In case of Ecotone, you may safely scale your Messages Consumers that are consuming from your Dbal Message Channel. Each message will be reserved for the time of being published, thanks to that no duplicates will be sent when we scale.
However we may actually want to avoid scaling our Dbal based Message Consumers to avoid increasing the load on the database.
For this situation Ecotone allows to make use so called Combined Message Channels.
In that case we would run Database Channel only for the outbox and for actual Message Handler execution a different one.
This is powerful concept, as we may safely produce messages with outbox and yet be able to handle and scale via RabbitMQ SQS Redis etc.
database_channel is Dbal Message Channel
rabbit_channel is our RabbitMQ Message Channel
Then we run one or few Message Consumers for outbox and we scale Message Consumers for rabbit.
If we want more convient way as we would like to apply combined message channels on multiple Message Handlers, we may create an reference.
And then we use reference for our Message Handlers.
You may use multiple aggregate identifiers or identifier as objects (e.g. Uuid) as long as they provide __toString method
We may also expose identifier over public method by annotating it with attribute IdentifierMethod("productId").
If the property name is different than Identifier in the Aggregate/Saga, we need to give Ecotone a hint, how to correlate identifiers.
We can do that using TargetIdentifier attribute, which states to which Identifier given property references too:
When there is no property to correlate inside Command or Event, we can use Identifier from Metadata.
When we've the identifier inside Metadata then we can use identifierMetadataMapping.
Suppose the orderId identifier is available in metadata under key orderNumber, then we can then use this mapping:
We can make use of Before or Presend Interceptors to enrich event's metadata with required identifiers.
We may provide Identifier dynamically using Command Bus. This way we can state explicitly what Aggregate/Saga instance we do refer too. Thanks to we don't need to define Identifier inside the Command and we can skip any kind of mapping.
In some scenario we won't be in deal to create an Command class at all. For example we may provide block user action, which changes the status:
Event so we are using "aggregate.id" in the metadata, this will work exactly the same for Sagas. Therefore if we want to trigger Message Handler on the Saga, we can use "aggregate.id" too.
There may be cases where more advanced mapping may be needed. In those cases we can use identifier mapping based on Expression Language.
When using identifierMapping configuration, we get access to the Message fully and to Dependency Container. To access specific part we will be using:
payload -> Represents our Event/Command class
headers -> Represents our Message's metadata
reference('name') -> Allow to access given service from our Dependency Container
Suppose the orderId identifier is available in metadata under key orderNumber, then we can tell Message Handler to use this mapping:
Suppose our Identifier is an Email object within Command class and we would like to normalize before it's used for fetching the Aggregate/Saga:
Suppose we receive external order id, however we do have in database our internal order id that should be used as Identifier. We could then have a Service registered in DI under "orderIdExchange":
Then we can make use of it in our identifier Mapping
Deduplication is enabled by default and works whenever message is consumed in asynchronous way.
You may also define given Message Handler for deduplication. This will use Message Headers and deduplicated base on your customer header key. This allows in synchronous and asynchronous scenarios.
This is especially useful when, we receive events from external services e.g. payment or notification events which contains of identifier that we may use deduplication on. For example Sendgrid (Email Service) sending us notifications about user interaction, as there is no guarantee that we will receive same webhook once, we may use "eventId", to deduplicate in case.
paymentId becomes our deduplication key. Whenever we will receive now Command with same value under paymentId header, Ecotone will deduplicate that and skip execution of receivePayment method.
Deduplication happen across given endpointId.This means that if we would introduce another handler with same deduplication key, it will get it's own deduplication tracking.
As deduplication is tracked within given endpoint id, it means we can change the deduplication key safely without being in risk of receiving duplicates. If we would like to start tracking from fresh, it would be enough to change the endpointId.
We can also dynamically resolve deduplicate value, for this we can use expression language.
payload variable in expression language will hold Command/Event object. headers variable will hold all related Mesage Headers.
We could also access any object from our Dependency Container, in order to calculate mapping:
To reuse same deduplication mechanism across different Message Handlers we may want to decide to use Deduplication on the level of Command Bus. For this, it's enough to extend Command Bus interface with out custom one:
Then all Commands sent over this Command Bus will be deduplicated using "paymentId" header.
This feature is available as part of Ecotone Enterprise.
By default using same deduplication key between Command Buses, will mean that Message will be discarded. If we want to ensure isolation that each Command Bus is tracking his deduplication separately, we can add tracking name:
To remove expired deduplication history which is kept in database table, Ecotone provides an console command:
By default Ecotone removes message id from deduplication storage after 7 days in batches of 1000. It can be customized in case of need:
It's important to keep removal batch size at small number. As deleting records may result in database index rebuild which will cause locking. Therefore small batch size will ensure our system can continue, while messages are being deleted in background.
As the deduplication is enabled by default, if you want to disable it then make use of DbalConfiguration.
sendQueryBusJust like with Commands, we may use routing in order to execute queries:
To send an Query we will be using sendWithRouting method on QueryBus.
Query will be delivered to corresponding Query Handler.
If you have registered Converter for specific Media Type, then you can tell Ecotone to convert result of your Query Bus to specific format.
In order to do this, we need to make use of Metadataand replyContentType header.
Message Collector is enabled by default. It works whenever messages are sent via Command Bus or when message are consumed asynchronously.
Let's consider example scenario: During order processing, we publish an OrderWasPlaced event, yet we fail to store Order in the database. This means we've published Message that is based on not existing data, which of course will create inconsistency in our system.
When Message Collector is enabled it provides much higher assurance that Messages will be send to Message Broker only when your flow have been successful.
Let's consider example scenario: During order processing, we may publish an OrderWasPlaced event, yet it when we publish it right away, this Message could be consumed and handled before Order is actually committed to the database. In such situations consumer will fail due to lack of data or may produce incorrect results.
Due to Message Collector we gracefully reduce chance of this happening.
In general sending Messages to external broker is composed of three stages:
Serialize Message Payload
Map and prepare Message Headers
Send Message to external Broker
In most of the frameworks those three steps are done together, which may create an issue. Let's consider example scenario: We send multiple Messages, the first one may with success and the second fail on serialization. Due to that transaction will be rolled back, yet we already produced the first Message, which becomes an Ghost Message. To avoid that Ecotone perform first two actions first, then collect all Messages and as a final step iterate over collected Messages and sent them. This way Ecotone ensures that all Messages must have valid serialization before we actually try to send any of them.
As Collector keeps the Messages in memory till the moment they are sent, in case of sending a lot of messages you may consider turning off Message Collector, to avoid memory consumption. This way Messages will be sent instantly to your Message Broker.
Whenever sending to Message Broker fails, Ecotone will retry in order to self-heal the application.
By default Ecotone will do 2 reties when sending to Message Channel fails:
- First after 10ms
- Second after 400ms.
You may configure sending retries per asynchronous channel:
After exhausting limit of retries in order to send the Message to the Broker, we know that we won't be able to do this. In this scenario instead of letting our action fail completely, we may decide to push it to Error Channel instead of original targetted channel.
We may decide for example to push it to Dead Letter to store it and later retry:
If you will push Error Messages to Dbal Dead Letter, then they will be stored in your database for later review. You may then delete or replay them after fixing the problem. This way we ensure consistency even if unrecoverable failure happened our system continues to have self-healed.
If you need customization per Message Consumer you may do it using PollableChannelConfiguration by providing Message Consumer name:
For mission critical scenarios, you may consider using Ecotone's Outbox Pattern.
#[Projection("basketList", Basket::class)]
class BasketList
{
#[EventHandler]
public function addBasket(BasketWasCreated $event) : void
{
// do something
}
}#[Projection("log_projection", [Ticket::class, Basket::class])]
class Logger#[Projection("category_projection", fromCategories: Ticket::class)]
class FromCategoryUsingAggregatePerStreamProjection#[NamedEvent("basket.was_created")]
class BasketWasCreated
{
public const EVENT_NAME = "basket.was_created";
private string $id;
public function __construct(string $id)
{
$this->id = $id;
}
public function getId(): string
{
return $this->id;
}
}#[Projection("basket_list")]
class BasketList
{
#[EventHandler("basket.was_created")]
public function addBasket(BasketWasCreated $event) : void
{
// do something with $event
} #[EventHandler("basket.was_created")]
public function addBasket(array $event) : void
{
// do something with $event
}$product = $this->repository->getById($command->id());
$product->changePrice($command->getPriceAmount());
$this->repository->save($product); #[Aggregate]
class Product
{
#[Identifier]
private string $productId;class ChangePriceCommand
{
private string $productId; // same property name as Aggregate's Identifier
private Money $priceAmount; #[Aggregate] // 1
class Product
{
#[Identifier] // 2
private string $productId;
private string $name;
private integer $priceAmount;
private function __construct(string $orderId, string $name, int $priceAmount)
{
$this->productId = $orderId;
$this->name = $name;
$this->priceAmount = $priceAmount;
}
#[CommandHandler] //3
public static function register(RegisterProductCommand $command) : self
{
return new self(
$command->getProductId(),
$command->getName(),
$command->getPriceAmount()
);
}
#[CommandHandler] // 4
public function changePrice(ChangePriceCommand $command) : void
{
$this->priceAmount = $command->getPriceAmount();
}
}#[ServiceContext]
public function basketList()
{
return ProjectionRunningConfiguration::createPolling("basket_list");
}#[Asynchronous("asynchronous_projections")]
#[Projection("basket_list")]
class BasketListclass ProjectionConfiguration
{
#[ServiceContext]
public function configureEventSourcing()
{
return [
EventSourcingConfiguration::createWithDefaults()
->withEventStreamTableName('event_stream') // name of event stream table
->withProjectionsTableName('projections') // name of projection table
->withInitializeEventStoreOnStart(false) // will check and create above tables if needed
->withLoadBatchSize(1) // amount of events loaded on each projection run,
];
}
}
#[ServiceContext]
public function projectionConfiguration()
{
return ProjectionRunningConfiguration::createPolling("ticket_list")
->withOption("load_count", 10000);
}interface OrderRepository
{
#[Repository]
public function getOrder(string $twitId): Order;
#[Repository]
public function findOrder(string $twitId): ?Order;
#[Repository]
public function save(Twitter $twitter): void;
}interface OrderRepository
{
#[Repository]
public function getOrder(string $twitId): Order;
#[Repository]
public function findOrder(string $twitId): ?Order;
#[Repository]
#[RelatedAggregate(Order::class)]
public function save(string $aggregateId, int $currentVersion, array $events): void;
}#[CommandHandler]
public function placeOrder(
PlaceOrder $command,
#[Fetch("payload.userId")] User $user
): void {
// do something
}class readonly PlaceOrder
{
public function __construct(
public string $orderId,
public string $userId,
public string $productId
) {
}#[CommandHandler]
public function placeOrder(
PlaceOrder $command,
#[Fetch("payload.userId")] ?User $user // we marked it as possible null
): void {
// do something
}#[CommandHandler]
public function placeOrder(
PlaceOrder $command,
#[Fetch("headers['userId']")] User $user
): void {
// do something
}#[CommandHandler]
public function placeOrder(
PlaceOrder $command,
#[Fetch("reference('emailToIdMapper').map(payload.email)")] User $user
): void {
// do something
}$eventStore->create($streamName, streamMetadata: [
"_persistence" => 'simple',
]);
$eventStore->appendTo(
$streamName,
[
Event::create(
payload: new TicketWasRegistered('123', 'Johnny', 'alert'),
metadata: [
'executor' => 'johnny',
]
)
]
);$eventStore->create($streamName, streamMetadata: [
"_persistence" => 'partition',
]);$eventStore->appendTo(
$streamName,
[
Event::create(
new TicketWasRegistered('123', 'Johnny', 'alert'),
[
'_aggregate_id' => 123,
'_aggregate_version' => 1,
'_aggregate_type' => 'ticket',
]
)
]
);$eventStore->create($streamName, streamMetadata: [
"_persistence" => 'aggregate',
]);$eventStore->appendTo(
$streamName,
[
Event::create(
new TicketWasRegistered('123', 'Johnny', 'alert'),
[
'_aggregate_id' => 123,
'_aggregate_version' => 1,
'_aggregate_type' => 'ticket',
]
)
]
);#[ServiceContext]
public function aggregateStreamStrategy()
{
return EventSourcingConfiguration::createWithDefaults()
->withCustomPersistenceStrategy(new CustomStreamStrategy(new FromProophMessageToArrayConverter()));
}#[ServiceContext]
public function persistenceStrategy()
{
return EventSourcingConfiguration::createWithDefaults()
->withSimpleStreamPersistenceStrategy();
}#[ServiceContext]
public function eventSourcingConfiguration(): EventSourcingConfiguration
{
return EventSourcingConfiguration::createWithDefaults()
->withPersistenceStrategyFor('some_stream', LazyProophEventStore::AGGREGATE_STREAM_PERSISTENCE)
;
}#[DbalWrite('INSERT INTO persons VALUES (:personId, :name)')]
public function register(
#[DbalParameter(name: 'personId')] int $id,
string $name
): void;#[DbalWrite('UPDATE persons SET name = :name WHERE person_id = :personId')]
public function changeName(int $personId, string $name): int;interface PersonApi
{
#[DbalQueryMethod('SELECT person_id, name FROM persons LIMIT :limit OFFSET :offset')]
public function getNameList(int $limit, int $offset): array;
}/**
* @return int[]
*/
#[DbalQuery(
'SELECT person_id FROM persons ORDER BY person_id ASC LIMIT :limit OFFSET :offset',
fetchMode: FetchMode::FIRST_COLUMN
)]
public function getPersonIds(int $limit, int $offset): array;#[DbalQuery(
'SELECT COUNT(*) FROM persons',
fetchMode: FetchMode::FIRST_COLUMN_OF_FIRST_ROW
)]
public function countPersons(): int;#[DbalQuery(
'SELECT person_id, name FROM persons WHERE person_id = :personId',
fetchMode: FetchMode::FIRST_ROW
)]
public function getNameDTO(int $personId): array;#[DbalQuery(
'SELECT person_id, name FROM persons WHERE person_id = :personId',
fetchMode: FetchMode::FIRST_ROW
)]
public function getNameDTOOrNull(int $personId): PersonNameDTO|null;#[DbalQuery(
'SELECT person_id, name FROM persons ORDER BY person_id ASC',
fetchMode: FetchMode::ITERATE
)]
public function getPersonIdsIterator(): iterable;#[DbalQuery('SELECT * FROM persons WHERE person_id IN (:personIds)')]
public function getPersonsWith(
#[DbalParameter(type: ArrayParameterType::INTEGER)] array $personIds
): array;interface EventSourcedRepository
{
1. public function canHandle(string $aggregateClassName): bool;
2. public function findBy(string $aggregateClassName, array $identifiers, int $fromAggregateVersion = 1) : EventStream;
3. public function save(array $identifiers, string $aggregateClassName, array $events, array $metadata, int $versionBeforeHandling): void;
}#[Repository]
class CustomEventSourcingRepositorypublic function save(
array $identifiers,
string $aggregateClassName,
array $events,
array $metadata,
int $versionBeforeHandling
): void;class Event
{
private function __construct(
private string $eventName, // either class name or name of the event
private object $payload, // event object instance
private array $metadata // related metadata
)
}#[EventSourcingAggregate]
#[AggregateType("basket")]
class Basket#[NamedEvent("order_was_placed")]
class OrderWasPlaced#[ServiceContext]
public function configuration()
{
return BaseEventSourcingConfiguration::withDefaults()
->withSnapshotsFor(Basket::class, thresholdTrigger: 100);
}public function findBy(
string $aggregateClassName,
array $identifiers,
int $fromAggregateVersion = 1
) $repository = new CustomEventSourcingRepository;
$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[OrderAggregate::class, CustomEventSourcingRepository::class],
[CustomEventSourcingRepository::class => $repository],
addInMemoryEventSourcedRepository: false,
);
$ecotoneLite->sendCommand(new PlaceOrder());
$this->assertNotEmpty($repository->getEvents());console ecotone:run notificationSender -vvvartisan ecotone:run notificationSender -vvv$messagingSystem->run("notificationSender");class CurrencyExchanger
{
#[Scheduled(requestChannelName: "exchange", endpointId: "currencyExchanger")]
#[Poller(fixedRateInMilliseconds=1000)]
public function callExchange() : array
{
return ["currency" => "EUR", "ratio" => 1.23];
}
}
#[CommandHandler("exchange")]
public function exchange(ExchangeCommand $command) : void;class CurrencyExchanger
{
#[Scheduled(requestChannelName: "exchange", endpointId: "currencyExchanger")]
#[Poller(fixedRateExpression="reference('timerService').getFixedRate()")]
public function callExchange() : array
{
return ["currency" => "EUR", "ratio" => 1.23];
}
}class UsernameConverter
{
#[Converter]
public function from(string $data): Username
{
return Username::fromString($data);
}
}#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
#[Header("executorUsername")] Username $executor
) {
// handle closing ticket
}$this->commandBus->send(
new BlockerUser($request->get("userId")),
["executorId" => $security->getUser()->getCurrentId()]
);#[CommandHandler]
public function closeTicket(BlockerUser $command, EventBus $eventBus) {
// handle blocking user
$eventBus->publish(new UserWasBlocked($command->id));
}#[EventHandler]
public function closeTicket(
UserWasBlocked $command,
#[Header('executorId')] $executorId
) {
// handle storing audit data
}#[ProjectionInitialization]
public function initialization() : void
{
$this->connection->executeStatement(<<<SQL
CREATE TABLE IF NOT EXISTS in_progress_tickets (
ticket_id VARCHAR(36) PRIMARY KEY,
ticket_type VARCHAR(25)
)
SQL);
}bin/console ecotone:es:reset-projection {projectionName}artisan ecotone:es:reset-projection {projectionName}$messagingSystem->runConsoleCommand("ecotone:es:reset-projection", ["name" => $projectionName]);#[ProjectionReset]
public function reset() : void
{
$this->connection->executeStatement(<<<SQL
DELETE FROM in_progress_tickets
SQL);
}bin/console ecotone:es:delete-projection {projectionName}artisan ecotone:es:delete-projection {projectionName}$messagingSystem->runConsoleCommand("ecotone:es:delete-projection", ["name" => $projectionName]);#[ProjectionDelete]
public function delete() : void
{
$this->connection->executeStatement(<<<SQL
DROP TABLE in_progress_tickets
SQL);
}bin/console ecotone:es:trigger-projection {projectionName}artisan ecotone:es:trigger-projection {projectionName}$messagingSystem->runConsoleCommand("ecotone:es:trigger-projection", ["name" => $projectionName]);#[EventSourcingAggregate] // 1
class Ticket
{
use WithAggregateVersioning; // 2
#[Identifier] // 1
private string $ticketId;
private string $ticketType;
#[CommandHandler] // 2
public static function register(RegisterTicket $command) : array
{
return [new TicketWasRegistered($command->getTicketId(), $command->getTicketType())];
}
#[CommandHandler] // 2
public function close(CloseTicket $command) : array
{
return [new TicketWasClosed($this->ticketId)];
}
#[EventSourcingHandler] // 4
public function applyTicketWasRegistered(TicketWasRegistered $event) : void
{
$this->ticketId = $event->getTicketId();
$this->ticketType = $event->getTicketType();
}
}#[EventSourcingAggregate]
class Basket
{
use WithEvents; // 1
use WithVersioning;
#[Identifier]
private string $id;
#[CommandHandler] // 2
public static function create(CreateBasket $command) : static
{
$basket = new static();
$basket->recordThat(new BasketWasCreated($command->getId()));
return $basket;
}
#[CommandHandler] // 2
public function addProduct(AddProduct $command) : void
{
$this->recordThat(new ProductWasAddedToBasket($this->id, $command->getProductName()));
}
#[EventSourcingHandler]
public function applyBasketWasCreated(BasketWasCreated $basketWasCreated)
{
$this->id = $basketWasCreated->getId();
}
}#[Projection]
final class UserProjection
{
#[EventHandler]
public function handle(UserWasRegistered $event): void
{
// Projection will be called first
}
}
final class UserRegisteredSubscriber
{
#[EventHandler]
public function sendNotification(UserWasRegistered $event): void
{
// sending notification will be called second
}
}#[Priority(1)]
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationlWhen(UserWasRegistered $event): void
{
// handle welcome notification with lower priority
}
#[Priority(5)]
#[Asynchronous("notifications")]
#[CommandHandler(endpointId: "authenticationToken")]
public function sendAuthenticationToken(RequestToken $command): void
{
// send authentication token quicker
}$commandBus->sendWithRouting(
"askForOrderReview",
"userId",
metadata: ["priority" => 100]
);ecotone:
defaultErrorChannel: "dbal_dead_letter"return [
'defaultErrorChannel' => 'dbal_dead_letter',
];$ecotone = EcotoneLite::bootstrap(
configuration: ServiceConfiguration::createWithDefaults()
->withDefaultErrorChannel('dbal_dead_letter')
);ecotone:
defaultErrorChannel: "errorChannel"return [
'defaultErrorChannel' => 'errorChannel',
];$ecotone = EcotoneLite::bootstrap(
configuration: ServiceConfiguration::createWithDefaults()
->withDefaultErrorChannel('errorChannel')
);#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::createWithDeadLetterChannel(
"errorChannel",
// your retry strategy
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3),
// if retry strategy will not recover, then send here
"dbal_dead_letter"
);
}bin/console ecotone:deadletter:helpartisan ecotone:deadletter:helpbin/console ecotone:deadletter:listartisan ecotone:deadletter:list$list = $messagingSystem->runConsoleCommand("ecotone:deadletter:list", []);bin/console ecotone:deadletter:show {messageId}artisan ecotone:deadletter:show {messageId}$details = $messagingSystem->runConsoleCommand("ecotone:deadletter:show", ["messageId" => $messageId]);bin/console ecotone:deadletter:replay {messageId}artisan ecotone:deadletter:replay {messageId}$messagingSystem->runConsoleCommand("ecotone:deadletter:replay", ["messageId" => $messageId]);bin/console ecotone:deadletter:replayAllartisan ecotone:deadletter:replayAll$messagingSystem->runConsoleCommand("ecotone:deadletter:replayAll", []);bin/console ecotone:deadletter:delete {messageId}artisan ecotone:deadletter:delete {messageId}$messagingSystem->runConsoleCommand("ecotone:deadletter:delete", ["messageId" => $messageId]);#[ServiceContext]
public function dbalConfiguration()
{
return DbalConfiguration::createWithDefaults()
->withDeadLetter(false);
}#[ServiceContext]
public function databaseChannel()
{
return DbalBackedMessageChannelBuilder::create("async");
}#[Asynchronous("async")]
#[EventHandler(endpointId:"notifyAboutNeworder")]
public function notifyAboutNewOrder(OrderWasPlaced $event) : void
{
// notify about new order
}#[Asynchronous(["database_channel", "rabbit_channel"])]
#[EventHandler(endpointId: 'orderWasPlaced')]
public function handle(OrderWasPlaced $event): void
{
/** Do something */
}#[ServiceContext]
public function combinedMessageChannel(): CombinedMessageChannel
{
return CombinedMessageChannel::create(
'outbox_sqs', //Reference name
['database_channel', 'amazon_sqs_channel'], // list of combined message channels
);
}#[Asynchronous(["outbox_sqs"])]
#[EventHandler(endpointId: 'orderWasPlaced')]
public function handle(OrderWasPlaced $event): void
{
/** Do something */
} #[Aggregate]
class Product
{
#[Identifier]
private string $productId;class ChangePriceCommand
{
private string $productId;
private Money $priceAmount;
}#[Aggregate]
class Product
{
private string $id;
#[IdentifierMethod("productId")]
public function getProductId(): string
{
return $this->id;
}class SomeEvent
{
#[TargetIdentifier("orderId")]
private string $purchaseId;
}#[EventHandler(identifierMetadataMapping: ["orderId" => "orderNumber"])]
public function failPayment(PaymentWasFailedEvent $event, CommandBus $commandBus) : self
{
// do something with $event
}$this->commandBus->sendWithRouting('user.block', metadata:
'aggregate.id' => $userId // This way we provide dynamic identifier
])#[CommandHandler('user.block')]
public function block() : void
{
$this->status = 'blocked';
}#[EventHandler(identifierMapping: ["orderId" => "headers['orderNumber']"])]
public function failPayment(PaymentWasFailedEvent $event, CommandBus $commandBus) : void
{
// do something with $event
}class BlockUser
{
private Email $email;
(...)
public function getEmail(): Email
{
return $this->email;
}
}#[CommandHandler(identifierMapping: [
"email" => "payload.getEmail().normalize()"]
)]
public function block(BlockUser $command) : void
{
// do something with $command
}class OrderIdExchange
{
public function exchange(string $externalOrderId): string
{
// do the mapping
return $internalOrderId;
}
}#[EventHandler(identifierMapping: [
"orderId" => "reference('orderIdExchange').exchange(payload.externalOrderId())"
])]
public function when(OrderCancelled $event) : void
{
// do something with $event
}bin/console ecotone:deduplication:remove-expired-messagesartisan ecotone:deduplication:remove-expired-messages$messagingSystem->runConsoleCommand("ecotone:deduplication:remove-expired-messages");$this->commandBus->send($command, metadata: ["paymentId" => $paymentId]);final class PaymentHandler
{
#[Deduplicated('paymentId')]
#[CommandHandler(endpointId: "receivePaymentEndpoint")]
public function receivePayment(ReceivePayment $command): void
{
// handle
}
}final class PaymentHandler
{
#[Deduplicated('paymentId')]
#[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
public function receivePaymentChanges(ReceivePayment $command): void
{
// handle
}
}final class PaymentHandler
{
#[Deduplicated(expression: 'payload.paymentId')]
#[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
public function receivePaymentChanges(ReceivePayment $command): void
{
// handle
}
}final class PaymentHandler
{
#[Deduplicated(expression: 'reference("paymentIdMapper").map(payload.paymentId)')]
#[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
public function receivePaymentChanges(ReceivePayment $command): void
{
// handle
}
}#[Deduplicated(expression: "headers['paymentId']")]
interface PaymentCommandBus extends CommandBus
{
}#[Deduplicated("paymentId", trackingName: 'payment_tracker']]
interface PaymentCommandBus extends CommandBus
{
}class DbalConfiguration
{
#[ServiceContext]
public function registerTransactions(): DbalConfiguration
{
return DbalConfiguration::createWithDefaults()
// 100000 ms - 100 seconds
->withDeduplication(
expirationTime: 100000,
removalBatchSize: 1000
);
}
}class DbalConfiguration
{
#[ServiceContext]
public function registerTransactions(): DbalConfiguration
{
return DbalConfiguration::createWithDefaults()
->withDeduplication(false);
}
}class TicketController
{
// Query Bus will be auto registered in Depedency Container.
public function __construct(private QueryBus $queryBus) {}
public function createTicketAction(Request $request) : Response
{
$result = $this->queryBus->send(
new GetTicketById(
$request->get("ticketId")
)
);
return new Response(\json_encode($result));
}
}$ticket = $messagingSystem->getQueryBus()->send(
new GetTicketById(
$ticketId
)
);class TicketController
{
public function __construct(private QueryBus $queryBus) {}
public function createTicketAction(Request $request) : Response
{
$result = $this->queryBus->sendWithRouting(
"ticket.getById",
$request->get("ticketId")
);
return new Response(\json_encode($result));
}
}$ticket = $messagingSystem->getQueryBus()->sendWithRouting(
"ticket.getById",
$ticketId
);class TicketController
{
public function __construct(private QueryBus $queryBus) {}
public function createTicketAction(Request $request) : Response
{
$result = $this->queryBus->sendWithRouting(
"ticket.getById",
$request->get("ticketId"),
// Tell Ecotone which format you want in return
expectedReturnedMediaType: "application/json"
);
return new Response($result);
}
}$ticket = $messagingSystem->getQueryBus()->sendWithRouting(
"ticket.getById",
$ticketId,
expectedReturnedMediaType: "application/json"
);class TicketService
{
#[QueryHandler]
public function getTicket(GetTicketById $query) : array
{
//return ticket
}
}class readonly GetTicketById
{
public function __construct(
public string $ticketId
) {}
}class TicketService
{
#[QueryHandler("ticket.getById")]
public function getTicket(string $ticketId) : array
{
//return ticket
}
}#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::createWithDefaults()
->withCollector(false);
}#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::create(
RetryTemplateBuilder::exponentialBackoff(initialDelay: 10, multiplier: 2)
->maxRetryAttempts(3)
->build()
);
}#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::createWithDefaults()
->withErrorChannel("dbal_dead_letter")
}#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::createWithDefaults()
->withErrorChannel("failure_channel")
}
---
#[ServiceActivator('failure_channel')]
public function doSomething(ErrorMessage $errorMessage): void
{
// Handle failure message on your own terms :)
}#[ServiceContext]
public function asyncChannelConfiguration()
{
return GlobalPollableChannelConfiguration::createWithDefaults()
->withErrorChannel("dbal_dead_letter")
}#[ServiceContext]
public function asyncChannelConfiguration()
{
return PollableChannelConfiguration::createWithDefaults('notifications')
->withCollector(false)
->withErrorChannel("dbal_dead_letter")
}We may want to use higher level object within our Interface than simple scalar types. As those can't be understood by our Database, it means we need Conversion. Ecotone provides default conversions and possibility to customize the process.
Ecotone provides inbuilt Conversion for Date Time based objects.
By default Ecotone will convert time using Y-m-d H:i:s.u format. We may override this using Custom Converters.
If your Class contains __toString method, it will be used for doing conversion.
We may override this using .
For example database column may be of type JSON or Binary. In those situation we may state what Media Type given parameter should be converted too, and Ecotone will do the conversion before it's executing SQL.
In above example roles will be converted to JSON before SQL will be executed.
If we are using higher level classes like Value Objects, we will be able to change the type to expected one. For example if we are using we can register Converter for our PersonRole Class and convert it to JSON or XML.
Then we will be able to use our Business Method with PersonRole, which will be converted to given Media Type before being saved:
This way we can provide higher level classes, keeping our Interface as close as it's needed to our business model.
We may use Expression Language to dynamically evaluate our parameter.
payload is special parameter in expression, which targets value of given parameter, in this example it will be PersonName. In above example before storing name in database, we will call toLowerCase() method on it.
We may also access any Service from our Dependency Container and run a method on it.
reference is special function within expression which allows us to fetch given Service from Dependency Container. In our case we've fetched Service registered under "converter" id and ran normalize method passing PersonName.
We may use Dbal Parameters on the Method Level, when parameter is not needed.
In case parameter is a static value.
We can also use dynamically evaluated parameters and access Dependency Container to get specific Service.
In case of Method and Class Level Dbal Parameters we get access to passed parameters inside our expression. They can be accessed via method parameters names.
As we can use method level, we can also use class level Dbal Parameters. In case of Class level parameters, they will be applied to all the method within interface.
To make our SQLs more readable we can also use the expression language directly in SQLs.
Suppose we Pagination class
then we could use it like follows:
To enable expression for given parameter, we need to follow structure :(expression), so to use limit property from Pagination class we will write :(pagination.limit)
Ecotone use Converters in order to convert Events into serializable form. This means we can customize process of serializing and deserializing specific Events, to adjust it to our Application.
So let's assume UserCreated Event:
If we would want to change how the Event is serialized, we would define Converter
Then the Event Stream would look like above
This basically means we can serialize the Event in the any format we want.
Having customized Converters for specific Events, is also useful when we need to adjust some legacy Events to new format. We can hook into the deserialization process, and modify the payload to match new structure.
When using support, we can even customize how we want to serialize given class, that is used within Events.
For example we could have User Created Event which make use of UserName class.
the UserName would be a simple Class which contains of validation so the name is not empty:
Now if we would serialize it without telling JMS, how to handle this class we would end up with following JSON in the Event Stream:
Now this is fine for short-lived applications and testing, however in the long living application this may become a problem. The problem may come from changes, if we would simply change property name in UserName.value to UserName.data it would break deserialization of our previous Events.
As data does not exists under name key.
Therefore we want to keep take over the serialization of objects, to ensure stability along the time.
Now with above Converter, whenever we will use UserName class, we will be actually serializing it to simple string type, and then when deserialize back from simple type to UserName class:
With this, with few lines of code we can ensure consistency across different Events, and keeping our Events bullet proof for code refactor and changes.
In case of storing sensitive data, we may be forced by law to ensure that data should be forgotten (e.g. ). This basically means, if Customer will ask to us to remove his data, we will be obligated by law to ensure that this will happen.
However in case of Event Sourced System we rather do not want to delete events, as this is critical operation which is considered dangerous. Deleting Events could affect running Projections, deleting too much may raise inconsistencies in the System, and in some cases we may actually want to drop only part of the data - not everything. Therefore dropping Events from Event Stream is not suitable solution and we need something different.
Solution that we can use, is to change the way we serialize the Event. We can hook into serialization process just as we did for normal serialization, and then customize the process. Converter in reality is an Service registered in Dependency Container, so we may inject anything we want there in order to modify the serialization process.
So let's assume that we want to encrypt UserCreated Event:
So what we do here, is we hook into serialization/deserialization process and pass the data to EncryptionService. As you can see here, we don't store the payload here, we simply store an reference in form o a key.
EncryptionService can as simple as storing this data in database table using key as Primary Key, so we can fetch it easily. It can also be stored with encryption in some cryptographic service, yet it may also be stored as plain text. It all depends on our Domain.
However what is important is that we've provided the resource id to the EncryptionService
Now this could be used to delete related Event's data. When Customer comes to us and say, he wants his data deleted, we simply delete by resource:
That way this Data won't be available in the System anymore. Now we could just allow Converters fails, if those Events are meant to be deserialized, or we could check if given key exists and then return dummy data instead.
If we allow Converters to fail when Serialization happens, we should ensure that related Projections are using simple arrays instead of classes, and handle those cases during Projecting. If we decide to return dummy data, we can keep deserializing those Events for Projections, as they will be able to use them.
Ecotone does allow for easy change from synchronous to asynchronous execution of given Message Handler.
In order to run Command Handler asynchronously we need to mark it as Asynchronous.
The same way we define for Event Handlers:
We need to add endpointId on our endpoint's annotation, this will be used to route the Message in isolation to our Message Handlers.
The asynchronous attribute states what Channel reference we want to use:
The "orders" string is the name of our Message Channel. We use this name to reference which implementation we want to use—whether it's an in-memory channel for testing, a database queue, or RabbitMQ. This naming approach keeps our business code clean and independent from infrastructure choices.
To configure a specific implementation like a database channel, we use a class.
That's all the configuration we need! Now whenever we reference "orders" in our handler attributes, Ecotone automatically uses this database channel. Our handlers stay exactly the same whether we're using in-memory channels for testing or database channels for production—the only difference is this single configuration change.
We can first list all of the Message Consumers we have available for running:
Then in order to run our Message Consumer, we will use ecotone:run console command:
You may set up running configuration for given consumer while running it.
handledMessageLimit - Amount of messages to be handled before stopping consumer
executionTimeLimit - How long consumer should run before stopping (milliseconds)
finishWhenNoMessages - Consumers will be running as long as there will be messages to consume
Using configuration for statically configuration.
There are multiple different implementation which we can use:
Using single asynchronous channel we may register multiple endpoints. This allow for registering single asynchronous channel for whole Aggregate or group of related Command/Event Handlers.
You may put Asynchronous on the class, level so all the endpoints within a class will becomes asynchronous.
All asynchronous endpoints are marked with special attributeEcotone\Messaging\Attribute\AsynchronousRunningEndpoint
If you want to all polling endpoints you should make use of on this.
Each Asynchronous Message Handler requires us to define "endpointId". It's unique identifier of your Message Handler.
The Endpoint ID travels with your message as part of the headers to your message channel. Once we consume the message from the Message Channel, Ecotone uses this ID to route it to the correct Message Handler. This completely decouples our messages from specific handler classes and methods—we can refactor, rename, or move our handlers around without breaking message routing.
Instant Retries are powerful self-healing mechanism, which helps Application to automatically recover from failures.
The are especially useful to handle temporary issues, like optimistic locking, momentary unavailability of the external service which we want to call or database connection failures. This way we can recover without affecting our end users without any effort on the Developer side.
Instant retries can be enabled for CommandBus and for Asynchronous Processing.
In order to set up instant retries for Command Bus, you configuration.
This will retry your synchronous Command Handlers.
This will retry instantly when your message is handled asynchronously. This applies to Command and Events. Take under consideration that Ecotone , so it's safe to retry them.
By using instant retries for asynchronous endpoints we keep message ordering.
The InstantRetry attribute allows you to specify different strategies for Retry in order to be able to customize it for specific Business use cases. For example we may create new Command Bus which will retry on NetworkException then use that in specific cases with custom retry.
We do it by extending CommandBus interface and adding InstantRetry attribute.
Customized Instant Retries are available as part of Ecotone Enterprise.
To set up Customized Instant Retries, we will extend CommandBus and provide the attribute
CommandBusWithRetry will be automatically registered in our Dependency Container and available for use.
Now whenever we will send an Command using this specific Command Bus, it will do two extra retries:
The same way we can define specific exception list which should be retried for our customized Command Bus:
Only the exceptions defined in exception list will be retried.
Delayed retries are helpful in case, we can't recover instantly. This may happen for example due longer downtime of external service, which we integrate with. In those situations we may try to self-heal the application, by delaying failed Message for given period of time. This way we can retry the call to given service after some time, and if everything is fine, then we will successfully handle the message.
Ecotone resends the Message to original channel with delay. This way we don't block processing during awaiting time, and we can continue consuming next messages. When Message will be ready (after delay time), it will be picked up from the Queue.
First need to be set up for your Application, then you may configure retries.
If you want to use inbuilt Error Retry Strategy and set retry attempts, backoff strategy, initial delay etc, you may configure using ErrorHandlerConfiguration from .
When we have consumer named "asynchronous_messages", then we can define PollingMetadata with customer error Channel.
It's good to know how Ecotone solves the problem of Message Handling Isolation, which is one of the key features that allows us to build Resilient Messaging Systems.
In Message-Based Systems we will have situation that as a result of given Event, we will want to trigger some actions. This can sending notification, but also calling an external Service or starting fully new separate flow etc.
However those actions may actually fail for various of reasons and depending on how Messaging is implemented, it may help us to recover from this safely, or trigger unexpected side effects that may harm the business.
Let's first consider typical implementation of Message Bus, where we send an Event Message which is consumed by more than one Message Handler (Subscriber / Event Handler).
After placing an Order we send Asynchronous Order Was Placed Event Message, which as as a result triggers all related Event Handlers. As we can easily imagine, one of those Event Handlers may fail. However this creates a problem, because it's not only the Event Handler that have failed will be retried, but all the Event Handlers connected to given Event Message.
This of course may produce unexpected side effects, like sending confirmation twice, or delivering goods to the Customers more than once. Idempotency may help here, but it's not always available or implemented correctly, therefore we may try to solve it on higher level code.
To solve it using higher level code we may introduce multiple Messages Queues having single Message Handler connected, or produce Command Messages from Event Handlers in order to provide isolation. However all of those solutions make infrastructure, configuration or application level code more complex. This is because we try to solve Message Handling Isolation in upper levels, instead of having it solved on the foundation level.
Ecotone solves Message Handling Isolation at the foundation level, by delivering a copy of a Message to each of the related Event Handler separately:
Whenever Event Message is sent, a copy of this Message will be delivered to each of the related Event Handlers. This as a result make each Handler consume the Message in complete isolation and enables safe retries.
Handling each Event Handler in complete isolation, creates environment where safe retries are possible, as only Event Handler that have failed will be retried. By solving this on the foundation, the higher level code can stay focused on business part of the system, not solving Message Handling Isolation problems.
There are of course more benefits that this solution enables:
Possibility to safely retry
instead of whole Message
instead of whole Message
Ecotone's implementation enables safe retries, thanks to the processing isolation it provides.
Let's consider asynchronous scenario, where we want send order confirmation and reserve products in Stock via HTTP call, when Order Was Placed. This could potentially look like this:
Now imagine that sending to Stock fails and we want to retry. If we would retry whole Event, we would retry "notifyAboutNewOrder" method, this would lead to sending an notification twice. It's easy to imagine scenarios where this could lead to even worse situations, where side effect could lead to double booking, trigger an second payment etc. In Ecotone this does not happen, as each of the Handlers would receive it's own copy of the Message and proceed in isolation.
In Ecotone each of the Handlers will receive it's own copy of the Event and will handle it in full isolation.
This means that under the hood, there would be two messages sent to asynchronous_messages
each targeting specific Event Handler.
This bring safety to retrying events, as in case of failure, we will only retry the Handler that actually failed.
In Ecotone it's the Handler that becomes Asynchronous (not Event itself) you may customize the behaviour to your needs. If you want, you may:
Run one Event Handler synchronously and the other asynchronously.
You may decide to use different Message Channels for each of the Asynchronous Event Handlers.
[Article]
In we discussed that Event Sourcing Aggregates are built from Event Streams stored in the data store. Yet it's important to understand how those Events gets to the Event Stream in the first place.
Let's start by manually appending Events using Event Store. This will help us understand better the concepts behind the Event Stream and Event Partitioning. After we will understand this part, we will introduce Event Sourcing Aggregates, which will abstract away most of the logic that we will need to do in this chapter.
PHP Event Sourcing Projections
Before diving into this topic, read the first.
The power of Event Sourcing is not only the full history of what happened. As we do have a full history, it's easy to imagine that we may want to use it for different purposes. And one of our purposes will be related to view this data in specific way. \
Let's take as an example our Ticket's Event Stream:
In most of the situations besides knowing the history, we would also want to know how all does Tickets looks at present moment. Therefore it means we need to build an view from those events which will represent current state, and for this we use
Ecotone comes with which allows for easily switching from like Amazon SQS, RabbitiMQ, Redis, Kafka and more. This abstraction is used for Service (application) level asynchronous communication like Asynchronous Message Handlers. However this abstraction can also be combined with Distributed Bus mechanism to enable cross Service Communication using Service Map.
This functionality is available as part of Ecotone Enterprise.
#[DbalWrite('INSERT INTO activities VALUES (:personId, :time)')]
public function add(string $personId, \DateTimeImmutable $time): void;final readonly class UserCreated
{
public function __construct(
public string $userId,
public string $name,
public string $surname,
)
{
}
}final readonly class UserCreatedConverter
{
#[Converter]
public function toArray(UserCreated $event): array
{
return [
'userId' => $event->userId,
'userName' => $event->name,
'userSurname' => $event->surname,
];
}
#[Converter]
public function fromArray(array $event): UserCreated
{
return new UserCreated(
$event['userId'],
$event['userName'],
$event['userSurname'],
);
}
}#[Asynchronous("orders")]
#[CommandHandler(endpointId: "place_order_endpoint")
public function placeOrder(PlaceOrderCommand $command) : void
{
// do something with $command
}#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed")
public function when(OrderWasPlaced $event) : void
{
// do something with $event
}

memoryLimit - How much memory can be consumed by before stopping consumer (Megabytes)
stopOnFailure - Stop consumer in case of exception
You delay or add priority to one Handler and to the other not




#[DbalWrite('INSERT INTO activities VALUES (:personId, :time)')]
public function store(PersonId $personId, \DateTimeImmutable $time): void;final readonly class PersonId
{
public function __construct(private string $id) {}
public function __toString(): string
{
return $this->id;
}
} /**
* @param string[] $roles
*/
#[DbalWrite('UPDATE persons SET roles = :roles WHERE person_id = :personId')]
public function changeRoles(
int $personId,
#[DbalParameter(convertToMediaType: MediaType::APPLICATION_JSON)] array $roles
): void;final class PersonRoleConverter
{
#[Converter]
public function from(PersonRole $personRole): string
{
return $personRole->getRole();
}
#[Converter]
public function to(string $role): PersonRole
{
return new PersonRole($role);
}
} /**
* @param PersonRole[] $roles
*/
#[DbalWrite('UPDATE persons SET roles = :roles WHERE person_id = :personId')]
public function changeRolesWithValueObjects(
int $personId,
#[DbalParameter(convertToMediaType: MediaType::APPLICATION_JSON)] array $roles
): void;#[DbalWrite('INSERT INTO persons VALUES (:personId, :name)')]
public function register(
int $personId,
#[DbalParameter(expression: 'payload.toLowerCase()')] PersonName $name
): void;#[DbalWrite('INSERT INTO persons VALUES (:personId, :name)')]
public function insertWithServiceExpression(
int $personId,
#[DbalParameter(expression: "reference('converter').normalize(payload)")] PersonName $name
): void;#[DbalWrite('INSERT INTO persons VALUES (:personId, :name, :roles)')]
#[DbalParameter(name: 'roles', expression: "['ROLE_ADMIN']", convertToMediaType: MediaType::APPLICATION_JSON)]
public function registerAdmin(int $personId, string $name): void;#[DbalWrite('INSERT INTO persons VALUES (:personId, :name, :registeredAt)')]
#[DbalParameter(name: 'registeredAt', expression: "reference('clock').now()")]
public function registerAdmin(int $personId, string $name): void;#[DbalWrite('INSERT INTO persons VALUES (:personId, :name, :roles)')]
#[DbalParameter(name: 'roles', expression: "name === 'Admin' ? ['ROLE_ADMIN'] : []", convertToMediaType: MediaType::APPLICATION_JSON)]
public function registerUsingMethodParameters(int $personId, string $name): void;#[DbalParameter(name: 'registeredAt', expression: "reference('clock').now()")]
class AdminAPI
{
#[DbalWrite('INSERT INTO persons VALUES (:personId, :name, :registeredAt)')]
public function registerAdmin(int $personId, string $name): void;
}final readonly class Pagination
{
public function __construct(public int $limit, public int $offset)
{
}
}interface PersonService
{
#[DbalQuery('
SELECT person_id, name FROM persons
LIMIT :(pagination.limit) OFFSET :(pagination.offset)'
)]
public function getNameListWithIgnoredParameters(
Pagination $pagination
): array;
}final readonly class UserCreated
{
public function __construct(
public string $userId,
public UserName $name,
public string $surname,
)
{
}
}final readonly class UserName
{
public function __construct(
public string $value,
)
{
if ($value === "") {
throw new \InvalidArgumentException("Name should not be empty");
}
}
}{
"userId": "123",
"name": {"value": "Johny"},
"surname": "Bravo"
}class UserNameConverter
{
#[Converter]
public function from(UserName $data): string
{
return $data->value;
}
#[Converter]
public function to(string $data): UserName
{
return new UserName($data);
}
}{
"userId": "123",
"name": "Johny",
"surname": "Bravo"
}final readonly class UserCreatedConverter
{
public function __construct(
private EncryptingService $encryptingService
){}
#[Converter]
public function toArray(UserCreated $event): array
{
$key = Uuid::v4()->toString();
return
[
'key' => $key,
'data' => $this->encryptingService->encrypt(
key: $key,
resource: $event->userId,
data: [
'userId' => $event->userId,
'userName' => $event->name,
'userSurname' => $event->surname,
]
)
];
}
#[Converter]
public function fromArray(array $event): UserCreated
{
$data = $this->encryptingService->decrypt($event['key']);
return new UserCreated(
$event['userId'],
$event['userName'],
$event['userSurname'],
);
}
}$this->encryptingService->encrypt(
key: $key,
// our resource id, to group related records
resource: $event->userId,
data: [
'userId' => $event->userId,
'userName' => $event->name,
'userSurname' => $event->surname,
]
)$this->encryptingService->delete(resource: $userId);#[Asynchronous("orders")]final readonly class EcotoneConfiguration
{
#[ServiceContext]
public function databaseChannel()
{
return DbalBackedMessageChannelBuilder::create('orders');
}
}bin/console ecotone:list
+--------------------+
| Endpoint Names |
+--------------------+
| orders |
+--------------------+artisan ecotone:list
+--------------------+
| Endpoint Names |
+--------------------+
| orders |
+--------------------+$consumers = $messagingSystem->list()bin/console ecotone:run orders -vvvartisan ecotone:run orders -vvv$messagingSystem->run("orders");bin/console ecotone:run orders
--handledMessageLimit=5
--executionTimeLimit=1000
--finishWhenNoMessages
--memoryLimit=512
--stopOnFailureartisan ecotone:run orders
--handledMessageLimit=5
--executionTimeLimit=1000
--finishWhenNoMessages
--memoryLimit=512
--stopOnFailure$messagingSystem->run(
"orders",
ExecutionPollingMetadata::createWithDefault()
->withHandledMessageLimit(5)
->withMemoryLimitInMegabytes(100)
->withExecutionTimeLimitInMilliseconds(1000)
->withFinishWhenNoMessages(true)
->withStopOnError(true)
);class Configuration
{
#[ServiceContext]
public function configuration() : array
{
return [
PollingMetadata::create("orders")
->setErrorChannelName("errorChannel")
->setInitialDelayInMilliseconds(100)
->setMemoryLimitInMegaBytes(100)
->setHandledMessageLimit(10)
->setExecutionTimeLimitInMilliseconds(100)
->withFinishWhenNoMessages(true)
];
}
}#[Asynchronous("orders")]
#[EventHandler]
public function onSuccess(SuccessEvent $event) : void
{
}
#[Asynchronous("orders")]
#[EventHandler]
public function onSuccess(FailureEvent $event) : void
{
}#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed") // Your important endpoint Id
public function when(OrderWasPlaced $event) : void {}#[ServiceContext]
public function registerRetries()
{
return InstantRetryConfiguration::createWithDefaults()
->withCommandBusRetry(
isEnabled: true,
retryTimes: 3, // max retries
retryExceptions: [DatabaseConnectionFailure::class, OptimisticLockingException::class] // list of exceptions to be retried, leave empty if all should be retried
)
}#[ServiceContext]
public function registerRetries()
{
return InstantRetryConfiguration::createWithDefaults()
->withAsynchronousEndpointsRetry(
isEnabled: true,
retryTimes: 3, // max retries
retryExceptions: [DatabaseConnectionFailure::class, OptimisticLockingException::class] // list of exceptions to be retried, leave empty if all should be retried
)
}#[InstantRetry(retryTimes: 2)]
interface ReliableCommandBus extends CommandBus {}$this->commandBusWithRetry->send(new RegisterNewUser());#[InstantRetry(retryTimes: 2, exceptions: [NetworkException::class])]
interface ReliableCommandBus extends CommandBus {}#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::create(
"errorChannel",
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3)
);
}#[Asynchronous("asynchronous_messages")]
#[EventHandler(endpointId: "notifyAboutNewOrder")]
public function notifyAboutNewOrder(OrderWasPlaced $event, NotificationService $notificationService) : void
{
$notificationService->notifyAboutNewOrder($event->getOrderId());
}#[ServiceContext]
public function errorConfiguration()
{
return PollingMetadata::create("asynchronous_messages")
->setErrorChannel("customErrorChannel");
}
#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::create(
"customErrorChannel",
RetryTemplateBuilder::exponentialBackoff(100, 3)
->maxRetryAttempts(2)
);
}#[Asynchronous("asynchronous_messages")]
#[EventHandler(endpointId: "notifyAboutNewOrder")]
public function notifyAboutNewOrder(OrderWasPlaced $event, NotificationService $notificationService) : void
{
$notificationService->notifyAboutNewOrder($event->getOrderId());
}
#[Asynchronous("asynchronous_messages")]
#[EventHandler(endpointId: "reserveItemsInStock")]
public function reserveItemsInStock(OrderWasPlaced $event, StockClient $stockClient): void
{
$stockClient->reserve($event->getOrderId(), $event->getProducts());
}After installing Ecotone's Event Sourcing we automatically get access to Event Store abstraction. This abstraction provides an easy to work with Event Streams.
Let's suppose that we do have Ticketing System like Jira with two basic Events "Ticket Was Registered" and "Ticket Was Closed". Of course we need to identify to which Ticket given event is related, therefore will have some Id.
In our code we can define classes for those:
To store those in the Event Stream, let's first declare it using - Event Store abstraction.
Event Store provides few handy methods:
As we want to append some Events, let's first create an new Event Stream
This is basically enough to create new Event Stream. But it's good to understand what actually happens under the hood.
In short Event Stream is just audit of series of Events. From the technical point it's a table in the Database. Therefore when we create an Event Stream we are actually creating new table.
Event Stream table contains:
Event Id - which is unique identifier for Event
Event Name - Is the named of stored Event, which is to know to which Class it should be deserialized to
Payload - is actual Event Class, which is serialized and stored in the database as JSON
Metadata - Is additional information stored alongside the Event
To append Events to the Event Stream we will use "appendTo" method
This will store given Event in Ticket's Event Stream
Above we've stored Events for Ticket with id "123". However we can store Events from different Tickets in the same Event Stream.
We now can load those Events from the Event Stream
This will return iterator of Ecotone's Events
As we can see this maps to what we've been storing in the Event Stream table.
Payload will contains our deserialized form of our event, so for example TicketWasRegistered.
We could also fetch list of Events without deserializing them. $events = $eventStore->load("ticket", deserialize: false);
In that situations payload will contains an associative array. This may be useful when iterating over huge Event Streams, when there is no need to actually work with Objects. Besides that ability to load in batches may also be handy.
Let's consider what may actually happen during concurrent access to our System. This may be due more people working on same Ticket or simply because our system did allow for double clicking of the same action.
In those situations we may end up storing the same Event twice
Without any protection we will end up with Closing Events in the Event Stream. That's not really ideal, as we will end up with Event Stream having incorrect history:
This is the place where we need to get back to persistence strategy:
We've created this Stream with "simple" persistence strategy. This means we can apply any new Events without guards. This is fine in scenarios where we are dealing with no business logic involved like collecting metrics, statistics. where all we to do is to push push Events into the Event Stream, and duplicates are not really a problem. However simple strategy (which is often the only strategy in different Event Sourcing Frameworks), comes with cost:
We lose linear history of our Event Stream, as we allow for storing duplicates. This may lead to situations which may lead to incorrect state of the System, like Repayments being recorded twice.
As a result of duplicated Events (Which hold different Message Id) we will trigger side effects twice. Therefore our Event Handlers will need to handle this situation to avoid for example trigger requests to external system twice, or building wrong Read Model using Projections.
As we do allow for concurrent access, we can actually make wrong business decisions. For example we could give to the Customer promotion code twice.
The "simple strategy" is often the only strategy that different Event Sourcing Frameworks provide. However after the solution is released to the production, we often start to recognize above problems, yet now as we don't have other way of dealing with those, we are on mercy of fixing the causes, not the root of the problem. Therefore we need more sophisticated solution to this problem, to solve the cause of it not the side effects. And to solve the cause we will be using different persistence strategy called "partition strategy".
Event Stream can be split in partitions. Partition is just an sub-stream of Events related to given Identifier, in our context related to Ticket.
Partition is linear history for given identifier, where each Event is within partition is assigned with version. This way we now, which event is at which position. Therefore in order to partition the Stream, we need to know the partition key (in our case Ticket Id). By knowing the partition key and last version of given partition, we can apply an Event at the correct position. To create partitioned stream, we would create Event Stream with different strategy:
This will create Event Stream table with constraints, which will require:
Aggregate Id - This will be our partition key
Aggregate Type - This may be used if we would store more Aggregate types within same Stream (e.g. User), as additional partition key
Aggregate Version - This will ensure that we won't apply two Events at the same time to given partition
We append those as part of Event's metadata:
Let's now see, how does it help us ensuring that our history is always correct. Let's assume that currently we do have single Event in the partition
Now let's assume two requests happening at the same time:
This way allows us to be sure that within request we are dealing with latest Event Stream, because if that's not true we will end up in concurent exception. This kind of protection is crucial when dealing with business logic that depends on the previous events, as it ensures that there is no way to bypass it.
So we may be in need to build the list of all the tickets and their current status
For example this list can be stored in the database table with three columns:
id
type
status
The difference between the traditional approach and ES approach is that we will be delivering this view from the Event Stream. Therefore this will be only the representation build up from the past events, and will be used only for reading. Data delivered from Events to shape specific view, we call Read Models.
Ecotone provides abstraction to quickly build new Projections, it does follow Ecotone's declarative configuration. Before we will jump into implementation, let's quickly review how our Ticket Event Sourced Aggregate could look like:
So we do have two Events here, TicketWasRegistered and TicketWasClosed. We will be subscribing to those in order to build our new Ticket List Projection.\
Let's first define our new Ticket List Projection
We do start by creating new class, which we mark with Projection attribute. The first argument is the name of our new projection "ticket_list", the second is the related ES Aggregate "Ticket" from which we will be subscribing Events. We will touch on the second argument more in next sections.
Ecotone will take care of creating the Projection for us, therefore we can tell it how to do it
Now we are ready to subscribe to Ticket related Events
This is enough for Ecotone to know that this should be triggered whenever TicketWasRegistered happens. As a result of triggering this Event Handler we store new ticket in the our database table
We also want to change the status, when ticket is closed, so let's add that now:
This is all to make our Projection work. There is no any additional configuration needed as we are working from higher level abstraction. We tell what events we want to have delivered, and Ecotone will take care of delivering and initializing and triggering our Projections.
By default this Projection will be triggered synchronously. This means that after Event Sourced Aggregate is called, Events will first be stored in the Event Stream, and then Projection will be called.
Our Projection subscribe to those Events, therefore it will be triggered as a result
By default projections work synchronously as part of the same process of Command Handler execution. This ensures that our Projection is always consistent with changes in the Event Stream, because it's wrapped in database transaction.
Synchronous projects are done within same transaction as the Command execution. This way Ecotone ensures consistency of the data by default. This behaviour is configurable.
This works well for scenarios when there are no much changes to happening to given instance of Event Sourced Aggregate. How Ecotone handles Concurrency was described in more details in previous section. What is important is that for low writes this solution will work perfectly, for high volume of writes on other hand we may want to trigger Projections asynchronously.
Synchronous projections are simpler in development, as we can immediately fetch the data from Read Model and be sure that is consistent with the changes. With Asynchronous data may be refreshed after some time, therefore if fetched immediately, we may get stale results.
Ecotone provides great abstraction for making the code asynchronous. From development perspective the code stays the same like synchronous, yet under the hood thanks to Messaging abstraction it can be easily switched to work asynchronously via Message Channels.
So to make the Projection asynchronous, the only thing which we need to do, is to mark it as asynchronous.
Ecotone will take care of delivering the triggering Event via given async channel to the Projection. This way we can start with synchronous projections, and when we will feel the need, simply switch them to Asynchronous without any single line of code being changed. You may read more about execution process in next section. We need to touch on one more important topic. Where do we actually get the data from for Projections.
Events that trigger Projections are not actually a source of the data for them. This is because if we would lose Event Message along the way due some failure (For example we don't use Outbox) or it would and in Dead Letter then we would basically skip over an Event.
Let's take as an example Asynchronous Projection, where we want to store Ticket with new "alert-warning" type. However let's suppose we've created column with limited size for type - which is up to 10 characters. Therefore our Projection will fail on storing that, because the type is 13 characters long:
Now if after that we will receive Ticket was closed event, then related Event Handler would update nothing, as there is no this Ticket stored in our Read Model:
So this is obviously not way of ensuring consistency in the system. Ecotone does it differently and treats the incoming Events just as "triggers". It works like information for the Projection to fetch the Events from the Event Stream and start Projecting.\
This way if even so Ticket Was Registered failed, when Ticket was Closed would come after it would still get the original event first. So if we would fix the problem with column size, it would basically self-heal automatically.
Each Projection keep track of it's position. Therefore whenever new Event comes in, it knows from which point in Event Stream it should fetch the Events from.
There is one more important reason for building Projections from Event Stream, Projection Rebuilding.
Thanks to Projections ability to build the projection from the Event Stream, we are not bound by time. When we deploy new Projection, it will go over previous Events as part of the projecting process. This way we can ability to be able to ship new Projections at any point of time, yet with ability to use all the previous Events from the past.
Besides that we can rebuild existing Projection, as rebuilding is all about reseting the Projection's position, and start to fetch from scretch. You may read about available actions in Executing and Managing section.

It happens that communication between Services (Applications) is built using different Message Broker features to set up the topology. Which may require per feature configuration and provisioning, and in-depth knowledge about the Message Broker. This often end up as really complex solution, which becomes hard to understand and follow for Developers. When things becomes hard to understand, they become hard to change, as it raises the risk that potential modification may break something. As a result people try to avoid doing changes and development slows down. Therefore there is a need for different approach which keeps the things simple, easy to understand and change. Changes to the integration should not be scary, they should be straight forward and testable, so Developers can feel confidence in doing so. The best solution does not only make things simple to change, but also make things explicit, so just by looking people get more knowledge about the overal system design. And for this Ecotone comes with approach for Service to Service integration based on Service Map.
Service Map is a map of integrated Services (Applications), and points to specific Message Channels to which Messages for given Service should be sent:
In this approach Message Channels (Pipes) are simple transport layer, and the routing is done on the Application (Endpoint) level using Service Map to make the decision.
Making Service available for integration is matter of adding it to the Service Map:
and defining implementation of the Message Channel:
We may choose any Message Channel Provider we want, or even different providers depending on the integrated Service. This opens possibilities for using right tool for right job, as for example in one integration we could use Redis and for other one RabbitMQ or Kafka.
Having the routing map on the Application level instead of Message Broker level means we avoid vendor-lock. In case of need to switch to different Message Broker Provider, we can simply change Message Channel implementation and our integration will continue to work.
Approach of treating Message Brokers as simple transport layer and doing the routing on the Application level to send to the right Message Channel follows smart endpoint dump pipes approach. This as a result make System easy to reason about and understand. Every developer can simply take look on the Map to understand where the Message will land. Adding new integration does not require specific Message Broker knowledge, as it all comes up to adding an Service to the Map and defining the Message Channel provider. Therefore Developers can add easily maintain and change such integration.
As we understand the concepts behind Distributed Bus with Service Map now, let's dive into practical example.
Let’s suppose User Service wants to create Ticket by sending Command to Ticket Service. In Ticket Service we will explicitly state that we allow given Command Handler to be executed in Distributed way. This makes it clear for everyone that we can't simply delete this Command Handler, as other Services may rely on this integration:
on the side of User Service we would call Distributed Bus for Command to do so:
Our topology will look like this:
As we can see above, we do have our two Services "UserService" and "TicketService". Ticket Service will be consuming incoming messages from "distributed_ticket_service" Messsage Channel. Therefore when we send Messages to ticketService we need to send it to that Message Channel, this will be done automatically by DistributedBus using Service Map.
In User Service let's then define Service Map using ServiceContext configuration:
Now when we will send Command to ticketService it will land in distributed_ticket_service channel.
Two level routing is a way to find target Service and within in Command Handler we want to execute:
"targetServiceName" will be used to target specific Service therefore it will make use Message Channel defined in the Service Map. When Message will land in given Service, it will then use "routingKey" to target specific Command Handler within the Application.
As you can see above, under the hood in target Service DistributedCommandHandler will be executed. This Handler is entrypoint to our Service, and will be responsible for triggering Command/Event Bus with given routing key.
Event distribution is a bit different from Command distribution. In case of Command we do have single Service that will receive the Message, in case of Events however there may be multiple of them. Let's expand our previous example to include Order Service, and our scenario is that whenever new User is registered in User Service, we will publish this event to both Ticket and Order Services.
On the consumption part we will be marking our Event Handlers with Distributed:
On the publishing side, we will be using publish event method with Distributed Bus:
Like you can see there is no targetServiceName in the parameters anymore (comparing to distributing Command), this is because Event may land in more than one Service. However we keep routingKey as this the name to which consuming Services subscribe (look EventHandler attribute parameter).
By default Event will be published to all Services in the Service Map, with exception of originating Service that publish this Event, this one will be skipped (to avoid publishing to itself). Therefore the default behaviour broadcast the Event to all Services defined in Service Map.
It's a good practice to share the Service Map between Services. In order to have one single source of truth for your Service (Context) Mapping. This can also serve as reference for Developers to understand bigger picture of the System.
In case given Service is not interested in specific Event, it will simply ignore it. Therefore default publishing can really speed up of development process, and make things clear and simple. However with larger volume of published Events, there may be a lot of ignored Events flying in the system, therefore in that situation we may consider using filtered publishing.
Filtered publishing allows for optimalization in publishing. This way we can publish Events only to the Services that are actually interested in those.
To configure map with subscription keys, we will be using subscriptionRoutingKeys parameter in Service Map configuration:
Subscription routing keys is array, therefore we may put multiple subscribition routing keys if needed. Subscrption keys can point to exact event name: "userService.address.changed" Or they may use wild card: "userService.account.*"
By default subscriptionRoutingKeys are null, which means given Service will receive all Events. If we will provide empty array, it means that subscription keys are enabled, yet none are matching, therefore no Events will be send to given Service.
When Service Map is defined as separate shared library. It becomes explicit what Events is given Service interested in. This also makes the process of subscribing to new Event visible for everyone, therefore we avoid hidden coupling that could lead to broken integration.
DDD PHP
Read Aggregate Introduction sections first to get more details about Aggregates.
New Aggregates are initialized using public factory method (static method).
After calling createTicket aggregate will be automatically stored.
Factory method is static method in the Aggregate class. You may have multiple factory methods if needed.
Sending Command looks exactly the same like in scenario.
When factory method is called from Command Bus, then Ecotone will return new assigned identifier.
Aggregate actions are defined using public method (non-static). Ecotone will ensure loading and saving the aggregate after calling action method.
ChangeTicket should contain the identifier of Aggregate instance on which action method should be called.
And then we call it from Command Bus:
In fact we don't need to provide identifier in our Commands in order to execute specific Aggregate instance. We may not need a Command class in specific scenarios at all.
In this scenario, if we would add Command Class, it would only contain of the identifier and that would be unnecessary boilerplate code. To solve this we may use in order to provide information about instance of the aggregate we want to call.
"aggregate.id" is special metadata that provides information which aggregate we want to call.
When we avoid creating Command Classes with identifiers only, we decrease amount of boilerplate code that we need to maintain.
There may be a cases where you would like to do conditional logic, if aggregate exists do thing, otherwise this. This may be useful to keep our higher level code clean of "if" statements and to simply API by exposing single method.
Both Command Handlers are registered for same command CreateTicket, yet one method is factory method and the second is action method.
When Command will be sent, Ecotone will try to load the aggregate first,
if it will be found then changeTicket method will be called, otherwise createTicket.
Redirected aggregate creation works the same for Event Sourced Aggregates.
For standard Aggregates (non Event-Sourced) we can use WithEvents trait or provide method that with AggregateEvents attribute to provide list of Events that Aggregate has recorded. After saving changes to the Aggregate, Ecotone will automatically publish related Events
Just as standard Command Handler, we can pass Metadata and DI Services to our Aggregates.
DDD PHP
An Aggregate is an entity or group of entities that is always kept in a consistent state. Aggregates are very explicitly present in the Command Model, as that is where change is initiated and business behaviour is placed.
Let's create our first Aggregate Product.
Aggregate attribute marks class to be known as Aggregate
Identifier marks properties as identifiers of specific Aggregate instance. Each Aggregate must contains at least one identifier.
CommandHandler enables command handling on specific method just as we did in . If method is static, it's treated as a and must return a new aggregate instance. Rule applies as long as we use instead of .
Now remove App\Domain\Product\ProductService as it contains handlers for the same command and query classes.
Before we will run our test scenario, we need to register Repository.
Repositories are used for retrieving and saving the aggregate to persistent storage. We will build an in-memory implementation for now.
Repository attribute marks class to be known to Ecotone as Repository.
We need to implement some methods in order to allow Ecotone to retrieve and save Aggregate. Based on implemented interface, Ecotone knowns, if Aggregate is state-stored or event sourced.
canHandle
Let's run our testing command:
Have you noticed what we are missing here? Our Event Handler was not called, as we do not publish the ProductWasRegistered event anymore.
In order to automatically publish events recorded within Aggregate, we need to add method annotated with AggregateEvents. This will tell Ecotone where to get the events from.
Ecotone comes with default implementation, that can be used as trait WithEvents.
Let's run our testing command:
Congratulations, we have just finished Lesson 2. In this lesson we have learnt how to make use of Aggregates and Repositories. Now we will learn about Converters and Metadata
Error Channel
Ecotone comes with solution called Error Channel.
Error Channel is a place where unrecoverable Errors can go, this way we can preserve Error Messages even if we can't handle them anyhow at given moment.
Error Channel may log those Messages, store them in database, push them to some Asynchronous Channel, it all depends on what Handler we will connect to the Error Channel.
On the high level Error Channel works as follows:
Message Consumer is polling Messages from the Queue and executing related Message Handlers.
When execution of given Handler fails, Error is propagated back to Message Consumer
Message Consumer based on the configuration sends it to related Error Channel
Error Channel can be configured per Message Consumer, or globally as default Error Channel for all Message Consumers:
-
-
-
config/packages/ecotone.yaml
config/ecotone.php
To handle incoming Error Messages, we can bind to our defined Error Channel using :
Ecotone provides inbuilt retry mechanism, in case of failure Error Message will be resent to its original Message Channel with a delay. This way we will give application a chance to self-heal and return to good state.
To configure Delayed Retries we need to set up Error Configuration and connect it to our Error Channel:
If for some cases we want to discard Error Messages, we can set up error channel to default inbuilt one called "nullChannel". That may be used in combination of retries, if after given attempt Message is still not handled, then discard:
Ecotone comes with full support for managing full life cycle of a error message. This allows us to store Message in database for later review. Then we can review the Message, replay it or delete.
Read more in next .
Dead Letter can be combined with Delayed Retries, to store only Error Messages that can't self-heal. Read more in related section.
Ecotone provides ability to define Error Channel on the level of Command Bus. This way we can handle synchronous Command Handler execution failures with grace. For example we may receive webhooks and in case of failure instead of throwing exception, we can store related Message in Dead Letter for later review, or push it for reprocessing on asynchronous channel. This way we can ensure stability of our system, even for synchronous scenarios like HTTP integrations.
Command Bus Error Channel is available as part of Ecotone Enterprise.
To set up Error Channel for Command Bus, we will extend Command Bus with our Interface and add ErrorChannel attribute.
Now instead of using CommandBus, we will be using ResilientCommandBus for sending Commands. Whenever failure will happen, instead being propagated, it will now will be redirected to our Dead Letter and stored in database for later review.
We can extend our Command Bus with Error Channel by providing instant retries. This way we can do automatic retries before we will consider Message as failed and move it to the Error Channel. This way we give ourselves a chance of self-healing automatically in case of transistent errors, like database or network exceptions.
Now instead of using CommandBus, we will be using ResilientCommandBus for sending Commands. Whenever failure will happen, instead being propagated, it will now will be redirected to our Dead Letter and stored in database for later review.
Instead of pushing Message to Error Channel, we can push it to Asynchronous Message Channel from which Message will be consumed and retried again. This way in case of failure we can make it possible for Message to be retried and end up self-healing.
and then for use RabbitMQ Message Channel:
It's good practice to use different Message Channel implementation than the storage used during process the Message. For example if our processing requires database connection and our database went down, then if our configured channel is RabbitMQ channel, then we will be able to push those Messages into the Queue instead of failing.
We can combine Asynchronous Error Channel together with delayed retries, creating robust solution, that our Application is able to self-heal from transistent errors even if they take some period of time. For example if our calling some external Service fails, or database went down, then we may receive the same error when Message is retrieved by Async Channel. However if we will delay that by 20 seconds, then there is huge chance that everything will get back on track, and the Application will self-heal automatically.
Command Bus configuration:
And delayed retry configuration:
Of course we could add Dead Letter channel for our delayed retries configuration. Closing the full flow, that even if in case delayed retries failed, we will end up with Message in Dead Letter.
final readonly class TicketWasRegistered
{
public function __construct(
public string $id,
public string $type
) {}
}
final readonly class TicketWasClosed
{
public function __construct(
public string $id,
) {}
}interface EventStore
{
/**
* Creates new Stream with Metadata and appends events to it
*
* @param Event[]|object[] $streamEvents
*/
public function create(string $streamName, array $streamEvents = [], array $streamMetadata = []): void;
/**
* Appends events to existing Stream, or creates one and then appends events if it does not exists
*
* @param Event[]|object[] $streamEvents
*/
public function appendTo(string $streamName, array $streamEvents): void;
/**
* @return Event[]
*/
public function load(
string $streamName,
int $fromNumber = 1,
int $count = null,
MetadataMatcher $metadataMatcher = null,
bool $deserialize = true
): iterable;
}$eventStore->create("ticket", streamMetadata: [
"_persistence" => 'simple', // we will get back to that in later part of the section
]);$eventStore->appendTo(
"ticket",
[
new TicketWasRegistered('123', 'critical'),
new TicketWasClosed('123')
]
);$eventStore->appendTo(
"ticket",
[
new TicketWasRegistered('124', 'critical'),
]
);$events = $eventStore->load("ticket");class Event
{
private function __construct(
private string $eventName,
private object|array $payload,
private array $metadata
)
(...)// concurrent request 1
$eventStore->appendTo(
"ticket",
[
new TicketWasClosed('123'),
]
);
// concurrent request 2
$eventStore->appendTo(
"ticket",
[
new TicketWasClosed('123'),
]
);$eventStore->create("ticket", streamMetadata: [
"_persistence" => 'simple'
]);$eventStore->create("ticket", streamMetadata: [
"_persistence" => 'partition',
]);$eventStore->appendTo(
$streamName,
[
Event::create(
new TicketWasRegistered('123', 'Johnny', 'alert'),
metadata: [
'_aggregate_id' => 1,
'_aggregate_version' => 1,
'_aggregate_type' => 'ticket',
]
)
]
);#[EventSourcingAggregate]
class Ticket
{
use WithAggregateVersioning;
#[Identifier]
private string $ticketId;
public static function register(RegisterTicket $command): array
{
return [new TicketWasRegistered($command->id, $command->type)];
}
#[CommandHandler]
public function close(CloseTicket $command) : array
{
return [new TicketWasClosed($this->ticketId)];
}
}#[Projection("ticket_list", Ticket::class)]
class TicketListProjection {
// This is Connection to our Database or wherever we want to store the data
public function __construct(private Connection $connection) {}
(...)#[ProjectionInitialization]
public function initializeProjection() : void
{
if ($this->connection->createSchemaManager()->tablesExist('ticket_list')) {
return;
}
$table = new Table('ticket_list');
$table->addColumn('id', Types::STRING);
$table->addColumn('type', Types::STRING);
$table->addColumn('status', Types::STRING);
$this->connection->createSchemaManager()->createTable($table);
}#[EventHandler]
public function onTicketWasPrepared(TicketWasRegistered $event) : void
{
$this->connection->insert('ticket_list', [
"id" => $event->id,
"ticket_type" => $event->type,
"status" => "open"
]);
}#[EventHandler]
public function onTicketWasCancelled(TicketWasCancelled $event) : void
{
$this->connection->update(
'ticket_list,
["status" => "cancelled"],
["ticket_id" => $event->getTicketId()]
);
}#[Asynchronous('async')]
#[Projection("ticket_list", Ticket::class)]
class TicketListProjection#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
return DistributedServiceMap::initialize()
->withServiceMapping(
serviceName: "ticketService",
channelName: "distributed_ticket_service"
)
}#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
return SqsBackedMessageChannelBuilder::create("distributed_ticket_service")
}#[Distributed]
#[CommandHandler("ticketService.createTicket")]
public function changeBillingDetails(CreateTicket $command) : void
{
// create new Ticket
}public function whenPersonRegistered($personId, DistributedBus $distributedBus)
{
$distributedBus->convertAndSendCommand(
targetServiceName: "ticketService",
routingKey: "ticketService.createTicket",
command: new CreateTicket($personId, "Call Customer to collect more details")
);
}#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
return DistributedServiceMap::initialize()
->withServiceMapping(
serviceName: "ticketService",
channelName: "distributed_ticket_service"
)
}$distributedBus->convertAndSendCommand(
targetServiceName: "ticketService",
routingKey: "ticketService.createTicket",
command: new CreateTicket($personId, "Call Customer to collect more details")
);#[Distributed]
#[EventHandler("user.was_registered")]
public function when(UserRegistered $event) : void
{
// do something
}$distributedBus->convertAndPublishEvent(
routingKey: "user.was_registered",
event: new UserRegistered($userId)
);#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
return DistributedServiceMap::initialize()
->withServiceMapping(
serviceName: "ticketService",
channelName: "distributed_ticket_service",
subscriptionRoutingKeys: ["userService.account.*"]
)
->withServiceMapping(
serviceName: "orderService",
channelName: "distributed_order_service",
subscriptionRoutingKeys: ["userService.address.changed"]
)
}#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private string $description;
private string $assignedTo;
#[CommandHandler]
public static function createTicket(CreateTicket $command): static
{
$ticket = new self();
$ticket->id = Uuid::generate();
$ticket->assignedTo = $command->assignedTo;
$ticket->description = $command->description;
return $ticket;
}
}






















QueryHandler enables query handling on specific method just as we did in Lesson 1.
findBy return found aggregate instance or null. As there may be more, than single indentifier per aggregate, identifiers are array.
save saves an aggregate instance. You do not need to bother right what is $metadata and $expectedVersion.






$ticketId = $this->commandBus->send(
new CreateTicket($assignedTo, $description)
);#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private string $description;
private string $assignedTo;
#[CommandHandler]
public function changeTicket(ChangeTicket $command): void
{
$this->description = $command->description;
$this->assignedTo = $command->assignedTo;
}
}class readonly ChangeTicket
{
public function __construct(
public Uuid $ticketId;
public string $description;
public string $assignedTo
) {}
}$ticketId = $this->commandBus->send(
new ChangeTicket($ticketId, $description, $assignedTo)
);#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private bool $isClosed = false;
#[CommandHandler("ticket.close")]
public function close(): void
{
$this->isClosed = true;
}
}$this->commandBus->sendWithRouting(
"ticket.close",
metadata: ["aggregate.id" => $ticketId]
)#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
private string $description;
private string $assignedTo;
#[CommandHandler]
public static function createTicket(CreateTicket $command): static
{
$ticket = new self();
$ticket->id = Uuid::generate();
$ticket->assignedTo = $command->assignedTo;
$ticket->description = $command->description;
return $ticket;
}
#[CommandHandler]
public function changeTicket(CreateTicket $command): void
{
$this->description = $command->description;
$this->assignedTo = $command->assignedTo;
}
}#[Aggregate]
class Ticket
{
use WithEvents; // Provides methods for collecting events
#[Identifier]
private Uuid $ticketId;
#[CommandHandler]
public static function createTicket(
CreateTicket $command
): static
{
$self = new self($command->id);
$self->recordThat(new TicketWasCreated($command->id));
return $self;
}
}#[Aggregate]
class Ticket
{
#[Identifier]
private Uuid $ticketId;
#[CommandHandler]
public static function createTicket(
CreateTicket $command,
#[Header("executorId")] string $executorId,
#[Reference] Clock $clock,
): static
{
return new self(
$command->id,
$executorId,
$clock->currentTime(),
);
}
}namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\Aggregate;
use Ecotone\Modelling\Attribute\Identifier;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
#[Aggregate]
class Product
{
#[Identifier]
private int $productId;
private int $cost;
private function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
}
#[CommandHandler]
public static function register(RegisterProductCommand $command) : self
{
return new self($command->getProductId(), $command->getCost());
}
#[QueryHandler]
public function getCost(GetProductPriceQuery $query) : int
{
return $this->cost;
}
}namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\Repository;
use Ecotone\Modelling\StandardRepository;
#[Repository] // 1
class InMemoryProductRepository implements StandardRepository // 2
{
/**
* @var Product[]
*/
private $products = [];
// 3
public function canHandle(string $aggregateClassName): bool
{
return $aggregateClassName === Product::class;
}
// 4
public function findBy(string $aggregateClassName, array $identifiers): ?object
{
if (!array_key_exists($identifiers["productId"], $this->products)) {
return null;
}
return $this->products[$identifiers["productId"]];
}
// 5
public function save(array $identifiers, object $aggregate, array $metadata, ?int $expectedVersion): void
{
$this->products[$identifiers["productId"]] = $aggregate;
}
}# As default auto wire of Laravel creates new service instance each time
# service is requested from Depedency Container, we need to register
# ProductService as singleton.
# Go to bootstrap/QuickStartProvider.php and register our ProductService
namespace Bootstrap;
use App\Domain\Product\InMemoryProductRepository;
use Illuminate\Support\ServiceProvider;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(InMemoryProductRepository::class, function(){
return new InMemoryProductRepository();
});
}
(...)Everything is set up by the framework, please continue...Everything is set up, please continue...bin/console ecotone:quickstart
Running example...
100
Good job, scenario ran with success!use Ecotone\Modelling\WithEvents;
#[Aggregate]
class Product
{
use WithEvents;
#[Identifier]
private int $productId;
private int $cost;
private function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
$this->recordThat(new ProductWasRegisteredEvent($productId));
}
(...)bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!ecotone:
defaultErrorChannel: "errorChannel"return [
'defaultErrorChannel' => 'errorChannel',
];$ecotone = EcotoneLite::bootstrap(
configuration: ServiceConfiguration::createWithDefaults()
->withDefaultErrorChannel('errorChannel')
);class Configuration
{
#[ServiceContext]
public function configuration() : array
{
return [
// For Message Consumer orders, configure error channel
PollingMetadata::create("orders")
->setErrorChannelName("errorChannel")
];
}
}#[InternalHandler("errorChannel")]
public function handle(ErrorMessage $errorMessage): void
{
// handle exception
$exception = $errorMessage->getExceptionMessage();
}#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::create(
"errorChannel",
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3)
);
}#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::createWithDeadLetterChannel(
"errorChannel",
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3),
// if retry strategy will not recover, then discard
"nullChannel"
);
}#[ErrorChannel("dbal_dead_letter")]
interface ResilientCommandBus extends CommandBus
{
}#[InstantRetry(retryTimes: 2)]
#[ErrorChannel("dbal_dead_letter")]
interface ResilientCommandBus extends CommandBus
{
}#[ErrorChannel("async_channel")]
interface ResilientCommandBus extends CommandBus
{
}final readonly class EcotoneConfiguration
{
#[ServiceContext]
public function databaseChannel()
{
return AmqpBackedMessageChannelBuilder::create('orders');
}
}#[ErrorChannel("async_channel")]
interface ResilientCommandBus extends CommandBus
{
}#[ServiceContext]
public function errorConfiguration()
{
return ErrorHandlerConfiguration::create(
"async_channel",
RetryTemplateBuilder::exponentialBackoff(1000, 10)
->maxRetryAttempts(3)
);
}Commands CQRS PHP
In this section, we will look at how to use Commands, Events, and Queries. This will help you understand the basics of Ecotone’s CQRS support and how to build a message-driven application.
Command Handlers are methods where we typically place our business logic, so we’ll start by exploring how to use them.
Any service available in your Dependency Container can become a Command Handler.
Command Handlers are responsible for performing business actions in your system.
In Ecotone-based applications, you register a Command Handler by adding the CommandHandler attribute to the specific method that should handle the command:
In the example above, the #[CommandHandler] attribute tells Ecotone that the "createTicket" method should handle the CreateTicketCommand.
The first parameter of a Command Handler method determines which command type it handles — in this case, it is CreateTicketCommand.
In Ecotone, the class itself is not a Command Handler — only the specific method is. This means you can place multiple Command Handlers inside the same class, to make correlated actions available under same API class.
We send a Command using the Command Bus. After installing Ecotone, all Buses are automatically available in the Dependency Container, so we can start using them right away. Before we can send a Command, we first need to define it:
All Messages (Commands, Queries, and Events), as well as Message Handlers, are just plain PHP objects. They don’t need to extend or implement any Ecotone-specific classes. This keeps your business code clean, simple, and easy to understand.
To send a command, we use the send method on the CommandBus.
The command gets automatically routed to its corresponding Command Handler
We can send commands with metadata (also called Message Headers) through the Command Bus. This lets us include additional context that doesn't belong in the command itself, or share information across multiple Command Handlers without duplicating it in each command class.
And then to access given metadata, we will be using Header attribute:
The #[Header] attribute tells Ecotone to fetch a specific piece of metadata using the key executorId. This way, Ecotone knows exactly which metadata value to pass into our Command Handler.
If we use Command Handler, Ecotone will ensure our metadata will be serialized and deserialized correctly.
If we need additional services from the Dependency Container to handle our business logic, we can inject them into our Command Handler using the #[Reference] attribute:
In case Service is defined under custom id in DI, we may pass the reference name to the attribute:
In Ecotone we may register Command Handlers under routing instead of a class name.
This is especially useful if we will register to tell Ecotone how to deserialize given Command. This way we may simplify higher level code like Controllers or Console Line Commands by avoid transformation logic.
Ecotone is using message routing for . This way applications can stay decoupled from each other, as there is no need to share the classes between them.
There may be cases where creating Command classes is unnecessary boilerplate, in those situations, we may simplify the code and make use scalars, arrays or non-command classes directly.
Ecotone provides flexibility which allows to create Command classes when there are actually needed. In other cases we may use routing functionality together with simple types in order to fulfill our business logic.
Sometimes we need to return a value immediately after handling a command. This is useful for scenarios that require instant feedback—for example, when processing a payment, we might need to return a redirect URL to guide the user to the payment gateway. Ecotone's allows for returning data from Command Handler, that will be available as a result from your CommandBus:
The returned data will be available as result of the Command Bus.
Keep in mind that return values only work with synchronous Command Handlers. For asynchronous handlers, we can't return values directly because the command is processed in the background—instead, we'd use events or callbacks to communicate results back to the user when processing completes.
When any mechanism is configured (For example ), we can let Ecotone do the deserialization in-fly, so we don't need to both with doing custom transformations in the Controller:
Event CQRS PHP
Be sure to read before diving in this chapter.
The difference between Events and Command is in intention. Commands are meant to trigger an given action and events are information that given action was performed successfully.
To register Event Handler, we will be using EventHandler attribute. By marking given method as Event Handler, we are stating that this method should subscribe to specific Event Class:
In above scenario we are subscribing to TicketWasCreated Event, therefore whenever this Event will be published, this method will be automatically invoked. Events are Plain Old PHP Objects:
class TicketService
{
#[CommandHandler]
public function createTicket(CreateTicketCommand $command) : void
{
// handle create ticket command
}
}To test out Metadata, let's assume we just got new requirement for our Products in Shopping System.:
User who registered the product, should be able to change it's price.
Let's start by adding ChangePriceCommand
We will handle this Command in a minute. Let's first add user information for registering the product. We will do it, using Metadata. Let's get back to our Testing Class EcotoneQuickstart and add 4th argument to our CommandBus call.
sendWithRouting accepts 4th argument, which is associative array. Whatever we will place in here, will be available during message handling for us - This actually our Metadata. It's super simple to pass new Headers, it's matter of adding another key to the array. Now we can change our Product aggregate:
We have added second parameter $metadata to our CommandHandler. Ecotone read parameters and evaluate what should be injected. We will see soon, how can we take control of this process. We can add changePrice method now to our Aggregate:
And let's call it with incorrect userId and see, if we get the exception.
Let's run our testing command:
We have been just informed, that customers are registering new products in our system, which should not be a case. Therefore our next requirement is:
Only administrator should be allowed to register new Product
Let's create simple UserService which will tell us, if specific user is administrator.
In our testing scenario we will suppose, that only user with id of 1 is administrator.
Now we need to think where we should call our UserService. The good place for it, would not allow for any invocation of product.register command without being administrator, otherwise our constraint may be bypassed. Ecotone does allow for auto-wire like injection for endpoints. All services registered in Depedency Container are available.
Great, there is no way to bypass the constraint now. The isAdmin constraint must be satisfied in order to register new product. Let's correct our testing class.
Let's run our testing command:
Ecotone inject arguments based on Parameter Converters. Parameter converters , tells Ecotone how to resolve specific parameter and what kind of argument it is expecting. The one used for injecting services like UserService is Reference parameter converter. Let's see how could we use it in our product.register command handler.
Let's suppose UserService is registered under user-service in Dependency Container. Then we would need to set up the CommandHandlerlike below.
Reference- Does inject service from Dependency Container. If referenceName, which is name of the service in the container is not given, then it will take the class name as default.
Payload - Does inject payload of the message. In our case it will be the command itself
Headers - Does inject all headers as array.
Header - Does inject single header from the message.
There is more to be said about this, but at this very moment, it will be enough for us to know that such possibility exists in order to continue.
You may read more detailed description in Method Invocation section. \
Ecotone, if parameter converters are not passed provides default converters.
First parameter is always Payload.
The second parameter, if is array then Headers converter is taken, otherwise if class type hint is provided for parameter, then Reference converter is picked.
If we would want to manually configure parameters for product.register Command Handler, then it would look like this:
We could also inject specific header and let Ecotone convert it directly to specific object (if we have Converter registered):
Great, we have just finished Lesson 4!
In this Lesson we learned about using Metadata to provide extra information to our Message. Besides we took a look on how arguments are injected into endpoint and how we can make use of it. Now we will learn about powerful Interceptors, which can be describes as Middlewares on steroids.
In case of Command Handlers there may be only single Handler for given Command Class. This is not a case for Event Handlers, multiple Event Handler may subscribe to same Event Class.
To publish Events, we will be using EventBus.
EventBus is available in your Dependency Container by default, just like Command and Query buses.
You may use Ecotone's invocation control, to inject Event Bus directly into your Command Handler:
You may inject any other Service available in your Dependency Container, into your Message Handler methods.
Unlike Command Handlers which points to specific Command Handler, Event Handlers can have multiple subscribing Event Handlers.
Each Event Handler can be defined as Asynchronous. If multiple Event Handlers are marked for asynchronous processing, each of them is handled in isolation. This ensures that in case of failure, we can safely retry, as only failed Event Handler will be performed again.
If your Event Handler is interested in all Events around specific business concept, you may subscribe to Interface or Abstract Class.
And then instead of subscribing to TicketWasCreated or TicketWasCancelled, we will subscribe to TicketEvent.
We can also subscribe to different Events using union type hint. This way we can ensure that only given set of events will be delivered to our Event Handler.
We may subscribe to all Events published within the application. To do it we type hint for generic object.
Events can also be subscribed by Routing.
And then Event is published with routing key
Ecotone is using message routing for cross application communication. This way applications can stay decoupled from each other, as there is no need to share the classes between them.
There may be situations when we will want to subscribe given method to either routing or class name. Ecotone those subscriptions separately to protect from unnecessary wiring, therefore to handle this case, we can simply add another Event Handler which is not based on routing key.
This way we explicitly state that we want to subscribe by class name and by routing key.
Just like with Command Bus, we may pass metadata to the Event Bus:
If you make your Event Handler Asynchronous, Ecotone will ensure your metadata will be serialized and deserialized correctly.
By default Ecotone will ensure that your Metadata is propagated. This way you can simplify your code by avoiding passing around Headers and access them only in places where it matters for your business logic.
To better understand that, let's consider example in which we pass the metadata to the Command.
However in order to perform closing ticket logic, information about the executorId is not needed, so we don't access that.
However Ecotone will ensure that your metadata is propagated from Handler to Handler. This means that the context is preserved and you will be able to access executorId in your Event Handler.
#[ClassReference("ticketService")]
class TicketServiceclass readonly CreateTicketCommand
{
public function __construct(
public string $priority,
public string $description
){}
}class TicketController
{
// Command Bus will be auto registered in Depedency Container.
public function __construct(private CommandBus $commandBus) {}
public function createTicketAction(Request $request) : Response
{
$this->commandBus->send(
new CreateTicketCommand(
$request->get("priority"),
$request->get("description"),
)
);
return new Response();
}
}$messagingSystem->getCommandBus()->send(
new CreateTicketCommand(
$priority,
$description,
)
);class TicketController
{
public function __construct(private CommandBus $commandBus) {}
public function closeTicketAction(Request $request, Security $security) : Response
{
$this->commandBus->send(
new CloseTicketCommand($request->get("ticketId")),
["executorId" => $security->getUser()->getId()]
);
}
}$messagingSystem->getCommandBus()->send(
new CloseTicketCommand($ticketId),
["executorId" => $executorId]
);class TicketService
{
#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
// by adding Header attribute we state what metadata we want to fetch
#[Header("executorId")] string $executorId
): void
{
// handle closing ticket with executor from metadata
}
}class TicketService
{
#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
#[Reference] AuthorizationService $authorizationService
): void
{
// handle closing ticket with executor from metadata
}
}#[Reference("authorizationService")] AuthorizationService $authorizationServiceclass TicketController
{
public function __construct(private CommandBus $commandBus) {}
public function createTicketAction(Request $request) : Response
{
$commandBus->sendWithRouting(
"createTicket",
$request->getContent(),
"application/json" // we tell what format is used in the request content
);
return new Response();
}
}$messagingSystem->getCommandBus()->sendWithRouting(
"createTicket",
$data,
"application/json"
);class TicketService
{
// Ecotone will do deserialization for the Command
#[CommandHandler("createTicket")]
public function createTicket(CreateTicketCommand $command): void
{
// handle creating ticket
}
}class TicketController
{
private CommandBus $commandBus;
public function __construct(CommandBus $commandBus)
{
$this->commandBus = $commandBus;
}
public function closeTicketAction(Request $request) : Response
{
$commandBus->sendWithRouting(
"closeTicket",
Uuid::fromString($request->get("ticketId"))
);
return new Response();
}
}$messagingSystem->getCommandBus()->sendWithRouting(
"closeTicket",
Uuid::fromString($ticketId)
);class TicketService
{
#[CommandHandler("closeTicket")]
public function closeTicket(UuidInterface $ticketId): void
{
// handle closing ticket
}
}class PaymentService
{
#[CommandHandler]
public function closeTicket(MakePayment $command): Url
{
// handle making payment
return $paymentUrl;
}
}$redirectUrl = $this->commandBus->send($command); public function createTicketAction(Request $request) : Response
{
$ticketId = $this->commandBus->send(
routingKey: 'createTicket',
command: $request->getContent(), // Ecotone will deserialize Command in-fly
commandMediaType: 'application/json',
);
return new Response([
'ticketId' => $ticketId
]);
}namespace App\Domain\Product;
class ChangePriceCommand
{
private int $productId;
private Cost $cost;
public function getProductId() : int
{
return $this->productId;
}
public function getCost() : Cost
{
return $this->cost;
}
}public function run() : void
{
$this->commandBus->sendWithRouting(
"product.register",
\json_encode(["productId" => 1, "cost" => 100]),
"application/json",
metadata: [
"userId" => 1
]
);
echo $this->queryBus->sendWithRouting("product.getCost", \json_encode(["productId" => 1]), "application/json");
}#[Aggregate]
class Product
{
use WithAggregateEvents;
#[Identifier]
private int $productId;
private Cost $cost;
private int $userId;
private function __construct(int $productId, Cost $cost, int $userId)
{
$this->productId = $productId;
$this->cost = $cost;
$this->userId = $userId;
$this->recordThat(new ProductWasRegisteredEvent($productId));
}
#[CommandHandler("product.register")]
public static function register(RegisterProductCommand $command, array $metadata) : self
{
return new self(
$command->getProductId(),
$command->getCost(),
// all metadata is available for us.
// Ecotone automatically inject it, if second param is array
$metadata["userId"]
);
}#[CommandHandler("product.changePrice")]
public function changePrice(ChangePriceCommand $command, array $metadata) : void
{
if ($metadata["userId"] !== $this->userId) {
throw new \InvalidArgumentException("You are not allowed to change the cost of this product");
}
$this->cost = $command->getCost();
}public function run() : void
{
$this->commandBus->sendWithRouting(
"product.register",
\json_encode(["productId" => 1, "cost" => 100]),
"application/json",
[
"userId" => 5
]
);
$this->commandBus->sendWithRouting(
"product.changePrice",
\json_encode(["productId" => 1, "cost" => 110]),
"application/json",
[
"userId" => 3
]
); bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
InvalidArgumentException
You are not allowed to change the cost of this product namespace App\Domain\Product;
class UserService
{
public function isAdmin(int $userId) : bool
{
return $userId === 1;
}
}#[CommandHandler("product.register")]
public static function register(
RegisterProductCommand $command,
array $metadata,
// Any non first class argument, will be considered an DI Service to inject
UserService $userService
) : self
{
$userId = $metadata["userId"];
if (!$userService->isAdmin($userId)) {
throw new \InvalidArgumentException("You need to be administrator in order to register new product");
}
return new self($command->getProductId(), $command->getCost(), $userId);
}public function run() : void
{
$this->commandBus->sendWithRouting(
"product.register",
\json_encode(["productId" => 1, "cost" => 100]),
"application/json",
[
"userId" => 1
]
);
$this->commandBus->sendWithRouting(
"product.changePrice",
\json_encode(["productId" => 1, "cost" => 110]),
"application/json",
[
"userId" => 1
]
);
echo $this->queryBus->sendWithRouting("product.getCost", \json_encode(["productId" => 1]), "application/json");
}bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
110
Good job, scenario ran with success!#[CommandHandler("product.register")]
public static function register(
RegisterProductCommand $command,
array $metadata,
#[Reference("user-service")] UserService $userService
) : self#[CommandHandler("product.register")]
public static function register(
#[Payload] RegisterProductCommand $command,
#[Headers] array $metadata,
#[Reference] UserService $userService
) : self
{
// ...
}#[CommandHandler("product.register")]
public static function register(
#[Payload] RegisterProductCommand $command,
// injecting specific header and doing the conversion string to UserId
#[Header("userId")] UserId $metadata,
#[Reference] UserService $userService
) : self
{
// ...
}class TicketController
{
public function __construct(private CommandBus $commandBus) {}
public function closeTicketAction(Request $request, Security $security) : Response
{
$this->commandBus->send(
new CloseTicketCommand($request->get("ticketId")),
["executorId" => $security->getUser()->getId()]
);
}
}$messagingSystem->getCommandBus()->send(
new CloseTicketCommand($ticketId),
["executorId" => $executorId]
);class TicketService
{
#[CommandHandler]
public function closeTicket(
CloseTicketCommand $command,
EventBus $eventBus
)
{
// close the ticket
// we simply publishing an Event, we don't pass any metadata here
$eventBus->publish(new TicketWasCreated($ticketId));
}
}class TicketService
{
#[EventHandler]
public function when(TicketWasCreated $event): void
{
// handle event
}
}class readonly TicketWasCreated
{
public function __construct(
public string $ticketId
) {}
}class TicketService
{
#[CommandHandler]
public function createTicket(
CreateTicketCommand $command,
EventBus $eventBus
) : void
{
// handle create ticket command
$eventBus->publish(new TicketWasCreated($ticketId));
}
}class TicketService
{
#[EventHandler]
public function when(TicketWasCreated $event): void
{
// handle event
}
}
class NotificationService
{
#[EventHandler]
public function sendNotificationX(TicketWasCreated $event): void
{
// handle event
}
#[EventHandler]
public function sendNotificationY(TicketWasCreated $event): void
{
// handle event
}
}interface TicketEvent
{
}class readonly TicketWasCreated implements TicketEvent
{
public function __construct(
public string $ticketId
) {}
}
class readonly TicketWasCancelled implements TicketEvent
{
public function __construct(
public string $ticketId
) {}
}#[EventHandler]
public function notify(TicketEvent $event) : void
{
// do something with $event
}#[EventHandler]
public function notify(TicketWasCreated|TicketWasCancelled $event) : void
{
// do something with $event
}#[EventHandler]
public function log(object $event) : void
{
// do something with $event
}class TicketService
{
#[EventHandler("ticket.was_created")]
public function when(TicketWasCreated $event): void
{
// handle event
}
}class TicketService
{
#[CommandHandler]
public function createTicket(
CreateTicketCommand $command,
EventBus $eventBus
) : void
{
// handle create ticket command
$eventBus->publishWithRouting(
"ticket.was_created",
new TicketWasCreated($ticketId)
);
}
}class TicketService
{
#[EventHandler]
#[EventHandler("ticket.was_created")]
public function when(TicketWasCreated $event): void
{
// handle event
}
}class TicketService
{
#[CommandHandler]
public function createTicket(
CreateTicketCommand $command,
EventBus $eventBus
) : void
{
// handle create ticket command
$eventBus->publish(
new TicketWasCreated($ticketId),
metadata: [
"executorId" => $command->executorId()
]
);
}
}class TicketService
{
#[EventHandler]
public function when(
TicketWasCreated $event,
// access metadata with given name
#[Header("executorId")] string $executorId
): void
{
// handle event
}
}class AuditService
{
#[EventHandler]
public function log(
TicketWasCreated $event,
// access metadata with given name
#[Header("executorId")] string $executorId
): void
{
// handle event
}
}This chapter provides more details about advanced Message Channel functionalities using Dynamic Message Channels. Dynamic Channels can be used to:
Simplify deployment strategy
Optimize system resources
Adjust message consumption or sending process, which is especially useful in SaaS and Multi-Tenant Environments
Dynamic Message Channels are available as part of Ecotone Enterprise.
The default strategy is to have single Message Consumer (Worker process) per Message Channel (Queue). When the volume of Messages is low however, some Consumers may actually be continuously in idle state. In order to not be wasteful about system resources we may want then to join the consumption, so single Message Consumer will poll from multiple Message Channels.
Suppose we do have Message Channels:
To prepare an Message Consumer that will be able to consume from those two Channels in manner (meaning each consumption is called after another one) we can set up Dynamic Message Channel.
Dynamic Message Channels can combine multiple channels, so we can treat them as a one.
After that we can consume using "orders_and_notifications" name. We then can run the endpoint:
We can combine as many channels as we want under single Dynamic Channel.
There may be situations when we would like to introduce Message Channel per Client. This if often a case in Multi-Tenant environments when premium Customer does get higher consumption rates. In Ecotone we can keep our code agnostic of Multiple Channels, and yet provide this ability to end users in a simple way. For this we will be using Header Based Strategy.
Taking as an example Order Process:
This code is fully agnostic to the details of Multi-Tenant environment. It does use Message Channel "orders" to process the Command. We can however make the "orders" an Dynamic Channel, which will actually distribute to multiple Channels. To do this we will introduce distribution based on the Metadata from the Command.
Now whenever this Command is sent with tenant metadata, Ecotone will decide to which Message Channel it should proxy the Message.
Above will work exactly the same for Events.
Then we would simply run those as separate Message Consumption processes (Workers):
We may want to introduce separate Message Channels for premium Tenants and just have a shared Message Channel for the rest. For this we would use default channel:
The default channel will be used, when no mapping will be found:
Running Dynamic Channels does not differ from normal channels.
If we will run "orders", which is Dynamic Channel combined of three other Channels, Ecotone will run Message Consumption process which will use round-robin strategy to consume from each of them:
Typically we would also run consumption process for this specific channels, which require extra processing power. This Message Consumer will focus only on Messages within that Channel.
When shared_consumer and tenant_abc will read from same Message Channel at the same time, it will work as . Therefore each will get his own unique Message.
By default whatever Message Channels we will define, we will be able to start Message Consumer for it. However if given set of Channels is only meant to be used under Dynamic Channel, we can make it explicit and avoid allowing them to be run separately.
To do so we use Internal Channels which will be available only for the Dynamic Channel visibility.
Internal Channels are only visible for the Dynamic Channel, therefore they can't be used for Asynchronous Message Handlers. What should be used for Async Handlers is the name of Dynamic Message Channel.
Let's take as an example of Multi-Tenant environment where each of our Clients has set limit of 5 orders to be processed within 24 hours. This limit is known to the Client and he may buy extra processing unit to increase his daily capacity.
Often used solution to skip processing is to reschedule Messages with a delay and check after some time if Client's message can now be processed. This solution however will waste resources, as we consume Messages that are not meant to be handled. Therefore Ecotone provides alternative, which skips the consumption completely, so we can avoid wasting resources on polling or rescheduling Messages, as we simply don't consume them at all.
To skip the consumption we will use Skipping Strategy. We will start by defining Message Channel per Client, so we can skip consumption from given Channel when Client have reached the limit.
The "decide_for_client" is the name of our that will do the decision.
This function will run in round-robin manner for each defined Message Channel (client_a, client-b).
By using Throttling Strategy we can easily rate limit our Clients. This can work dynamically, if Customer will buy credit credits, we can start returning true from decisioning method, which will kick-off the consumption. This means that we create real-time experience for Customers.
In some scenarios we may actually want to take full control over sending and receiving. In this situations we can make use of custom Strategies that completely replaces the inbuilt ones. This way we can tell which Message Channel we would like to send Message too, and from which Channel we would like to receive Message from.
To roll out custom receiving strategy we will use "withCustomReceivingStrategy":
To set up our Custom Strategy, we will use .
If we want to stop Consumption completely we can return "nullChannel" string. This will skip consuming given Channel. This may be useful in order to turn of given Message Consumer at run time.
To roll out custom receiving strategy we will use "withCustomReceivingStrategy":
To set up our Custom Strategy, we will use .
If we would like to discard given Message, we can return "nullChannel" string.
final readonly class EcotoneConfiguration
{
#[ServiceContext]
public function asyncChannels()
{
return [
DbalBackedMessageChannelBuilder::create('orders'),
DbalBackedMessageChannelBuilder::create('notifications'),
];
}
}#[ServiceContext]
public function dynamicChannel()
{
return [
DynamicMessageChannelBuilder::createRoundRobin(
'orders_and_notifications', // channel name to consume
['orders', 'notifications']
),
];
}bin/console ecotone:run orders_and_notifications -vvvartisan ecotone:run orders_and_notifications -vvv$messagingSystem->run("orders_and_notifications");#[Asynchronous("orders")]
#[CommandHandler]
public function placeOrder(PlaceOrderCommand $command) : void
{
// do something with $command
}#[ServiceContext]
public function dynamicChannel()
{
return [
// normal Message Channels
DbalBackedMessageChannelBuilder::create('tenant_a_channel'),
DbalBackedMessageChannelBuilder::create('tenant_b_channel'),
// our Dynamic Channel used in Command Handler
DynamicMessageChannelBuilder::createWithHeaderBasedStrategy(
thisMessageChannelName: 'orders',
headerName: 'tenant',
headerMapping: [
'tenant_a' => 'tenant_a_channel',
'tenant_b' => 'tenant_b_channel',
]
),
];
}$this->commandBus->send(
new PlaceOrderCommand(),
metadata: [
'tenant' => 'tenant_a'
]
);# Symfony
bin/console ecotone:run tenant_a_channel -vvv
bin/console ecotone:run tenant_b_channel -vvv
# Laravel
artisan ecotone:run tenant_a_channel -vvv
artisan ecotone:run tenant_b_channel -vvv
# Ecotone Lite
$messagingSystem->run("tenant_a_channel");
$messagingSystem->run("tenant_b_channel");#[ServiceContext]
public function dynamicChannel()
{
return [
// normal Message Channels
DbalBackedMessageChannelBuilder::create('tenant_a_channel'),
DbalBackedMessageChannelBuilder::create('tenant_b_channel'),
DbalBackedMessageChannelBuilder::create('shared_channel'),
DynamicMessageChannelBuilder::createWithHeaderBasedStrategy(
'orders',
'tenant',
[
'tenant_a' => 'tenant_a_channel',
'tenant_b' => 'tenant_b_channel',
],
'shared_channel' // the default channel, when above mapping does not apply
),
];
}$this->commandBus->send(
new PlaceOrderCommand(),
metadata: [
'tenant' => 'tenant_c' //no mapping for this tenant, go to default channel
]
);#[ServiceContext]
public function dynamicChannel()
{
return [
DbalBackedMessageChannelBuilder::create('tenant_a_orders'),
DbalBackedMessageChannelBuilder::create('tenant_b_orders'),
DbalBackedMessageChannelBuilder::create('tenant_c_orders'),
DynamicMessageChannelBuilder::createRoundRobin(
'orders',
['tenant_a_orders', 'tenant_b_orders', 'tenant_c_orders']
),
];
}bin/console ecotone:run orders -vvvartisan ecotone:run shared_consumer -vvv$messagingSystem->run("shared_consumer");bin/console ecotone:run tenant_a_orders -vvvartisan ecotone:run tenant_abc -vvv$messagingSystem->run("tenant_abc");#[ServiceContext]
public function dynamicChannel()
{
return [
DynamicMessageChannelBuilder::createRoundRobin(
'orders_and_notifications', // channel name to consume
['orders', 'notifications']
),
->withInternalChannels([
DbalBackedMessageChannelBuilder::create('orders'),
DbalBackedMessageChannelBuilder::create('notifications'),
]),
];
}#[ServiceContext]
public function dynamicChannel()
{
return [
DbalBackedMessageChannelBuilder::create('client_a'),
DbalBackedMessageChannelBuilder::create('client_b'),
DynamicMessageChannelBuilder::createThrottlingStrategy(
thisMessageChannelName: 'orders',
requestChannelName: 'decide_for_client',
channelNames: [
'client_a',
'client_b',
],
),
];
}#[InternalHandler('decide_for_client')]
public function decide(
string $channelNameToCheck, //this will be called in round robin with client_a / client_b
): bool
{
// by returning true we start the consumption process, by returning false we skip
return $this->checkIfReachedConsumptionLimit($channelNameToCheck);
}#[ServiceContext]
public function dynamicChannel()
{
return [
DynamicMessageChannelBuilder::createRoundRobin(
'orders',
['tenant_a', 'tenant_b', 'tenant_c']
)
// we change round robin receving strategy to our own customized one
->withCustomReceivingStrategy('decide_on_consumption'),
];
}final class Decider
{
/**
* @return string channel name to consume from
*/
#[InternalHandler('decide_on_consumption')]
public function toReceive(): string
{
// this should return specific message channel from which we will consume
}
}#[ServiceContext]
public function dynamicChannel()
{
return [
DynamicMessageChannelBuilder::createRoundRobin(
'orders',
['tenant_a', 'tenant_b', 'tenant_c']
)
// we change round robin sending strategy to our own customized one
->withCustomSendingStrategy('decide_on_send'),
];
}final class Decider
{
/**
* @return string channel name to send too
*/
#[InternalHandler('decide_on_send')]
public function toSend(Message $message): string
{
// this should return specific message channel to which we send
}
}Message Driven System with Domain Driven Design principles in PHP
The foundation idea
The roots of Object Oriented Programming were mainly about communication using Messages and logic encapsulation. The aim was to focus on the flows and communication, not on the objects itself. Objects were meant to be encapsulating logic, and expose clear interfaces of what they do, and what have they done.
If you know things like Events, Commands and Aggregates, then what was written above should feel familiar to you. This is because those concepts are build around same principles of old OOP where communication is done through Messages and Objects are meant to encapsulate logic. And Ecotone is about returning to those roots of Object Oriented Programming. It's about explicit System design where communication happen through Messages, in a way that is clear to follow and understand.
There is no possibility to immerse fully into Message based communication, as long as the foundation is not fully Message Driven. This means that each communication within the Application (not only between Applications) has to happen through Messages. This way it can become natural practice of how the system is being designed.
Ecotone follows on this making the Messaging the core of the Framework. It introduce Message based communication build around as the underlying foundation. This way even communication between PHP Objects can be done through Messages in seamless way.
As Ecotone follows Enterprise Integration Patterns, it makes the communication between Objects happening through Message Channels. We can think of Message Channel as pipe, where one side send Messages into, and the other consumes from it. As communication goes through Message Channels, it becomes really easy to switch the pipes. This basically means we can easily switch Message Channel implementations to use synchronous or asynchronous Channel, different Message Brokers, and yet our Objects will not be affected by that anyhow.
Ecotone provides different levels of abstractions, which we can choose to use from. Each abstraction is described in more details in related sections. In this Introduction section we will go over high level on how things can be used, to show what is Message based communication about.
Let's discuss our example from the above screenshot, where we want to register User and trigger Notification Sender. In Ecotone flows, we would introduce Command Handler being responsible for user registration:
As you can see, we also inject Event Bus which will publish our Event Message of User Was Registered.
Command and Events are the sole of higher level Messaging. On the low level everything is Message, yet each Message can either be understood as Command (intention to do), or Event (fact that happened). This make the clear distinction between - what we want to happen vs what actually had happened.
In our Controller we would inject Command Bus to send the Command Message:
After sending Command Message, our Command Handler will be executed. Command and Event Bus are available in our Dependency Container after Ecotone installation out of the box.
What is important here is that, Ecotone never forces us to implement or extend Framework specific classes. This means that our Command or Event Messages are POPO (clean PHP objects). In most of the scenarios we will simply mark given method with Attribute and Ecotone will glue the things for us.
We mentioned Notification Sender to be executed when User Was Registered Event happens. For this we follow same convention of using Attributes:
This Event Handler will be automatically triggered when related Event will be published. This way we can easily build decoupled flows, which hook in into existing business events.
Even so Commands and Events are Messages at the fundamental level, Ecotone distinguish them because they carry different semantics. By design Commands can only have single related Command Handler, yet Events can have multiple subscribing Event Handlers. This makes it easy for Developers to reason about the system and making it much easier to follow, as the difference between Messages is built in into the architecture itself.
From here we could decide to make use Message routing functionality to decouple Controllers from constructing Command Messages.
with this in mind, we can now user CommandBus with routing and even let Ecotone deserialize the Command, so our Controller does not even need to be aware of transformations:
When controllers simply pass through incoming data to Command Bus via routing, there is not much logic left in controllers. We could even have single controller, if we would be able to get routing key. It's really up to us, what work best in context of our system.
What we could decide to do is to add so called Interceptors (middlewares) to our Command Bus to add additional data or perform validation or access checks.
Pointcut provides the point which this interceptor should trigger on. In above scenario it will trigger when Command Bus is triggered before Message is send to given Command Handler. The reference attribute stays that given parameter is Service from Dependency Container and Ecotone should inject it.
There are multiple different interceptors that can hook at different moments of the flow. We could hook before Message is sent to Asynchronous Channel, or before executing Message Handler. We could also state that we want to hook for all Command Handlers or Event Handlers. And in each step we can decide what we want to do, like modify the messages, stop the flow, enforce security checks.
When we send Command using Command Bus, Ecotone under the hood construct a Message. Message contains of two things - payload and metadata. Payload is our Command and metadata is any additional information we would like to carry.
Metadata can be easily then accessed from our Command Handler or Interceptors
Besides metadata that we do provide, Ecotone provides additional metadata that we can use whenever needed, like Message Id, Correlation Id, Timestamp etc.
Ecotone take care of automatic Metadata propagation, no matter if execution synchronous or asynchronous. Therefore we can easily access any given metadata in targeted Message Handler, and also in any sub-flows like Event Handlers. This make it really easy to carry any additional information, which can not only be used in first executed Message Handler, but also in any flow triggered as a result of that.
As we mentioned at the beginning of this introduction, communication happen through Message Channels, and thanks to that it's really easy to switch code from synchronous to asynchronous execution. For that we would simply state that given Message Handler should be executed asynchronously:
Now before this Event Handler will be executed, it will land in Asynchronous Message Channel named "async" first, and from there it will be consumed asynchronously by Message Consumer (Worker process).
There maybe situations where multiple Asynchronous Event Handlers will be subscribing to same Event. We can easily imagine that one of them may fail and things like retries become problematic (As they may trigger successful Event Handlers for the second time). That's why Ecotone introduces , which deliver a copy of the Message to each related Event Handler separately. As a result each Asynchronous Event Handler is handling it's own Message in full isolation, and in case of failure only that Handler will be retried.
If we are using Eloquent, Doctrine ORM, or Models with custom storage implementation, we could push our implementation even further and send the Command directly to our Model.
We are marking our model as Aggregate, this is concept from Domain Driven Design, which describe a model that encapsulates the business logic.
Ecotone will take care of loading the Aggregate and storing them after the method is called. Therefore all we need to do it to send an Command.
Like you can see, we also added "block()" method, which will block given user. Yet it does not hold any Command as parameter. In this scenario we don't even need Command Message, because the logic is encapsulated inside nicely, and passing a status from outside could actually allow for bugs (e.g. passing UserStatus::active). Therefore all we want to know is that there is intention to block the user, the rest happens within the method.
To execute our block method we would call Command Bus this way:
There is one special metadata here, which is "aggregate.id", this tell Ecotone the instance of User which it should fetch from storage and execute this method on. There is no need to create Command Class at all, because there is no data we need to pass there. This way we can build features with ease and protect internal state of our Models, so they are not modified in incorrect way.
One of the powers that Message Driven Architecture brings is ability to build most sophisticated workflows with ease. This is possible thanks, because each Message Handler is considered as Endpoint being able to connect to input and output Channels. This is often referenced as pipe and filters architecture, but in general this is characteristic of true message-driven systems.
Let's suppose that our registered user can apply for credit card, and for this we need to pass his application through series of steps to verify if it's safe to issue credit card for him:
We are using outputChannelName here to indicate where to pass Message after it's handled by our Command Handler. In here we could enrich our CardApplication with some additional data, or create new object. However it's fully fine to pass same object to next step, if there was no need to modify it.
Ecotone provides ability to pass same object between workflow steps. This simplify the flow a lot, as we are not in need to create custom objects just for the framework needs, therefore we stick what is actually needed from business perspective.
Let's define now location where our Message will land after:
We are using here InternalHandler, internal handlers are not connected to any Command or Event Buses, therefore we can use them as part of the workflow steps, which we don't want to expose outside.
It's really up to us whatever we want to define Message Handlers in separate classes or not. In general due to declarative configuration in form of Attributes, we could define the whole flow within single class, e.g. "CardApplicationProcess". Workflows can also be started from Command or Event Handlers, and also directly through Business Interfaces. This makes it easy to build and connect different flows, and even reuse steps when needed.
Our Internal Handler contains of inputChannelName which points to the same channel as our Command Handlers outputChannelName. This way we bind Message Handlers together to create workflows. As you can see we also added Asynchronous attribute, as process of identity verification can take a bit of time, we would like it to happen in background.
Let's define our last step in Workflow:
This we've made synchronous which is the default if no Asynchronous attribute is defined. Therefore it will be called directly after Identity verification.
Workflows in Ecotone are fully under our control defined in PHP. There is no need to use 3rd party, or to define the flows within XMLs or YAMLs. This makes it really maintainable solution, which we can change, modify and test them easily, as we are fully on the ownership of the process from within the code. It's worth to mention that workflows are in general stateless as they pass Messages from one pipe to another. However if we would want to introduce statefull Workflow we could do that using Ecotone's Sagas.
Ecotone handles failures at the architecture level to make Application clear of those concerns. As Messages are the main component of communication between Applications, Modules or even Classes in Ecotone, it creates space for recoverability in all parts of the Application. As Messages can be retried instantly or with delay without blocking other processes from continuing their work.
As Message are basically data records which carry the intention, it opens possibility to store that "intention", in case unrecoverable failure happen. This means that when there is no point in delayed retries, because we encountered unrecoverable error, then we can move that Message into persistent store. This way we don't lose the information, and when the bug is fixed, we can simply retry that Message to resume the flow from the place it failed.
There are of course more resiliency patterns, that are part of Ecotone, like:
Automatic retries to send Messages to Asynchronous Message Channels
Reconnection of Message Consumers (Workers) if they lose the connection to the Broker
Inbuilt functionalities like Message Outbox, Error Channels with Dead Letter, Deduplication of Messages to avoid double processing,
and many many more.
The flow that Ecotone based on the Messages makes the Application possibile to handle failures at the architecture level. By communicating via Messages we are opening for the way, which allows us to self-heal our application without the need for us intervene, and in case of unrecoverable failures to make system robust enough to not lose any information and quickly recover from the point o failure when the bug is fixed.
Ecotone shifts the focus from technical details to the actual business processes, using Resilient Messaging as the foundation on which everything else is built. It provides seamless communication using Messages between Applications, Modules or even different Classes.
Together with that we will be using Declarative Configuration with attributes to avoid writing and maintaining configuration files. We will be stating intention of what we want to achieve instead wiring things ourselves, as a result we will regain huge amount of time, which can be invested in more important part of the System. And together with that, we will be able to use higher level Build Blocks like Command, Event Handlers, Aggregates, Sagas which connects to the messaging seamlessly, and helps encapsulate our business logic. So all the above serves as pillars for creating so called Business Oriented Architecture:
Resilient Messaging - At the heart of Ecotone lies a resilient messaging system that enables loose coupling, fault tolerance, and self-healing capabilities.
Declarative Configuration - Introduces declarative programming with Attributes. It simplifies development, reduces boilerplate code, and promotes code readability. It empowers developers to express their intent clearly, resulting in more maintainable and expressive codebases.
Building Blocks - Building blocks like Message Handlers, Aggregates, Sagas, facilitate the implementation of the business logic. By making it possible to bind Building Blocks with Resilient Messaging, Ecotone makes it easy to build and connect even the most complex business workflows.
Having this foundation knowledge and understanding how Ecotone works on the high level, it's good moment to dive into , which will provide hands on experience to deeper understanding.
Ecotone blog provides articles which describes Ecotone's architecture and related features in more details. Therefore if you want to find out more, follow bellow links:
PHP Interceptors Middlewares
Ecotone provide possibility to handle via Interceptors.
Interceptor intercepts the process of handling the message, this means we can do actions like:
Enriching the
Stopping or modify usual processing cycle




Moving from one format to another requires conversion. Ecotone does provide extension points in which we can integrate different Media Type Converters to do this type of conversion.
Let's build our first converter from JSON to our PHP format. In order to do that, we will need to implement Converter interface and mark it with MediaTypeConverter().
TypeDescriptor - Describes type in PHP format. This can be class, scalar (int, string), array etc.
MediaType - Describes Media type format. This can be application/json, application/xml etc.
$source - is the actual data to be converted.
Let's start with implementing matches method. Which tells us, if this converter can do conversion from one type to another.
This will tell Ecotone that in case source media type is JSON and target media type is PHP, then it should use this converter. Now we can implement the convert method. We will do pretty naive solution, just for the proof the concept.
And let's add fromArray method to RegisterProductCommand and GetProductPriceQuery.
Let's run our testing command:
If we call our testing command now, everything is going fine, but we still send PHP objects instead of JSON, therefore there was not need for Conversion. In order to start sending Commands and Queries in different format, we need to provide our handlers with routing key. This is because we do not deal with Object anymore, therefore we can't do the routing based on them.
Let's change our Testing class, so we call buses with JSON format.
We make use of different method now sendWithRouting.
It takes as first argument routing key to which we want to send the message.
The second argument describes the format of message we send.
Third is the data to send itself, in this case command formatted as JSON.
Let's run our testing command:
Normally we don't want to deal with serialization and deserialization, or we want to make the need for configuration minimal. This is because those are actually time consuming tasks, which are more often than not a copy/paste code, which we need to maintain.
Ecotone comes with integration with JMS Serializer to solve this problem. It introduces a way to write to reuse Converters and write them only, when that's actually needed. Therefore let's replace our own written Converter with JMS one. Let's download the Converter using Composer.
composer require ecotone/jms-converter
Let's remove __construct and fromArray methods from RegisterProductCommand GetProductPriceQuery, and the JsonToPHPConverter class completely, as we won't need it anymore.
JMS creates cache to speed up serialization process. In case of problems with running this test command, try to remove your cache. Let's run our testing command:
Do you wonder, how come, that we just deserialized our Command and Query classes without any additional code? JMS Module reads properties and deserializes according to type hint or docblock for arrays. It's pretty straight forward and logical:
Let's imagine we found out, that we have bug in our software. Our system users have registered product with negative price, which in result lowered the bill.
Product should be registered only with positive cost
We could put constraint in Product, validating the Cost amount. But this would assure us only in that place, that this constraint is met. Instead we want to be sure, that the Cost is correct, whenever we make use of it, so we can avoid potential future bugs. This way we will know, that whenever we will deal with Cost object, we will now it's correct. To achieve that we will create Value Object named Cost that will handle the validation, during the construction.
Great, but where to convert the integer to the Cost class? We really don't want to burden our business logic with conversions. Ecotone JMS does provide extension points, so we can tell him, how to convert specific classes.
Let's create class App\Infrastructure\Converter\CostConverter. We will put it in different namespace, to separate it from the domain.
We mark the methods with Converter attribute, so Ecotone can read parameter type and return type in order to know, how he can convert from scalar/array to specific class and vice versa. Let's change our command and aggregate class, so it can use the Cost directly.
The $cost class property will be automatically converted from integer to Cost by JMS Module.
Let's run our testing command:
In this Lesson we learned how to make use of Converters. The command which we send from outside (to the Command Bus) is still the same, as before. We changed the internals of the domain, without affecting consumers of our API. In next Lesson we will learn and Method Invocation and Metadata
Great, we just finished Lesson 3!
class UserService
{
#[CommandHandler]
public function register(RegisterUser $command, EventBus $eventBus): void
{
// store user
$eventBus->publish(new UserWasRegistered($userId));
|
}public function registerAction(Request $request, CommandBus $commandBus): Response
{
$command = // construct command
$commandBus->send($command);
}class NotificationSender
{
#[EventHandler]
public function when(UserWasRegistered $event): void
{
// send notification
}
}#[CommandHandler(routingKey: "user.register")]
public function register(RegisterUser $command, EventBus $eventBus): voidpublic function registerAction(Request $request, CommandBus $commandBus): Response
{
$commandBus->sendWithRouting(
routingKey: "user.register",
command: $request->getContent(),
commandMediaType: "application/json"
);
}#[Before(pointcut: CommandBus::class)]
public function validateAccess(
RegisterUser $command,
#[Reference] AuthorizationService $authorizationService
): void
{
if (!$authorizationService->isAdmin) {
throw new AccessDenied();
}
}public function registerAction(Request $request, CommandBus $commandBus): Response
{
$commandBus->sendWithRouting(
routingKey: "user.register",
command: $request->getContent(),
commandMediaType: "application/json",
metadata: [
"executorId" => $this->currentUser()->getId()
]
);
}#[CommandHandler(routingKey: "user.register")]
public function register(
RegisterUser $command,
EventBus $eventBus,
#[Header("executorId")] string $executorId,
): void#[Asynchronous("async")]
#[EventHandler]
public function when(UserWasRegistered $event): void#[Aggregate]
class User
{
use WithEvents;
#[Identifier]
private UserId $userId;
private UserStatus $status;
#[CommandHandler(routingKey: "user.register")]
public static function register(RegisterUser $command): self
{
$user = //create user
$user->recordThat(new UserWasRegistered($userId));
return $user;
}
#[CommandHandler(routingKey: "user.block")]
public function block(): void
{
$this->status = UserStatus::blocked;
}
}public function blockAction(Request $request, CommandBus $commandBus): Response
{
$commandBus->sendWithRouting(
routingKey: "user.block",
metadata: [
"aggregate.id" => $request->get('userId'),
]
);
}class CreditCardApplicationProcess
{
#[CommandHandler(
routingKey: "apply_for_card",
outputChannelName: "application.verify_identity"
)]
public function apply(CardApplication $application): CardApplication
{
// store card application
return $application;
}
}#[Asynchronous("async")]
#[InternalHandler(
inputChannelName: "application.verify_identity",
outputChannelName: "application.send_result"
)]
public function verifyIdentity(CardApplication $application): ApplicationResult
{
// do the verification
return new ApplicationResult($result);
}#[InternalHandler(
inputChannelName: "application.send_result"
)]
public function sendResult(ApplicationResult $application): void
{
// send result
}<?php
namespace App\Domain\Product;
use Ecotone\Messaging\Attribute\MediaTypeConverter;
use Ecotone\Messaging\Conversion\Converter;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Messaging\Handler\TypeDescriptor;
#[MediaTypeConverter]
class JsonToPHPConverter implements Converter
{
public function matches(TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType): bool
{
}
public function convert($source, TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType)
{
}
}public function matches(TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType): bool
{
return $sourceMediaType->isCompatibleWith(MediaType::createApplicationJson()) // if source media type is JSON
&& $targetMediaType->isCompatibleWith(MediaType::createApplicationXPHP()); // and target media type is PHP
}public function convert($source, TypeDescriptor $sourceType, MediaType $sourceMediaType, TypeDescriptor $targetType, MediaType $targetMediaType)
{
$data = \json_decode($source, true, 512, JSON_THROW_ON_ERROR);
// $targetType hold the class, which we will convert to
switch ($targetType->getTypeHint()) {
case RegisterProductCommand::class: {
return RegisterProductCommand::fromArray($data);
}
case GetProductPriceQuery::class: {
return GetProductPriceQuery::fromArray($data);
}
default: {
throw new \InvalidArgumentException("Unknown conversion type");
}
}
}class GetProductPriceQuery
{
private int $productId;
public function __construct(int $productId)
{
$this->productId = $productId;
}
public static function fromArray(array $data) : self
{
return new self($data['productId']);
}class RegisterProductCommand
{
private int $productId;
private int $cost;
public function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
}
public static function fromArray(array $data) : self
{
return new self($data['productId'], $data['cost']);
}bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!#[CommandHandler("product.register")]
public static function register(RegisterProductCommand $command) : self
{
return new self($command->getProductId(), $command->getCost());
}
#[QueryHandler("product.getCost")]
public function getCost(GetProductPriceQuery $query) : int
{
return $this->cost;
}(...)
public function run() : void
{
$this->commandBus->sendWithRouting("product.register", \json_encode(["productId" => 1, "cost" => 100]), "application/json");
echo $this->queryBus->sendWithRouting("product.getCost", \json_encode(["productId" => 1]), "application/json");
}bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!Conversion Table examples:
Source => Converts too
private int $productId => int
private string $data => string
private \stdClass $data => \stdClass
/**
* @var \stdClass[]
*/
private array $data => array<\stdClass>namespace App\Domain\Product;
class Cost
{
private int $amount;
public function __construct(int $amount)
{
if ($amount <= 0) {
throw new \InvalidArgumentException("The cost cannot be negative or zero, {$amount} given.");
}
$this->amount = $amount;
}
public function getAmount() : int
{
return $this->amount;
}
public function __toString()
{
return (string)$this->amount;
}
}namespace App\Infrastructure\Converter;
use App\Domain\Product\Cost;
use Ecotone\Messaging\Attribute\Converter;
class CostConverter
{
#[Converter]
public function convertFrom(Cost $cost) : int
{
return $cost->getAmount();
}
#[Converter]
public function convertTo(int $amount) : Cost
{
return new Cost($amount);
}
}class RegisterProductCommand
{
private int $productId;
private Cost $cost;
public function getProductId() : int
{
return $this->productId;
}
public function getCost() : Cost
{
return $this->cost;
}
}class Product
{
use WithAggregateEvents;
#[Identifier]
private int $productId;
private Cost $cost;
private function __construct(int $productId, Cost $cost)
{
$this->productId = $productId;
$this->cost = $cost;
$this->recordThat(new ProductWasRegisteredEvent($productId));
}
#[CommandHandler("product.register")]
public static function register(RegisterProductCommand $command) : self
{
return new self($command->getProductId(), $command->getCost());
}
#[QueryHandler("product.getCost")]
public function getCost(GetProductPriceQuery $query) : Cost
{
return $this->cost;
}
}bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!Calling some shared functionality or adding additional behavior\
This all can be done without modifying the code itself, as we hook into the existing flows.
Type of Interceptor more about it Interceptor Types section
Precedence defines ordering of called interceptors. The lower the value is, the quicker Interceptor will be called. It's safe to stay with range between -1000 and 1000, as numbers bellow -1000 and higher than 1000 are used by Ecotone.
The precedence is done within a specific interceptor type.
Every interceptor has Pointcut attribute, which describes for specific interceptor, which endpoints it should intercept.
CLASS_NAME - indicates intercepting specific class or interface or class containing attribute on method or class level
CLASS_NAME::METHOD_NAME - indicates intercepting specific method of class
NAMESPACE* - Indicating all Endpoints starting with namespace prefix e.g. App\Domain\*
expression || expression - Indicating one expression or another e.g. Product\*||Order\*
expression && expression - Indicating one expression and another e.g.
App\Domain\* && App\Attribute\RequireAdministrator
There are four types of interceptors. Each interceptor has it role and possibilities. Interceptors are called in following order:
Before
Around
After
Presend
Before Interceptor is called after message is sent to the channel, before execution of Endpoint.
Before interceptor is called before endpoint is executed.
Before interceptors can used in order to stop the flow, throw an exception or enrich the Message.
To understand it better, let's follow an example, where we will intercept Command Handler with verification if executor is an administrator.
Let's start by creating Attribute called RequireAdministrator in new namepace.
Let's create our first Before Interceptor.
We are using in here Pointcut which is looking for #[RequireAdministrator] annotation in each of registered Endpoints.
The void return type is expected in here. It tells Ecotonethat, this Before Interceptor is not modifying the Message and message will be passed through. The message flow however can be interrupted by throwing exception.
Now we need to annotate our Command Handler:
Whenever we call our command handler, it will be intercepted by AdminVerificator now.
If return type is not void new Message will be created from the returned type.
Let's follow an example, where we will enrich Message payload with timestamp.
Suppose we want to add executor Id, but as this is not part of our Command, we want add it to our Message Headers.
If return type is not void new modified based on previous Message will be created from the returned type. If we additionally add changeHeaders: true it will tell Ecotone, that we we want to modify Message headers instead of payload.
Use Message Filter, to eliminate undesired messages based on a set of criteria.
This can be done by returning null from interceptor, if the flow should proceed, then payload should be returned.
If return type is not void new modified based on previous Message will be created from the returned type. If we additionally add changeHeaders=trueit will tell Ecotone, that we we want to modify Message headers instead of payload.
The Around Interceptor have access to actual Method Invocation.This does allow for starting action before method invocation is done, and finishing it after.
Around interceptoris a good place for handling actions like Database Transactions or logic that need to access invoked object.
As we used Command Bus interface as pointcut, we told Ecotone that it should intercept Command Bus Gateway. Now whenever we will call any method on Command Bus, it will be intercepted with transaction.
The other powerful use case for Around Interceptor is intercepting Aggregate.
Suppose we want to verify, if executing user has access to the Aggregate.
We have placed @IsOwnerOfPerson annotation as the top of class. For interceptor pointcut it means, that each endpoint defined in this class should be intercepted. No need to add it on each Command Handler now.
We've passed the executd Aggregate instance - Person to our Interceptor. This way we can get the context of the executed object in order to perform specific logic.
After interceptor is called after endpoint execution has finished.
It does work exactly the same as Before Interceptor.
After interceptor can used to for example to enrich QueryHandler result.
We will intercept all endpoints within Order\ReadModel namespace, by adding result coming from the endpoint under result key.
Presend Interceptor is called before Message is actually send to the channel.
In synchronous channel there is no difference between Before and Presend.
The difference is seen when the channel is asynchronous.
Presend Interceptor can used to verify if data is correct before sending to asynchronous channel, or we may want to check if user has enough permissions to do given action.
This will keep our asynchronous channel free of incorrect messages.
Presend can't intercept Gateways like (Command/Event/Query) buses, however in context of Gateways using Before Interceptor lead to same behaviour, therefore can be used instead.
class AdminVerificator
{
#[Before(precedence: 0, pointcut: "Order\Domain\*")]
public function isAdmin(array $payload, array $headers) : void
{
if ($headers["executorId"] != 1) {
throw new \InvalidArgumentException("You need to be administrator in order to register new product");
}
}
}#[\Attribute]
class RequireAdministrator {}class AdminVerificator
{
#[Before(pointcut: RequireAdministrator::class)]
public function isAdmin(array $payload, array $headers) : void
{
if ($headers["executorId"] != 1) {
throw new \InvalidArgumentException("You need to be administrator in order to register new product");
}
}
}#[CommandHandler]
#[RequireAdministrator] // Our Application level defined Attribute
public function changePrice(ChangePriceCommand $command) : void
{
// do something with $command
}#[\Attribute]
class AddTimestamp {}class TimestampService
{
#[Before(pointcut: AddTimestamp::class)]
public function add(array $payload) : array
{
return array_merge($payload, ["timestamp" => time()]);
}
}class ChangePriceCommand
{
private int $productId;
private int $timestamp;
}
#[CommandHandler]
#[AddTimestamp]
public function changePrice(ChangePriceCommand $command) : void
{
// do something with $command and timestamp
}#[\Attribute]
class AddExecutor {}class TimestampService
{
#[Before(pointcut: AddExecutor::class, changeHeaders: true)]
public function add() : array
{
return ["executorId" => 1];
}
}#[CommandHandler]
#[AddExecutor]
public function changePrice(ChangePriceCommand $command, array $metadata) : void
{
// do something with $command and executor id $metadata["executorId"]
}#[\Attribute]
class SendNotificationOnlyIfInterested {}class NotificationFilter
{
#[Before(pointcut: SendNotificationOnlyIfInterested::class, changeHeaders: true)]
public function filter(PriceWasChanged $event) : ?array
{
if ($this->isInterested($event) {
return $event; // flow proceeds
}
return null; // message is eliminated, flow stops.
}
}#[EventHandler]
#[SendNotificationOnlyIfInterested]
public function sendNewPriceNotification(ChangePriceCommand $event) : void
{
// do something with $event
}class TransactionInterceptor
{
#[Around(pointcut: Ecotone\Modelling\CommandBus::class)]
public function transactional(MethodInvocation $methodInvocation)
{
$this->connection->beginTransaction();
try {
$result = $methodInvocation->proceed();
$this->connection->commit();
}catch (\Throwable $exception) {
$this->connection->rollBack();
throw $exception;
}
return $result;
}
}#[Aggregate]
#[IsOwnedByExecutor]
class Person
{
private string $personId;
#[CommandHandler]
public function changeAddress(ChangeAddress $command) : void
{
// change address
}
public function hasPersonId(string $personId) : bool
{
return $this->personId === $personId;
}
}#[\Attribute]
class IsOwnedByExecutor {}class IsOwnerVerificator
{
#[Around(pointcut: IsOwnedByExecutor::class)]
public function isOwner(MethodInvocation $methodInvocation, Person $person, #[Headers] array $metadata)
{
if (!$person->hasPersonId($metadata["executoId"]) {
throw new \InvalidArgumentException("No access to do this action!");
}
return $methodInvocation->proceed();
}
}namespace Order\ReadModel;
class OrderService
{
#[QueryHandler]
public function getOrderDetails(GetOrderDetailsQuery $query) : array
{
return ["orderId" => $query->getOrderId()]
}
} class AddResultSet
{
#[After(pointcut: "Order\ReadModel\*")
public function add(array $payload) : array
{
return ["result" => $payload];
}
}class VerifyIfAuthenticated
{
#[Presend(pointcut: Ecotone\Modelling\Attribute\CommandHandler::class)]
public function verify(#[Header("executorId")] ?string $executorId) : void
{
if (!$executorId) {
throw new \InvalidArgumentException("User must be logged");
}
}
}
class IsEventAlreadyHandled
{
private Storage $storage;
#[Presend(pointcut: Ecotone\Modelling\Attribute\EventHandler::class)]
public function verify($payload, #[Header("messageId")] string $messageId)
{
if ($this->storage->isHandled($messageId)) {
return null;
}
return $payload;
}
}PHP Middlewares Interceptors
Ecotone provide us with possibility to handle cross cutting concerns via Interceptors.
Interceptor as name suggest, intercepts the process of handling the message.
You may enrich the message, stop or modify usual processing cycle, call some shared functionality, add additional behavior to existing code without modifying the code itself.
After one of our administrators went for holiday, the others found out, they can't change cost of the product and this become really problematic for them.
Administrators should be able to change the cost of a product
We could copy paste the logic from product.register to product.changePricebut we want to avoid code duplication, especially logic that may happen more often. Let's intercept our Command Handlers.
Let's start by creating Annotation called RequireAdministrator in new namepace App\Infrastructure\RequireAdministrator
Let's create our first Before Interceptor. Start by removing old UserService and create new one in different namespace App\Infrastructure\RequireAdministrator. Remember to mark return type as void, we will see why it is so important soon.
Before- marks method as Interceptor, so it can be be found by Ecotone.
Pointcut - describes what should be intercepted.
CLASS_NAME - indicates what should be intercepted using specific Class Name or Attribute Name annotated at the level of method or class
NAMESPACE* - Indicating all starting with namespace e.g. App\Domain\Product\*
Now we need to annotate our Command Handlers:
We told Before Interceptor that it should intercept all endpoints with annotation RequireAdministrator.
Now, whenever we will call our command handlers, they will be intercepted by UserService.
You can try it out, by providing different userId.
Before and After interceptors are depending on the return type, to decide if they should modify or pass it through.
If return type is different than void, Message payload or headers can be enriched with data.
If return type is void then message will be passed through and the process of message flow can be interrupted by throwing exception only.
Instead of providing the userId during calling the CommandBus we will enrich with it before it will be handled by Command Handler using Interceptor.
Let's change our testing class to remove metadata and add the Interceptor.
changeHeaders - Tells Ecotone if this Interceptor modifies payload or headers. The default is payload.
If changeHeaders=true thenheaders are picked and associative array must be returned. The returned value is merged with current headers.
If changeHeaders=false then payload is picked and current payload is replaced by returned value, the headers stays the same.
You may of course inject current payload and headers into the method if needed, as with usual endpoint.
&#xNAN;precedence - Tells Ecotone in what order interceptors should be called. The lower the value is the quicker interceptor will be called. The order exists within interceptor type:
Let's annotate Product aggregate
If we annotate aggregate on the class level. Then it does work like each of the method would be annotated with specific annotation in this case @AddUserId.
Let's run our testing command:
If during Before or Around you decide to break the flow, return null. Nullindiciates, that there is no message and the current flow ends.
Null can not be returned in header changing interceptor, it does work only for payload changing interceptor.
The Around Interceptor is closet to actual endpoint's method call. Thanks to that, it has access to Method Invocation.This does allow for starting some procedure and ending after the invocation is done.
We will add real database to our example using if you do not have extension installed, then you will need to install it first. Yet if you are using Quickstart's Docker container, then you are ready to go.
Let's start by implementing repository, that will be able to handle any aggregate, by storing it in sqlite database.
Before we do that, we need to remove our In Memory implementation class App\Domain\Product\InMemoryProductRepository we will replace it with our new implementation.
We will create using new namespace for it App\Infrastructure\Persistence.
Besides we are going to use , as this is really helpful abstraction over the PDO.
And the :
Connection to sqlite database using dbal library
Serializer is registered by Ecotone.
Serializer can handle serialization using .
It this case it will know how to register Cost class, as we already registered Converter for it.
Serializer give us access for conversion from PHP type to specific Media Type or from specific Media Type to PHP type. We will use it to easily serialize our
We want to intercept Command Bus Gateway with transaction. So whenever we call it, it will invoke our Command Handler within transaction.
pointcut="Ecotone\Modelling\CommandBus"
This pointcut will intercept CommandBus.
Let's run our testing command:
We do have two transactions started, because we call the Command Bus twice.
Each of interceptors, can inject attribute, which was used for pointcut. Just type hint for it in method declaration.
Around interceptors can inject intercepted class instance. In above example it would be Command Bus.
In case of Command Bus it may seems not needed, but if we would intercept Aggregate, then it really useful as for example you may verify if executing user have access to it.
You may read more about interceptors in .
Great, we have just finished Lesson 5!
Interceptors are very powerful concept. Without extending any classes or interfaces from Ecotone, we can build build up Authorization, Transactions, Delegate duplicated logic, Call some external service, Logging and Tracing before invoking endpoint, the amount of possibilities is endless.
In the next chapter, we will learn about scheduling and polling endpoints
expression||expression - Indicating one expression or another e.g. Product\*||Order\*before/around/after.AddUserId InterceptorRequireAdministrator InterceptoruserIdAddUserIdService0UserService1ProductJSONThis does create database table, if needed. It does create simple table structure containing id of the aggregate, the class type and serialized data in JSON. Take a look at createSharedTableIfNeeded if you want more details.
Deserialize aggregate to PHP
Serialize aggregate to JSON
namespace App\Infrastructure\RequireAdministrator;
#[\Attribute]
class RequireAdministrator {}namespace App\Infrastructure\RequireAdministrator;
class UserService
{
#[Before(pointcut: RequireAdministrator::class)]
public function isAdmin(#[Header("userId")] ?string $userId) : void
{
if ($userId != 1) {
throw new \InvalidArgumentException("You need to be administrator to perform this action");
}
}
}pointcut="App\Domain\Product\Product"use App\Infrastructure\RequireAdministrator\RequireAdministrator;
(...)
#[CommandHandler("product.register")]
#[RequireAdministrator]
public static function register(RegisterProductCommand $command, array $metadata) : self
{
return new self($command->getProductId(), $command->getCost(), $metadata["userId"]);
}
#[CommandHandler("product.changePrice")]
#[RequireAdministrator]
public function changePrice(ChangePriceCommand $command) : void
{
$this->cost = $command->getCost();
}public function run() : void
{
$this->commandBus->sendWithRouting(
"product.register",
\json_encode(["productId" => 1, "cost" => 100]),
"application/json"
);
$this->commandBus->sendWithRouting(
"product.changePrice",
\json_encode(["productId" => 1, "cost" => 110]),
"application/json"
);
echo $this->queryBus->sendWithRouting("product.getCost", \json_encode(["productId" => 1]), "application/json");
}namespace App\Infrastructure\AddUserId;
#[\Attribute]
class AddUserId {}namespace App\Infrastructure\AddUserId;
class AddUserIdService
{
#[Before(precedence: 0, pointcut: AddUserId::class, changeHeaders: true)]
public function add() : array
{
return ["userId" => 1];
}
}class UserService
{
#[Before(precedence: 1,pointcut: RequireAdministrator::class)]
public function isAdmin(#[Header("userId")] ?string $userId) : void
{
if ($userId != 1) {
throw new \InvalidArgumentException("You need to be administrator in order to register new product");
}
}
}use App\Infrastructure\AddUserId\AddUserId;
#[Aggregate]
#[AddUserId]
class Product
{bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
110
Good job, scenario ran with success!composer require doctrine/dbalnamespace App\Infrastructure\Persistence;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use Ecotone\Messaging\Gateway\Converter\Serializer;
use Ecotone\Modelling\Attribute\Repository;
use Ecotone\Modelling\StandardRepository;
#[Repository]
class DbalRepository implements StandardRepository
{
const TABLE_NAME = "aggregate";
const CONNECTION_DSN = 'sqlite:////tmp/db.sqlite';
private Connection $connection; // 1
private Serializer $serializer; // 2
public function __construct(Serializer $serializer)
{
$this->connection = DriverManager::getConnection(array('url' => self::CONNECTION_DSN));
$this->serializer = $serializer;
}
public function canHandle(string $aggregateClassName): bool
{
return true;
}
public function findBy(string $aggregateClassName, array $identifiers): ?object
{
$this->createSharedTableIfNeeded(); // 3
$record = $this->connection->executeQuery(<<<SQL
SELECT * FROM aggregate WHERE id = :id AND class = :class
SQL, ["id" => $this->getFirstId($identifiers), "class" => $aggregateClassName])->fetch(\PDO::FETCH_ASSOC);
if (!$record) {
return null;
}
// 4
return $this->serializer->convertToPHP($record["data"], "application/json", $aggregateClassName);
}
public function save(array $identifiers, object $aggregate, array $metadata, ?int $expectedVersion): void
{
$this->createSharedTableIfNeeded();
$aggregateClass = get_class($aggregate);
// 5
$data = $this->serializer->convertFromPHP($aggregate, "application/json");
if ($this->findBy($aggregateClass, $identifiers)) {
$this->connection->update(self::TABLE_NAME,
["data" => $data],
["id" => $this->getFirstId($identifiers), "class" => $aggregateClass]
);
return;
}
$this->connection->insert(self::TABLE_NAME, [
"id" => $this->getFirstId($identifiers),
"class" => $aggregateClass,
"data" => $data
]);
}
private function createSharedTableIfNeeded(): void
{
$hasTable = $this->connection->executeQuery(<<<SQL
SELECT name FROM sqlite_master WHERE name=:tableName
SQL, ["tableName" => self::TABLE_NAME])->fetchColumn();
if (!$hasTable) {
$this->connection->executeStatement(<<<SQL
CREATE TABLE aggregate (
id VARCHAR(255),
class VARCHAR(255),
data TEXT,
PRIMARY KEY (id, class)
)
SQL
);
}
}
/**
* @param array $identifiers
* @return mixed
*/
private function getFirstId(array $identifiers)
{
return array_values($identifiers)[0];
}
}namespace App\Infrastructure\Persistence;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use Ecotone\Messaging\Attribute\Interceptor\Around;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
use Ecotone\Modelling\CommandBus;
class TransactionInterceptor
{
private Connection $connection;
public function __construct()
{
$this->connection = DriverManager::getConnection(array('url' => DbalRepository::CONNECTION_DSN));
}
#[Around(pointcut: CommandBus::class)]
public function transactional(MethodInvocation $methodInvocation)
{
echo "Start transaction\n";
$this->connection->beginTransaction();
try {
$result = $methodInvocation->proceed();
$this->connection->commit();
echo "Commit transaction\n";
}catch (\Throwable $exception) {
$this->connection->rollBack();
echo "Rollback transaction\n";
throw $exception;
}
return $result;
}
}bin/console ecotone:quickstart
Start transaction
Product with id 1 was registered!
Commit transaction
Start transaction
Commit transaction
110
Good job, scenario ran with success!Asynchronous PHP Workers
Ecotone provides abstractions for asynchronous execution.
We got new requirement:
User should be able to place order for different products.
We will need to build Order aggregate.
Let's start by creating PlaceOrderCommand with ordered product Ids
We will need OrderedProduct value object, which will describe, cost and identifier of ordered product
And our Order aggregate
placeOrder - Place order method make use of QueryBus to retrieve cost of each ordered product.
You could find out, that we are not using application/json for product.getCost query, ecotone/jms-converter can handle array transformation, so we do not need to use json.
We do not need to change or add new Repository, as our exiting one can handle any new aggregate arriving in our system.
Let's change our testing class and run it!
We want to be sure, that we do not lose any order, so we will register our order.place Command Handler to run asynchronously using RabbitMQ now.
Let's start by adding extension to Ecotone, that can handle RabbitMQ:
We also need to add our ConnectionFactory to our Dependency Container.
Let's add our first AMQP Backed Channel (RabbitMQ Channel), in order to do it, we need to create our first Application Context.
Application Context is a non-constructor class, responsible for extending Ecotone with extra configurations, that will help the framework act in a specific way. In here we want to tell Ecotone about AMQP Channel with specific name.
Let's create new class App\Infrastructure\MessagingConfiguration.
ServiceContext - Tell that this method returns configuration. It can return array of objects or a single object.
Now we need to tell our order.place Command Handler, that it should run asynchronously using our neworders channel.
We do it by adding Asynchronous annotation with channelName used for asynchronous endpoint.
Endpoints using Asynchronous are required to have endpointId defined, the name can be anything as long as it's not the same as routing key (order.place).
Let's run our command which will tell us what asynchronous endpoints we have defined in our system: ecotone:list
We have new asynchronous endpoint available orders. Name comes from the message channel name.
You may wonder why it is not place_order_endpoint, it's because via single asynchronous channel we can handle multiple endpoints, if needed. This is further explained in .
Let's change orderId in our testing command, so we can place new order.
After running our testing command bin/console ecotone:quickstartwe should get an exception:
That's fine, we have registered order.place Command Handler to run asynchronously, so we need to run our asynchronous endpoint in order to handle Command Message. If you did not received and exception, it's probably because orderId was not changed and we already registered such order.
Let's run our asynchronous endpoint
Like we can see, it ran our Command Handler and placed the order.
We can change our testing command to run only Query Handlerand check, if the order really exists now.
There is one thing we can change.
As in asynchronous scenario we may not have access to the context of executor to enrich the message,, we can change our AddUserIdService Interceptor to perform the action before sending it to asynchronous channel.
This Interceptor is registered as Before Interceptor which is before execution of our Command Handler, but what we want to achieve is, to call this interceptor before message will be send to the asynchronous channel. For this there is Presend Interceptor available.
Change Before annotation to Presend annotation and we are done.
Ecotone will do it best to handle serialization and deserialization of your headers.
Now if non-administrator will try to execute this, exception will be thrown, before the Message will be put to the asynchronous channel. Thanks to Presend interceptor, we can validate messages, before they will go asynchronous, to prevent sending incorrect messages.
We made it through, Congratulations! We have successfully registered asynchronous Command Handler and safely placed the order. We have finished last lesson. You may now apply the knowledge in real project or check more advanced usages starting here .
namespace App\Domain\Order;
class PlaceOrderCommand
{
private int $orderId;
/**
* @var int[]
*/
private array $productIds;
/**
* @return int[]
*/
public function getProductIds(): array
{
return $this->productIds;
}
public function getOrderId() : int
{
return $this->orderId;
}
}namespace App\Domain\Order;
class OrderedProduct
{
private int $productId;
private int $cost;
public function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
}
public function getCost(): int
{
return $this->cost;
}
}namespace App\Domain\Order;
use App\Infrastructure\AddUserId\AddUserId;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\Aggregate;
use Ecotone\Modelling\Attribute\AggregateIdentifier;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Modelling\QueryBus;
#[Aggregate]
#[AddUserId]
class Order
{
#[AggregateIdentifier]
private int $orderId;
private int $buyerId;
/**
* @var OrderedProduct[]
*/
private array $orderedProducts;
private function __construct(int $orderId, int $buyerId, array $orderedProducts)
{
$this->orderId = $orderId;
$this->buyerId = $buyerId;
$this->orderedProducts = $orderedProducts;
}
#[CommandHandler("order.place")]
public static function placeOrder(PlaceOrderCommand $command, array $metadata, QueryBus $queryBus) : self
{
$orderedProducts = [];
foreach ($command->getProductIds() as $productId) {
$productCost = $queryBus->sendWithRouting("product.getCost", ["productId" => $productId]);
$orderedProducts[] = new OrderedProduct($productId, $productCost->getAmount());
}
return new self($command->getOrderId(), $metadata["userId"], $orderedProducts);
}
#[QueryHandler("order.getTotalPrice")]
public function getTotalPrice() : int
{
$totalPrice = 0;
foreach ($this->orderedProducts as $orderedProduct) {
$totalPrice += $orderedProduct->getCost();
}
return $totalPrice;
}
}class EcotoneQuickstart
{
private CommandBus $commandBus;
private QueryBus $queryBus;
public function __construct(CommandBus $commandBus, QueryBus $queryBus)
{
$this->commandBus = $commandBus;
$this->queryBus = $queryBus;
}
public function run() : void
{
$this->commandBus->sendWithRouting(
"product.register",
["productId" => 1, "cost" => 100]
);
$this->commandBus->sendWithRouting(
"product.register",
["productId" => 2, "cost" => 300]
);
$orderId = 100;
$this->commandBus->sendWithRouting(
"order.place",
["orderId" => $orderId, "productIds" => [1,2]]
);
echo $this->queryBus->convertAndSend("order.getTotalPrice", MediaType::APPLICATION_X_PHP_ARRAY, ["orderId" => $orderId]);
}
}bin/console ecotone:quickstart
Running example...
Start transaction
Product with id 1 was registered!
Commit transaction
Start transaction
Product with id 2 was registered!
Commit transaction
Start transaction
Commit transaction
400
Good job, scenario ran with success!composer require ecotone/amqp# Add AmqpConnectionFactory in config/services.yaml
services:
_defaults:
autowire: true
autoconfigure: true
App\:
resource: '../src/*'
exclude: '../src/{Kernel.php}'
Bootstrap\:
resource: '../bootstrap/*'
exclude: '../bootstrap/{Kernel.php}'
# You need to have RabbitMQ instance running on your localhost, or change DSN
Enqueue\AmqpExt\AmqpConnectionFactory:
class: Enqueue\AmqpExt\AmqpConnectionFactory
arguments:
- "amqp+lib://guest:guest@localhost:5672//"# Add AmqpConnectionFactory in config/services.yaml
services:
_defaults:
autowire: true
autoconfigure: true
App\:
resource: '../src/*'
exclude: '../src/{Kernel.php}'
Bootstrap\:
resource: '../bootstrap/*'
exclude: '../bootstrap/{Kernel.php}'
# docker-compose.yml has RabbitMQ instance defined. It will be working without
# addtional configuration
Enqueue\AmqpExt\AmqpConnectionFactory:
class: Enqueue\AmqpExt\AmqpConnectionFactory
arguments:
- "amqp+lib://guest:guest@rabbitmq:5672//"# Add AmqpConnectionFactory in bootstrap/QuickStartProvider.php
namespace Bootstrap;
use Illuminate\Support\ServiceProvider;
use Enqueue\AmqpExt\AmqpConnectionFactory;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(AmqpConnectionFactory::class, function () {
return new AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//");
});
}
(...)# Add AmqpConnectionFactory in bootstrap/QuickStartProvider.php
namespace Bootstrap;
use Illuminate\Support\ServiceProvider;
use Enqueue\AmqpExt\AmqpConnectionFactory;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(AmqpConnectionFactory::class, function () {
return new AmqpConnectionFactory("amqp+lib://guest:guest@rabbitmq:5672//");
});
}
(...)# Add AmqpConnectionFactory in bin/console.php
// add additional service in container
public function __construct()
{
$this->container = new Container();
$this->container->set(Enqueue\AmqpExt\AmqpConnectionFactory::class, new Enqueue\AmqpExt\AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//"));
}
# Add AmqpConnectionFactory in bin/console.php
// add additional service in container
public function __construct()
{
$this->container = new Container();
$this->container->set(Enqueue\AmqpExt\AmqpConnectionFactory::class, new Enqueue\AmqpExt\AmqpConnectionFactory("amqp+lib://guest:guest@rabbitmq:5672//"));
}namespace App\Infrastructure;
class MessagingConfiguration
{
#[ServiceContext]
public function orderChannel()
{
return [
AmqpBackedMessageChannelBuilder::create("orders")
];
}
}use Ecotone\Messaging\Annotation\Asynchronous;
(...)
#[Asynchronous("orders")]
#[CommandHandler("order.place", endpointId: "place_order_endpoint")]
public static function placeOrder(PlaceOrderCommand $command, array $metadata, QueryBus $queryBus) : self
{
$orderedProducts = [];
foreach ($command->getProductIds() as $productId) {
$productCost = $queryBus->sendWithRouting("product.getCost", ["productId" => $productId]);
$orderedProducts[] = new OrderedProduct($productId, $productCost->getAmount());
}
return new self($command->getOrderId(), $metadata["userId"], $orderedProducts);
}#[CommandHandler("order.place", endpointId: "place_order_endpoint")]bin/console ecotone:list
+--------------------+
| Endpoint Names |
+--------------------+
| orders |
+--------------------+public function run() : void
{
$this->commandBus->sendWithRouting(
"product.register",
["productId" => 1, "cost" => 100]
);
$this->commandBus->sendWithRouting(
"product.register",
["productId" => 2, "cost" => 300]
);
$orderId = 990;
$this->commandBus->sendWithRouting(
"order.place",
["orderId" => $orderId, "productIds" => [1,2]]
);
echo $this->queryBus->sendWithRouting("order.getTotalPrice", ["orderId" => $orderId]);
}AggregateNotFoundException:
Aggregate App\Domain\Order\Order:getTotalPrice was not found for indentifie
rs {"orderId":990} bin/console ecotone:run orders --handledMessageLimit=1 --stopOnFailure -vvv
[info] {"orderId":990,"productIds":[1,2]}class EcotoneQuickstart
{
private CommandBus $commandBus;
private QueryBus $queryBus;
public function __construct(CommandBus $commandBus, QueryBus $queryBus)
{
$this->commandBus = $commandBus;
$this->queryBus = $queryBus;
}
public function run() : void
{
$orderId = 990;
echo $this->queryBus->sendWithRouting("order.getTotalPrice", ["orderId" => $orderId]);
}
}bin/console ecotone:quickstart -vvv
Running example...
400
Good job, scenario ran with success!namespace App\Infrastructure\AddUserId;
class AddUserIdService
{
#[Presend(0, AddUserId::class, true)]
public function add() : array
{
return ["userId" => 1];
}
}PHP Messages
Key concepts / background\
Ecotone from the ground is built around messaging to provide a simple model that allows to connects components, modules or even different Applications together, in seamless and easy way. To achieve that fundamental messaging blocks are implemented using Enterprise Integration PatternsOn top of what we get support for higher level patterns like CQRS, Events, DDD - which help us build systems that make the business logic explicit and maintainable, even in the long term. In this first lesson, we will learn fundamental blocks in messaging architecture and we will start building back-end for Shopping System using CQRS. Before we will dive into implementation, let's briefly understand main concepts behind Ecotone.
A Message is a data record containing of Payload and Message Headers (Metadata). The Payload can be of any PHP type (scalar, object, compound), and the Headers hold commonly required information such as ID, timestamp and framework specific information. Developers can also store any arbitrary key-value pairs in the headers, to pass additional meta information.
Message channel abstracts communication between components. It does allow for sending and receiving messages. This decouples components from knowledge about the transport layer, as it's encapsulated within the Message Channel.
Message Endpoints are consumers and producers of messages. Consumer are not necessary asynchronous, as you may build synchronous flow, compound of multiple endpoints.
The Messaging Gateway encapsulates messaging-specific code (The code required to send or receive a ) and separates it from the rest of the application code.
It take your domain specific objects an convert them into a that is send via .
To not have dependency on the Messaging Framework Ecotone provides the Gateway as interface and generates proxy class for it.
You will not have to implement Messages, Message Channels or Message Endpoints directly, as those are lower level concepts. Instead you will be able to focus on your specific domain logic with an implementation based on plain PHP objects. By providing declarative configuration we will be able to connect domain-specific code to the messaging system.
Great, now when we know fundamental blocks of Ecotone and Messaging Architecture, we can start implementing our Shopping System!
If you did not understand something, do not worry, we will see how does it apply in practice in next step.
Do you remember this command from ?
If yes and this command does return above output, then we are ready to go.
this method will be run, whenever we executeecotone:quickstart.
This class is auto-registered using auto-wire system, both and provides this great feature. For Lite clean and easy to use is taken.\
Thanks to that, we will avoid writing configuration files for service registrations during this tutorial.
And we will be able to fully focus on what can Ecotone provides to us.
We will start by creating Command Handler. Command Handler is place where we will put our business logic. Let's create namespace App\Domain\Product and inside RegisterProductCommand, command for registering new product:
Let's register a Command Handler now by creating class App\Domain\Product\ProductService
First thing worth noticing is #[CommandHandler].
This marks our register method in ProductService as an , from that moment it can be found by Ecotone.
Ecotone will read method declaration and base on the first parameter type hint will know that this CommandHandler is responsible for handling RegisterProductCommand.
Ecotone make use to provide declarative configuration. In most of the scenarios we will be stating "what" we want to achieve with Attributes, and Ecotone will take care of "how". This way our application logic will stay decoupled from the technical concerns.
We also need the possibility to query ProductService for registered products and this is the role of Query Handlers. Let's starts with GetProductPriceQuery class. This query will tell us what is the price of specific product.
We also need Handler for this query. Let's add Query Handler to the ProductService
Some CQRS frameworks expects Handlers be defined as a class, not method. This is somehow limiting and producing a lot of boilerplate. Ecotone does allow for full flexibility, if you want to have only one handler per class, so be it, otherwise just annotate next methods.
It's time to call our Endpoints. You may remember that need to be connected using and we did not do anything like this yet. Thankfully Ecotone does create synchronous channels for us, therefore we don't need to bother about it.
Synchronous channels are created automatically for our Message Handlers. We will learn easily can they be replaced with asynchronous channels in next lessons.
We need to create and send it to correct .
Let's inject and call Query and Command bus into EcotoneQuickstart class.
Gateways are auto registered in Dependency Container and available for auto-wire.
Ecotone comes with few Gateways out of the box like Command and Query buses.
We are sending command RegisterProductCommand to the CommandHandler we registered before.
Same as above, but in that case we are sending query GetProductPriceQuery to the QueryHandler
As you can see we have not defined any Message Channels, Messages or Gateways, yet they all being used in this scenario. This is can happen because Ecotone is using high level abstractions so our daily development is focused on the business side of the code, yet under the hood is using powerful Messaging capabilities.
If you run our testing command now, you should see the result.
We want to notify, when new product is registered in the system.
In order to do it, we will make use of Event Bus Gateway which can publish events.
Let's start by creating ProductWasRegisteredEvent.
Let's inject EventBus into our CommandHandler in order to publish ProductWasRegisteredEvent after product was registered.
Ecotone does control method invocation for , if you have type hinted for specific class, framework will look in Dependency Container for specific service in order to inject it automatically.
In this scenario it injects for us Event Bus. If you want to know more, check chapter about .
Now, when our event is published, whenever new product is registered, we want to subscribe to that Event and send notification. Let's create new class and annotate method with EventHandler.
EventHandler tells Ecotone to handle specific event based on declaration type hint, just like with CommandHandler.
Commands are targeting single Handler, Events on other hand can have multiple Handlers subscribing to it.
If you run our testing command now, you should see the result.
Great, we have just finished Lesson 1. In this lesson we have learned basic of Messaging and CQRS. That was the longest lesson, as it had to introduce new concepts. Incoming lessons will be much shorter :)
We are ready for Lesson 2!
interface Message
{
public function getPayload();
public function getHeaders() : MessageHeaders;
}bin/console ecotone:quickstart
"Running example...
Hello World
Good job, scenario ran with success!"Go to "src/EcotoneQuickstart.php"
# This class is autoregistered using Symfony AutowireGo to "app/EcotoneQuickstart.php"
# This class is autoregistered using Laravel Autowirehttps://github.com/ecotoneframework/quickstart-lite/blob/lesson-1/src/EcotoneQuickstart.php
Go to "src/EcotoneQuickstart.php"
# This class is autoregistered using PHP-DI<?php
namespace App;
class EcotoneQuickstart
{
public function run() : void
{
echo "Hello World";
}
}<?php
namespace App\Domain\Product;
class RegisterProductCommand
{
private int $productId;
private int $cost;
public function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
}
public function getProductId() : int
{
return $this->productId;
}
public function getCost() : int
{
return $this->cost;
}
}private int $productId;<?php
namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\CommandHandler;
class ProductService
{
private array $registeredProducts = [];
#[CommandHandler]
public function register(RegisterProductCommand $command) : void
{
$this->registeredProducts[$command->getProductId()] = $command->getCost();
}
}#[ClassReference("productService")
class ProductService<?php
namespace App\Domain\Product;
class GetProductPriceQuery
{
private int $productId;
public function __construct(int $productId)
{
$this->productId = $productId;
}
public function getProductId() : int
{
return $this->productId;
}
}<?php
namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
class ProductService
{
private array $registeredProducts = [];
#[CommandHandler]
public function register(RegisterProductCommand $command) : void
{
$this->registeredProducts[$command->getProductId()] = $command->getCost();
}
#[QueryHandler]
public function getPrice(GetProductPriceQuery $query) : int
{
return $this->registeredProducts[$query->getProductId()];
}
}# As default auto wire of Laravel creates new service instance each time
# service is requested from Depedency Container, for our examples
# we want to register ProductService as singleton.
# Go to bootstrap/QuickStartProvider.php and register our ProductService
namespace Bootstrap;
use App\Domain\Product\ProductService;
use Illuminate\Support\ServiceProvider;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(ProductService::class, function(){
return new ProductService();
});
}
(...)Everything is set up by the framework, please continue...Everything is set up, please continue...<?php
namespace App;
use App\Domain\Product\GetProductPriceQuery;
use App\Domain\Product\RegisterProductCommand;
use Ecotone\Modelling\CommandBus;
use Ecotone\Modelling\QueryBus;
class EcotoneQuickstart
{
private CommandBus $commandBus;
private QueryBus $queryBus;
// 1
public function __construct(CommandBus $commandBus, QueryBus $queryBus)
{
$this->commandBus = $commandBus;
$this->queryBus = $queryBus;
}
public function run() : void
{
// 2
$this->commandBus->send(new RegisterProductCommand(1, 100));
// 3
echo $this->queryBus->send(new GetProductPriceQuery(1));
}
}bin/console ecotone:quickstart
Running example...
100
Good job, scenario ran with success!<?php
namespace App\Domain\Product;
class ProductWasRegisteredEvent
{
private int $productId;
public function __construct(int $productId)
{
$this->productId = $productId;
}
public function getProductId() : int
{
return $this->productId;
}
}use Ecotone\Modelling\EventBus;
#[CommandHandler]
public function register(RegisterProductCommand $command, EventBus $eventBus) : void
{
$this->registeredProducts[$command->getProductId()] = $command->getCost();
$eventBus->publish(new ProductWasRegisteredEvent($command->getProductId()));
}<?php
namespace App\Domain\Product;
use Ecotone\Modelling\Attribute\EventHandler;
class ProductNotifier
{
#[EventHandler] // 1
public function notifyAbout(ProductWasRegisteredEvent $event) : void
{
echo "Product with id {$event->getProductId()} was registered!\n";
}
}bin/console ecotone:quickstart
Running example...
Product with id 1 was registered!
100
Good job, scenario ran with success!
