Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Loading...
Domain Driven Design Command Query Responsibility Segregation PHP
If you're looking on way to start and get to familiar with Ecotone. Then Ecotone provides different ways to do so:
Step by step Tutorial - Tutorial will introduce to Ecotone's fundaments and will help you build understanding of the Messaging concepts.
Demo Laravel and Symfony Application - You can test Ecotone in real life example, by using our demo application. The demo application shows how to use Ecotone with Laravel and Symfony frameworks.
Quickstart Examples - Provides great way to check specific Ecotone features. Whatever you use Laravel or Symfony or Lite (no external framework), all examples will be able to work in your Application.
Ask question to AI - Ecotone provides AI support, to help you find the answers quicker. You may ask any Ecotone related questions, and it will provide more details on the topic and links where more information can be found.
Have a Workshop or Consultancy - To quickly get whole Team or Organisation up and running with Ecotone, we provide workshops. Workshops will not only teach you how to use Ecotone, but also the concepts and reasoning behind it.
Join Community Channel - Ecotone has a community channel, where you can ask questions, discuss with other users and get help. It is also a great place to share your experiences, and to meet other developers using Ecotone.
Subscribe to Mailing list - Join mailing list to stay up to date with Ecotone changes and latest articles and features.
Some demos and quick-start examples are done using specific framework integration. However Ecotone does not bind given set of features to specific solution. Whatever you use Laravel, Symfony or Lite (no external framework), all features will work in the same way. Therefore feel encouraged to test out examples, even if they are not in framework of your choose.
The roots of Object Oriented Programming (OOP) were mainly about communication using Messages and logic encapsulation. The main idea was to focus on the flows and communication, not on the objects itself. Objects were meant to be encapsulating logic, and expose clear interfaces of what they can do, and inform what they have done.
Ecotone aims to return to the origins of OOP, by providing tools which allows us to fully move the focus from Objects to Flows, from Data storage to Application Design, from Technicalities to Business logic. Ecotone does that by making Messages first class-citizen in our Applications. In Ecotone any communication happens through Messages, each Object states clearly what Messages they expect to receive (input) and what Messages they will send out (output). This together with higher level abstractions like Message Handlers and Aggregates makes the system design explicit, easy to follow and change.
Read more in introduction section...
Check how to install Ecotone for Symfony, Laravel or Lite.
To start with Ecotone there is no need for big impact refactor. You may introduce it in your existing code base and start using it from day one, even for a single feature.
Ecotone works out of the box with popular PHP frameworks like Symfony, Laravel and can be run stand alone or with any other framework (e.g. Laminas, CodeIgniter, Magento) using Ecotone Lite.
Event Handlers PHP
Read more about Event Handling in PHP and Ecotone
Let's create Event
Order was placed.
And Event Handler that will be listening to the OrderWasPlaced
.
Installing Ecotone for Symfony, Laravel or Stand Alone
Use composer in order to download Ecotone Symfony Bundle
composer require ecotone/symfony-bundle
If you're using Symfony Flex, bundle will auto-configure. If that did not happen, register bundle in config/bundles.php
By default Ecotone will look for Attributes in default Symfony catalog "src". If you do follow different structure, you can use "namespaces" configuration to tell Ecotone, where to look for.
Use composer in order to download Ecotone Laravel
composer require ecotone/laravel
Provider should be automatically registered. If that did not happen, register provider
By default Ecotone will look for Attributes in default Laravel catalog "app". If you do follow different structure, you can use "namespaces" configuration to tell Ecotone, where to look for.
If you're using no framework or framework different than Symfony or Laravel, then you may use Ecotone Lite to bootstrap Ecotone.
composer require ecotone/ecotone
If you already have Dependency Container configured, then:
By default Ecotone will look for Attributes only in Classes provided under "classesToResolve". If we want to look for Attributes in given set of Namespaces, we can pass it to the configuration.
You may actually run Ecotone without any Dependency Container. That may be useful for small applications, testing or when we want to run some small Ecotone's script.
You may use out of the box Ecotone Lite Application, which provide you with Dependency Container.
composer require ecotone/lite-application
Command Query Responsibility Segregation PHP
Read Blog Post about CQRS in PHP and Ecotone
Let's create PlaceOrder
Command
that will place an order in our system.
And Command Handler
that will handle this Command
Let's define GetOrder
Query
that will find our placed order.
And Query Handler
that will handle this query
Microservices, Message-Driven, Event-Driven Architecture in PHP
Outbox pattern implementation in PHP
Building Reactive Message-Driven Systems in PHP
Running the code asynchronously
Read more about Asynchronous in PHP and Ecotone
Building Reactive Message-Driven Systems in PHP
Let's create Event
Order was placed.
And Event Handler that will be listening to the OrderWasPlaced
.
Let's Ecotone
that we want to run this Event Handler Asynchronously using RabbitMQ
PHP Messages
Ecotone from the ground is built around messaging to provide a simple model that allows to connects components, modules or even different Applications together, in seamless and easy way. To achieve that fundamental messaging blocks are implemented using Enterprise Integration PatternsOn top of what we get support for higher level patterns like CQRS, Events, DDD - which help us build systems that make the business logic explicit and maintainable, even in the long term. In this first lesson, we will learn fundamental blocks in messaging architecture and we will start building back-end for Shopping System using CQRS. Before we will dive into implementation, let's briefly understand main concepts behind Ecotone.
A Message is a data record containing of Payload and Message Headers (Metadata). The Payload can be of any PHP type (scalar, object, compound), and the Headers hold commonly required information such as ID, timestamp and framework specific information. Developers can also store any arbitrary key-value pairs in the headers, to pass additional meta information.
Message channel abstracts communication between components. It does allow for sending and receiving messages. This decouples components from knowledge about the transport layer, as it's encapsulated within the Message Channel.
Message Endpoints are consumers and producers of messages. Consumer are not necessary asynchronous, as you may build synchronous flow, compound of multiple endpoints.
The Messaging Gateway encapsulates messaging-specific code (The code required to send or receive a Message) and separates it from the rest of the application code.
It take your domain specific objects an convert them into a Message that is send via Message channel.
To not have dependency on the Messaging Framework Ecotone
provides the Gateway as interface and generates proxy class for it.
You will not have to implement Messages, Message Channels or Message Endpoints directly, as those are lower level concepts. Instead you will be able to focus on your specific domain logic with an implementation based on plain PHP objects. By providing declarative configuration we will be able to connect domain-specific code to the messaging system.
Great, now when we know fundamental blocks of Ecotone
and Messaging Architecture, we can start implementing our Shopping System!
If you did not understand something, do not worry, we will see how does it apply in practice in next step.
Do you remember this command from Setup part?
If yes and this command does return above output, then we are ready to go.
this method will be run, whenever we executeecotone:quickstart
.
This class is auto-registered using auto-wire system, both Symfony and Laravel provides this great feature. For Lite
clean and easy to use PHP-DI
is taken.
Thanks to that, we will avoid writing configuration files for service registrations during this tutorial.
And we will be able to fully focus on what can Ecotone
provides to us.
We will start by creating Command Handler. Command Handler is place where we will put our business logic. Let's create namespace App\Domain\Product and inside RegisterProductCommand, command for registering new product:
Let's register a Command Handler now by creating class App\Domain\Product\ProductService
First thing worth noticing is #[CommandHandler].
This attribute marks our register
method in ProductService as an Endpoint, from that moment it can be found by Ecotone.
Ecotone will read method declaration and base on the first parameter type hint will know that this CommandHandler is responsible for handling RegisterProductCommand.
Ecotone make use Attributes to provide declarative configuration. In most of the scenarios we will be stating "what" we want to achieve with Attributes, and Ecotone will take care of "how". This way our application logic will stay decoupled from the technical concerns.
We also need the possibility to query ProductService for registered products and this is the role of Query Handlers. Let's starts with GetProductPriceQuery class. This query will tell us what is the price of specific product.
We also need Handler for this query. Let's add Query Handler
to the ProductService
Some CQRS frameworks expects Handlers be defined as a class, not method. This is somehow limiting and producing a lot of boilerplate. Ecotone
does allow for full flexibility, if you want to have only one handler per class, so be it, otherwise just annotate next methods.
It's time to call our Endpoints. You may remember that endpoints need to be connected using Message Channels and we did not do anything like this yet. Thankfully Ecotone does create synchronous channels for us, therefore we don't need to bother about it.
Synchronous channels are created automatically for our Message Handlers. We will learn easily can they be replaced with asynchronous channels in next lessons.
We need to create Message
and send it to correct Message Channel
.
Let's inject and call Query and Command bus into EcotoneQuickstart class.
Gateways are auto registered in Dependency Container and available for auto-wire.
Ecotone
comes with few Gateways out of the box like Command and Query buses.
We are sending command RegisterProductCommand to the CommandHandler we registered before.
Same as above, but in that case we are sending query GetProductPriceQuery to the QueryHandler
As you can see we have not defined any Message Channels, Messages or Gateways, yet they all being used in this scenario. This is can happen because Ecotone is using high level abstractions so our daily development is focused on the business side of the code, yet under the hood is using powerful Messaging capabilities.
If you run our testing command now, you should see the result.
We want to notify, when new product is registered in the system.
In order to do it, we will make use of Event Bus Gateway which can publish events.
Let's start by creating ProductWasRegisteredEvent.
Let's inject EventBus into our CommandHandler in order to publish ProductWasRegisteredEvent after product was registered.
Ecotone
does control method invocation for endpoints, if you have type hinted for specific class, framework will look in Dependency Container for specific service in order to inject it automatically.
In this scenario it injects for us Event Bus. If you want to know more, check chapter about Method Invocation.
Now, when our event is published, whenever new product is registered, we want to subscribe to that Event and send notification. Let's create new class and annotate method with EventHandler.
EventHandler tells Ecotone to handle specific event based on declaration type hint, just like with CommandHandler.
Commands are targeting single Handler, Events
on other hand can have multiple Handlers subscribing to it.
If you run our testing command now, you should see the result.
Great, we have just finished Lesson 1. In this lesson we have learned basic of Messaging and CQRS. That was the longest lesson, as it had to introduce new concepts. Incoming lessons will be much shorter :)
We are ready for Lesson 2!
DDD PHP
An Aggregate is an entity or group of entities that is always kept in a consistent state. Aggregates are very explicitly present in the Command Model, as that is where change is initiated and business behaviour is placed.
Let's create our first Aggregate Product.
Aggregate attribute marks class to be known as Aggregate
Identifier marks properties as identifiers of specific Aggregate instance. Each Aggregate must contains at least one identifier.
QueryHandler enables query handling on specific method just as we did in Lesson 1.
Now remove App\Domain\Product\ProductService
as it contains handlers for the same command and query classes.
Before we will run our test scenario, we need to register Repository
.
Repositories are used for retrieving and saving the aggregate to persistent storage. We will build an in-memory implementation for now.
Repository attribute marks class to be known to Ecotone
as Repository.
We need to implement some methods in order to allow Ecotone
to retrieve and save Aggregate. Based on implemented interface, Ecotone
knowns, if Aggregate is state-stored or event sourced.
canHandle tells which classes can be handled by this specific repository.
findBy return found aggregate instance or null. As there may be more, than single indentifier per aggregate, identifiers are array.
save saves an aggregate instance. You do not need to bother right what is $metadata
and $expectedVersion
.
Let's run our testing command:
Have you noticed what we are missing here? Our Event Handler
was not called, as we do not publish the ProductWasRegistered
event anymore.
In order to automatically publish events recorded within Aggregate, we need to add method annotated with AggregateEvents.
This will tell Ecotone
where to get the events from.
Ecotone
comes with default implementation, that can be used as trait WithEvents.
Let's run our testing command:
Congratulations, we have just finished Lesson 2. In this lesson we have learnt how to make use of Aggregates and Repositories. Now we will learn about Converters and Metadata
CommandHandler enables command handling on specific method just as we did in . If method is static, it's treated as a and must return a new aggregate instance. Rule applies as long as we use instead of .
If you want to known more details about Aggregate start with chapter
If you want to known more details about Repository start with chapter
Ecotone PHP Framework
The best way to get started with Ecotone is to actually build something realistic. Therefore we will build a small back-end for Shopping System during this tutorial. The techniques we will learn in the tutorial are fundamental to building any application using Ecotone.
Found something to improve in the docs? Create Pull Request in Documentation repository.
The tutorial is divided into several lessons:
Lesson 1, we will learn the fundamentals of Ecotone: Endpoints, Messages, Channels, and Command Query Responsibility Segregation (CQRS)
Lesson 2, we will learn Tactical Domain Driven Design (DDD): Aggregates, Repositories and also Event Handlers
Lesson 3, we will learn how to use Converters, therefore how to handle serialization and deserialization
Lesson 4, we will learn about Metadata and Method Invocation - How we can execute Message Handlers in a way not available in any other PHP Framework
Lesson 5, we will learn about Interceptors, Ecotone's powerful Middlewares
Lesson 6, we we will learn about Asynchronous Endpoints, so how to process our Messages asynchronously.
You don’t have to complete all of the lessons at once to get the value out of this tutorial. You will start benefit from the tutorial even if it’s one or two lessons.
PHP Metadata and Method Invocation
Message can contain of Metadata. Metadata is just additional information stored along side to the Message's payload. It may contain things like currentUser, timestamp, contentType, messageId.
To test out Metadata, let's assume we just got new requirement for our Products in Shopping System.:
User who registered the product, should be able to change it's price.
Let's start by adding ChangePriceCommand
We will handle this Command in a minute. Let's first add user information for registering the product. We will do it, using Metadata. Let's get back to our Testing Class EcotoneQuickstart and add 4th argument to our CommandBus call.
sendWithRouting accepts 4th argument, which is associative array. Whatever we will place in here, will be available during message handling for us - This actually our Metadata. It's super simple to pass new Headers, it's matter of adding another key to the array. Now we can change our Product aggregate:
We have added second parameter $metadata to our CommandHandler. Ecotone read parameters and evaluate what should be injected. We will see soon, how can we take control of this process. We can add changePrice method now to our Aggregate:
And let's call it with incorrect userId and see, if we get the exception.
Let's run our testing command:
We have been just informed, that customers are registering new products in our system, which should not be a case. Therefore our next requirement is:
Only administrator should be allowed to register new Product
Let's create simple UserService which will tell us, if specific user is administrator.
In our testing scenario we will suppose, that only user with id
of 1 is administrator.
Now we need to think where we should call our UserService. The good place for it, would not allow for any invocation of product.register command without being administrator, otherwise our constraint may be bypassed. Ecotone does allow for auto-wire like injection for endpoints. All services registered in Depedency Container are available.
Great, there is no way to bypass the constraint now. The isAdmin constraint must be satisfied in order to register new product. Let's correct our testing class.
Let's run our testing command:
Ecotone inject arguments based on Parameter Converters. Parameter converters , tells Ecotone how to resolve specific parameter and what kind of argument it is expecting. The one used for injecting services like UserService is Reference parameter converter. Let's see how could we use it in our product.register command handler.
Let's suppose UserService is registered under user-service in Dependency Container. Then we would need to set up the CommandHandler
like below.
Reference
- Does inject service from Dependency Container. If referenceName,
which is name of the service in the container is not given, then it will take the class name as default.
Payload
- Does inject payload of the message. In our case it will be the command itself
Headers
- Does inject all headers as array.
Header
- Does inject single header from the message.
There is more to be said about this, but at this very moment, it will be enough for us to know that such possibility exists in order to continue.
You may read more detailed description in Method Invocation section.
Ecotone, if parameter converters are not passed provides default converters.
First parameter is always Payload.
The second parameter, if is array then Headers converter is taken, otherwise if class type hint is provided for parameter, then Reference converter is picked.
If we would want to manually configure parameters for product.register Command Handler, then it would look like this:
We could also inject specific header and let Ecotone convert it directly to specific object (if we have Converter registered):
Great, we have just finished Lesson 4!
In this Lesson we learned about using Metadata to provide extra information to our Message. Besides we took a look on how arguments are injected into endpoint and how we can make use of it. Now we will learn about powerful Interceptors, which can be describes as Middlewares on steroids.
PHP Conversion
Command, queries and events are not always objects. When they travel via different asynchronous channels, they are converted to simplified format, like JSON or XML. At the level of application however we want to deal with PHP format as objects or arrays.
Moving from one format to another requires conversion. Ecotone does provide extension points in which we can integrate different Media Type Converters to do this type of conversion.
Let's build our first converter from JSON to our PHP format. In order to do that, we will need to implement Converter
interface and mark it with MediaTypeConverter().
TypeDescriptor - Describes type in PHP format. This can be class, scalar (int, string), array etc.
MediaType - Describes Media type format. This can be application/json, application/xml etc.
$source - is the actual data to be converted.
Let's start with implementing matches method. Which tells us, if this converter can do conversion from one type to another.
This will tell Ecotone that in case source media type is JSON and target media type is PHP, then it should use this converter. Now we can implement the convert method. We will do pretty naive solution, just for the proof the concept.
And let's add fromArray method to RegisterProductCommand and GetProductPriceQuery.
Let's run our testing command:
If we call our testing command now, everything is going fine, but we still send PHP objects instead of JSON, therefore there was not need for Conversion. In order to start sending Commands and Queries in different format, we need to provide our handlers with routing key. This is because we do not deal with Object anymore, therefore we can't do the routing based on them.
Let's change our Testing class, so we call buses with JSON format.
We make use of different method now sendWithRouting.
It takes as first argument routing key to which we want to send the message.
The second argument describes the format
of message we send.
Third is the data to send itself, in this case command formatted as JSON.
Let's run our testing command:
Normally we don't want to deal with serialization and deserialization, or we want to make the need for configuration minimal. This is because those are actually time consuming tasks, which are more often than not a copy/paste code, which we need to maintain.
Ecotone comes with integration with JMS Serializer to solve this problem. It introduces a way to write to reuse Converters and write them only, when that's actually needed. Therefore let's replace our own written Converter with JMS one. Let's download the Converter using Composer.
composer require ecotone/jms-converter
Let's remove __construct and fromArray methods from RegisterProductCommand GetProductPriceQuery, and the JsonToPHPConverter class completely, as we won't need it anymore.
JMS creates cache to speed up serialization process. In case of problems with running this test command, try to remove your cache. Let's run our testing command:
Do you wonder, how come, that we just deserialized our Command and Query classes without any additional code? JMS Module reads properties and deserializes according to type hint or docblock for arrays. It's pretty straight forward and logical:
Let's imagine we found out, that we have bug in our software. Our system users have registered product with negative price, which in result lowered the bill.
Product should be registered only with positive cost
We could put constraint in Product, validating the Cost amount. But this would assure us only in that place, that this constraint is met. Instead we want to be sure, that the Cost is correct, whenever we make use of it, so we can avoid potential future bugs. This way we will know, that whenever we will deal with Cost object, we will now it's correct. To achieve that we will create Value Object named Cost that will handle the validation, during the construction.
Great, but where to convert the integer to the Cost class? We really don't want to burden our business logic with conversions. Ecotone JMS does provide extension points, so we can tell him, how to convert specific classes.
Let's create class App\Infrastructure\Converter\CostConverter.
We will put it in different namespace, to separate it from the domain.
We mark the methods with Converter attribute, so Ecotone can read parameter type and return type in order to know, how he can convert from scalar/array to specific class and vice versa. Let's change our command and aggregate class, so it can use the Cost directly.
The $cost class property will be automatically converted from integer to Cost by JMS Module.
Let's run our testing command:
In this Lesson we learned how to make use of Converters. The command which we send from outside (to the Command Bus) is still the same, as before. We changed the internals of the domain, without affecting consumers of our API. In next Lesson we will learn and Method Invocation and Metadata
Great, we just finished Lesson 3!
PHP Message Bus, CQRS, Command Event Query Handlers
In this chapter we will cover process of handling and dispatching Messages with Ecotone. We will discuss topics like Commands, Events and Queries, Message Handlers, Message Buses, Aggregates and Sagas. You may be interested in theory - DDD and CQRS chapter first.
Going into CQRS with PHP [Article]
Event Handling in PHP [Article]
Ecotone comes with two plans:
Ecotone Free comes with Apache License Version 2.0 for open features. It allows to build message-driven system in PHP, which solves resiliency and scalability at the architecture level. This covers all the features, which are not marked as Enterprise.
Ecotone Enterprise is based Enterprise licence. It does provides more advanced set of features aim for Enterprise usage. It does bring to the table custom features, additional integrations, and ability to optimization resource usages.
Each Enterprise feature is marked with hint on the documentation page. Enterprise features can only be run with licence key. Subscription plan is now available. To subscribe to Enterprise visit https://ecotone.tech/pricing
Dynamic Message Channels - Provides ability to simplify deployment strategy, adjusting asynchronous processing to business scenarios, and configure processing per Client dynamically (which is especially useful in Multi-Tenant and SAAS environments).
Kafka Support - Enables integration with Kafka (Event Streaming Platform) to send, receive from Messages from topics, and to use Kafka in form of Message Channel abstraction for seamless integration into the System.
Event Sourcing Handlers with Metadata - Provides ability to pass Metadata to Aggregate's Event Sourcing Handlers. This can be used to to adjust Aggregate's reconstruction process, based on Metadata information stored in related Events.
Asynchronous Message Buses - This grants ability to build customized Command/Event Buses where Message will first go over given Asynchronous Channel. This can be used to build for example Outbox Command Bus.
Distributed Bus with Service Map - Provides way to communicate between Services (Applications) with ease and in explicit and decoupled way. Make it possible to use all available Message Channels providers (RabbitMQ, Amazon SQS, Redis, Dbal, Kafka, Symfony Message Channels, Laravel Queues).
Details about more features coming soon...
Depending on the preferences, we may choose tutorial version for
Use git to download starting point in order to start tutorial
2. Run command line application to verify if everything is ready.
There are two options in which we run the tutorial:
Local Environment
Docker (preferred)
If you can see "Hello World", then we are ready to go. Time for Lesson 1!
Message Driven System with Domain Driven Design principles in PHP
The foundation idea
The roots of Object Oriented Programming were mainly about communication using Messages and logic encapsulation. The aim was to focus on the flows and communication, not on the objects itself. Objects were meant to be encapsulating logic, and expose clear interfaces of what they do, and what have they done.
If you know things like Events, Commands and Aggregates, then what was written above should feel familiar to you. This is because those concepts are build around same principles of old OOP where communication is done through Messages and Objects are meant to encapsulate logic. And Ecotone is about returning to those roots of Object Oriented Programming. It's about explicit System design where communication happen through Messages, in a way that is clear to follow and understand.
There is no possibility to immerse fully into Message based communication, as long as the foundation is not fully Message Driven. This means that each communication within the Application (not only between Applications) has to happen through Messages. This way it can become natural practice of how the system is being designed.
As Ecotone follows Enterprise Integration Patterns, it makes the communication between Objects happening through Message Channels. We can think of Message Channel as pipe, where one side send Messages into, and the other consumes from it. As communication goes through Message Channels, it becomes really easy to switch the pipes. This basically means we can easily switch Message Channel implementations to use synchronous or asynchronous Channel, different Message Brokers, and yet our Objects will not be affected by that anyhow.
Ecotone provides different levels of abstractions, which we can choose to use from. Each abstraction is described in more details in related sections. In this Introduction section we will go over high level on how things can be used, to show what is Message based communication about.
Let's discuss our example from the above screenshot, where we want to register User and trigger Notification Sender. In Ecotone flows, we would introduce Command Handler being responsible for user registration:
As you can see, we also inject Event Bus which will publish our Event Message of User Was Registered.
Command and Events are the sole of higher level Messaging. On the low level everything is Message, yet each Message can either be understood as Command (intention to do), or Event (fact that happened). This make the clear distinction between - what we want to happen vs what actually had happened.
In our Controller we would inject Command Bus to send the Command Message:
After sending Command Message, our Command Handler will be executed. Command and Event Bus are available in our Dependency Container after Ecotone installation out of the box.
What is important here is that, Ecotone never forces us to implement or extend Framework specific classes. This means that our Command or Event Messages are POPO (clean PHP objects). In most of the scenarios we will simply mark given method with Attribute and Ecotone will glue the things for us.
We mentioned Notification Sender to be executed when User Was Registered Event happens. For this we follow same convention of using Attributes:
This Event Handler will be automatically triggered when related Event will be published. This way we can easily build decoupled flows, which hook in into existing business events.
Even so Commands and Events are Messages at the fundamental level, Ecotone distinguish them because they carry different semantics. By design Commands can only have single related Command Handler, yet Events can have multiple subscribing Event Handlers. This makes it easy for Developers to reason about the system and making it much easier to follow, as the difference between Messages is built in into the architecture itself.
From here we could decide to make use Message routing functionality to decouple Controllers from constructing Command Messages.
with this in mind, we can now user CommandBus with routing and even let Ecotone deserialize the Command, so our Controller does not even need to be aware of transformations:
When controllers simply pass through incoming data to Command Bus via routing, there is not much logic left in controllers. We could even have single controller, if we would be able to get routing key. It's really up to us, what work best in context of our system.
What we could decide to do is to add so called Interceptors (middlewares) to our Command Bus to add additional data or perform validation or access checks.
Pointcut provides the point which this interceptor should trigger on. In above scenario it will trigger when Command Bus is triggered before Message is send to given Command Handler. The reference attribute stays that given parameter is Service from Dependency Container and Ecotone should inject it.
There are multiple different interceptors that can hook at different moments of the flow. We could hook before Message is sent to Asynchronous Channel, or before executing Message Handler. We could also state that we want to hook for all Command Handlers or Event Handlers. And in each step we can decide what we want to do, like modify the messages, stop the flow, enforce security checks.
When we send Command using Command Bus, Ecotone under the hood construct a Message. Message contains of two things - payload and metadata. Payload is our Command and metadata is any additional information we would like to carry.
Metadata can be easily then accessed from our Command Handler or Interceptors
Besides metadata that we do provide, Ecotone provides additional metadata that we can use whenever needed, like Message Id, Correlation Id, Timestamp etc.
Ecotone take care of automatic Metadata propagation, no matter if execution synchronous or asynchronous. Therefore we can easily access any given metadata in targeted Message Handler, and also in any sub-flows like Event Handlers. This make it really easy to carry any additional information, which can not only be used in first executed Message Handler, but also in any flow triggered as a result of that.
As we mentioned at the beginning of this introduction, communication happen through Message Channels, and thanks to that it's really easy to switch code from synchronous to asynchronous execution. For that we would simply state that given Message Handler should be executed asynchronously:
Now before this Event Handler will be executed, it will land in Asynchronous Message Channel named "async" first, and from there it will be consumed asynchronously by Message Consumer (Worker process).
If we are using Eloquent, Doctrine ORM, or Models with custom storage implementation, we could push our implementation even further and send the Command directly to our Model.
We are marking our model as Aggregate, this is concept from Domain Driven Design, which describe a model that encapsulates the business logic.
Ecotone will take care of loading the Aggregate and storing them after the method is called. Therefore all we need to do it to send an Command.
Like you can see, we also added "block()" method, which will block given user. Yet it does not hold any Command as parameter. In this scenario we don't even need Command Message, because the logic is encapsulated inside nicely, and passing a status from outside could actually allow for bugs (e.g. passing UserStatus::active). Therefore all we want to know is that there is intention to block the user, the rest happens within the method.
To execute our block method we would call Command Bus this way:
There is one special metadata here, which is "aggregate.id", this tell Ecotone the instance of User which it should fetch from storage and execute this method on. There is no need to create Command Class at all, because there is no data we need to pass there. This way we can build features with ease and protect internal state of our Models, so they are not modified in incorrect way.
One of the powers that Message Driven Architecture brings is ability to build most sophisticated workflows with ease. This is possible thanks, because each Message Handler is considered as Endpoint being able to connect to input and output Channels. This is often referenced as pipe and filters architecture, but in general this is characteristic of true message-driven systems.
Let's suppose that our registered user can apply for credit card, and for this we need to pass his application through series of steps to verify if it's safe to issue credit card for him:
We are using outputChannelName here to indicate where to pass Message after it's handled by our Command Handler. In here we could enrich our CardApplication with some additional data, or create new object. However it's fully fine to pass same object to next step, if there was no need to modify it.
Ecotone provides ability to pass same object between workflow steps. This simplify the flow a lot, as we are not in need to create custom objects just for the framework needs, therefore we stick what is actually needed from business perspective.
Let's define now location where our Message will land after:
We are using here InternalHandler, internal handlers are not connected to any Command or Event Buses, therefore we can use them as part of the workflow steps, which we don't want to expose outside.
It's really up to us whatever we want to define Message Handlers in separate classes or not. In general due to declarative configuration in form of Attributes, we could define the whole flow within single class, e.g. "CardApplicationProcess". Workflows can also be started from Command or Event Handlers, and also directly through Business Interfaces. This makes it easy to build and connect different flows, and even reuse steps when needed.
Our Internal Handler contains of inputChannelName which points to the same channel as our Command Handlers outputChannelName. This way we bind Message Handlers together to create workflows. As you can see we also added Asynchronous attribute, as process of identity verification can take a bit of time, we would like it to happen in background.
Let's define our last step in Workflow:
This we've made synchronous which is the default if no Asynchronous attribute is defined. Therefore it will be called directly after Identity verification.
Workflows in Ecotone are fully under our control defined in PHP. There is no need to use 3rd party, or to define the flows within XMLs or YAMLs. This makes it really maintainable solution, which we can change, modify and test them easily, as we are fully on the ownership of the process from within the code. It's worth to mention that workflows are in general stateless as they pass Messages from one pipe to another. However if we would want to introduce statefull Workflow we could do that using Ecotone's Sagas.
Ecotone handles failures at the architecture level to make Application clear of those concerns. As Messages are the main component of communication between Applications, Modules or even Classes in Ecotone, it creates space for recoverability in all parts of the Application. As Messages can be retried instantly or with delay without blocking other processes from continuing their work.
As Message are basically data records which carry the intention, it opens possibility to store that "intention", in case unrecoverable failure happen. This means that when there is no point in delayed retries, because we encountered unrecoverable error, then we can move that Message into persistent store. This way we don't lose the information, and when the bug is fixed, we can simply retry that Message to resume the flow from the place it failed.
There are of course more resiliency patterns, that are part of Ecotone, like:
Automatic retries to send Messages to Asynchronous Message Channels
Reconnection of Message Consumers (Workers) if they lose the connection to the Broker
Inbuilt functionalities like Message Outbox, Error Channels with Dead Letter, Deduplication of Messages to avoid double processing,
and many many more.
The flow that Ecotone based on the Messages makes the Application possibile to handle failures at the architecture level. By communicating via Messages we are opening for the way, which allows us to self-heal our application without the need for us intervene, and in case of unrecoverable failures to make system robust enough to not lose any information and quickly recover from the point o failure when the bug is fixed.
Ecotone shifts the focus from technical details to the actual business processes, using Resilient Messaging as the foundation on which everything else is built. It provides seamless communication using Messages between Applications, Modules or even different Classes.
Together with that we will be using Declarative Configuration with attributes to avoid writing and maintaining configuration files. We will be stating intention of what we want to achieve instead wiring things ourselves, as a result we will regain huge amount of time, which can be invested in more important part of the System. And together with that, we will be able to use higher level Build Blocks like Command, Event Handlers, Aggregates, Sagas which connects to the messaging seamlessly, and helps encapsulate our business logic. So all the above serves as pillars for creating so called Business Oriented Architecture:
Resilient Messaging - At the heart of Ecotone lies a resilient messaging system that enables loose coupling, fault tolerance, and self-healing capabilities.
Declarative Configuration - Introduces declarative programming with Attributes. It simplifies development, reduces boilerplate code, and promotes code readability. It empowers developers to express their intent clearly, resulting in more maintainable and expressive codebases.
Building Blocks - Building blocks like Message Handlers, Aggregates, Sagas, facilitate the implementation of the business logic. By making it possible to bind Building Blocks with Resilient Messaging, Ecotone makes it easy to build and connect even the most complex business workflows.
Ecotone blog provides articles which describes Ecotone's architecture and related features in more details. Therefore if you want to find out more, follow bellow links:
Query CQRS PHP
External Query Handlers
are Services available in your dependency container, which are defined to handle Queries
.
Queries are Plain Old PHP Objects:
To send an Query we will be using send
method on QueryBus
.
Query will be delivered to corresponding Query Handler.
Just like with Commands, we may use routing in order to execute queries:
To send an Query we will be using sendWithRouting
method on QueryBus
.
Query will be delivered to corresponding Query Handler.
Commands CQRS PHP
In this section we will discuss using Commands, Events and Queries.
We will start with Commands and Command Handlers. Even so we will be discussing Commands the functionality which we will tackle, applies to Queries
and Events
also.
Understanding this part will give us understanding of the whole.
External Command Handlers
are Services available in Dependency Container, which are defined to handle Commands
. We call them External to differentiate from Aggregate Command Handlers, which will be described in later part of the section.
In Ecotone we enable Command Handlers using attributes. By marking given method with #[CommandHandler]
we state it should be used as a Command Handler
.
In case of Ecotone the class itself is not a Command Handler, it's a method that is considered to be Command Handler. This way we may join multiple Command Handlers under same class without introducing new classes if that's not needed.
Command Handlers
are methods where we would typically places our business logic.
In above example using #[CommandHandler]
we stated that createTicket
method will be handling CreateTicketCommand
.
The first parameter of Command Handler method is indicator of the Command Class we want to handle, so in this case it will be CreateTicketCommand
.
Now whenever we will send this CreateTicketCommand
using Command Bus
, it will be delivered to createTicket
method.
We send Command using Command Bus.
After Ecotone is installed all Buses are available out of the box in Dependency Container, this way we may start using them directly after installation.
In order to use the Command we first need to define it:
All Messages (Command/Queries/Events) just like Message Handlers (Command/Query/Event Handlers) are simple Plain old PHP Objects, which means they do not extend or implement any framework specific classes. This way we keep our business code clean and easy to understand.
To send an Command we will be using send
method on CommandBus
.
Command will be delivered to corresponding Command Handler.
We may send Command with Metadata
(Message Headers) via Command Bus.
This way we may provide additional information that should not be part of the Command or details which will be reused across Command Handlers
without copy/pasting it to each of the related Command
classes.
#[Header]
provides information that we would like to fetch metadata which is under executorId
. This way Ecotone knows what to pass to the Command Handler.
If we need additional Services (which are available in Dependency Container) to perform our business logic, we may pass them to the Command Handler using #[Reference]
attribute:
In case Service is defined under custom id in DI, we may pass the reference name to the attribute:
There may be cases where creating Command classes is unnecessary boilerplate, in those situations, we may simplify the code and make use scalars
, arrays
or non-command classes
directly.
Ecotone provides flexibility which allows to create Command classes when there are actually needed. In other cases we may use routing functionality together with simple types in order to fulfill our business logic.
It happens that after performing action, we would like to return some value. This may happen for scenarios that require immediate response, for taking an payment may generate redirect URL for the end user.
The returned data will be available as result of the Command Bus.
Take under consideration that returning works for synchronous Command Handlers. in case of asynchronous scenarios this will not be possible.
PHP Middlewares Interceptors
After one of our administrators went for holiday, the others found out, they can't change cost of the product and this become really problematic for them.
Administrators should be able to change the cost of a product
We could copy paste the logic from product.register
to product.changePrice
but we want to avoid code duplication, especially logic that may happen more often. Let's intercept our Command Handlers.
Let's start by creating Annotation
called RequireAdministrator
in new namepace App\Infrastructure\RequireAdministrator
Let's create our first Before Interceptor.
Start by removing old UserService
and create new one in different namespace App\Infrastructure\RequireAdministrator
. Remember to mark return type as void
, we will see why it is so important soon.
Before
- marks method as Interceptor
, so it can be be found by Ecotone.
Pointcut
- describes what should be intercepted.
CLASS_NAME
- indicates what should be intercepted using specific Class Name
or Attribute Name
annotated at the level of method or class
expression||expression
- Indicating one expression or another e.g. Product\*||Order\*
Now we need to annotate our Command Handlers:
We told Before Interceptor
that it should intercept all endpoints with annotation RequireAdministrator.
Now, whenever we will call our command handlers, they will be intercepted by UserService
.
You can try it out, by providing different userId
.
Let's change our testing class to remove metadata and add the Interceptor
.
changeHeaders
- Tells Ecotone
if this Interceptor modifies payload
or headers
. The default is payload
.
If changeHeaders=true
thenheaders
are picked and associative array must be returned. The returned value is merged with current headers.
If changeHeaders=false
then payload
is picked and current payload is replaced by returned value, the headers stays the same.
You may of course inject current payload and headers into the method if needed, as with usual endpoint.
precedence
- Tells Ecotone
in what order interceptors should be called. The lower the value is the quicker interceptor will be called. The order exists within interceptor type: before/around/after.
We want to call AddUserId Interceptor
before RequireAdministrator Interceptor
as it require userId
to exists, in order to verify.
AddUserIdService
has precedence of 0
as default, so UserService
must have at least 1
.
Let's annotate Product
aggregate
If we annotate aggregate on the class level. Then it does work like each of the method would be annotated with specific annotation in this case @AddUserId.
Let's run our testing command:
If during Before
or Around
you decide to break the flow, return null
. Null
indiciates, that there is no message and the current flow ends.
Null can not be returned in header changing interceptor, it does work only for payload changing interceptor.
The Around Interceptor
is closet to actual endpoint's method call. Thanks to that, it has access to Method Invocation.
This does allow for starting some procedure and ending after the invocation is done.
Connection
to sqlite database using dbal library
This does create database table, if needed. It does create simple table structure containing id
of the aggregate, the class
type and serialized data
in JSON
. Take a look at createSharedTableIfNeeded
if you want more details.
Deserialize aggregate to PHP
Serialize aggregate to JSON
We want to intercept Command Bus Gateway
with transaction. So whenever we call it, it will invoke our Command Handler within transaction.
pointcut="Ecotone\Modelling\CommandBus"
This pointcut will intercept CommandBus.
Let's run our testing command:
We do have two transactions started, because we call the Command Bus twice.
Great, we have just finished Lesson 5!
Interceptors are very powerful concept. Without extending any classes or interfaces from Ecotone
, we can build build up Authorization, Transactions, Delegate duplicated logic, Call some external service, Logging and Tracing before invoking endpoint, the amount of possibilities is endless.
In the next chapter, we will learn about scheduling and polling endpoints
Ecotone follows on this making the Messaging the core of the Framework. It introduce Message based communication build around as the underlying foundation. This way even communication between PHP Objects can be done through Messages in seamless way.
There maybe situations where multiple Asynchronous Event Handlers will be subscribing to same Event. We can easily imagine that one of them may fail and things like retries become problematic (As they may trigger successful Event Handlers for the second time). That's why Ecotone introduces , which deliver a copy of the Message to each related Event Handler separately. As a result each Asynchronous Event Handler is handling it's own Message in full isolation, and in case of failure only that Handler will be retried.
Having this foundation knowledge and understanding how Ecotone works on the high level, it's good moment to dive into , which will provide hands on experience to deeper understanding.
Be sure to read before diving in this chapter.
If you have registered for specific Media Type, then you can tell Ecotone
to convert result of your Query Bus
to specific format.
In order to do this, we need to make use of Metadata
and replyContentType
header.
If we use Command Handler, Ecotone will ensure our metadata will be serialized and deserialized correctly.
In Ecotone we may register Command Handlers under routing instead of a class name.
This is especially useful if we will register to tell Ecotone how to deserialize given Command. This way we may simplify higher level code like Controllers
or Console Line Commands
by avoid transformation logic.
Ecotone is using message routing for . This way applications can stay decoupled from each other, as there is no need to share the classes between them.
Ecotone
provide us with possibility to handle via Interceptors
.
Interceptor
as name suggest, intercepts the process of handling the message.
You may enrich the , stop or modify usual processing cycle, call some shared functionality, add additional behavior to existing code without modifying the code itself.
If you are familiar with or Middleware pattern you may find some similarities.
NAMESPACE*
- Indicating all starting with namespace e.g. App\Domain\Product\*
Before
and After
interceptors are depending on the return type, to decide if they should modify or pass it through.
If return type is different than void, Message payload or headers can be enriched with data.
If return type is void then message will be passed through and the process of message flow can be interrupted by throwing exception only.
Instead of providing the userId
during calling the CommandBus
we will enrich with it before it will be handled by Command Handler
using Interceptor
.
We will add real database to our example using if you do not have extension installed, then you will need to install it first. Yet if you are using Quickstart's Docker
container, then you are ready to go.
Let's start by implementing repository, that will be able to handle any aggregate, by storing it in sqlite
database.
Before we do that, we need to remove our In Memory implementation class App\Domain\Product\InMemoryProductRepository
we will replace it with our new implementation.
We will create using new namespace for it App\Infrastructure\Persistence.
Besides we are going to use , as this is really helpful abstraction over the PDO.
And the :
Serializer
is registered by Ecotone.
Serializer can handle serialization using .
It this case it will know how to register Cost
class, as we already registered Converter for it.
Serializer give us access for conversion from PHP
type to specific Media Type or from specific Media Type to PHP
type. We will use it to easily serialize our Product
model into JSON
and store it in database.
Each of interceptors, can inject attribute, which was used for pointcut. Just type hint for it in method declaration.
Around interceptors can inject intercepted class instance. In above example it would be Command Bus.
In case of Command Bus it may seems not needed, but if we would intercept Aggregate, then it really useful as for example you may verify if executing user have access to it.
You may read more about interceptors in .
Event CQRS PHP
Be sure to read CQRS Introduction before diving in this chapter.
External Event Handlers
are Services available in your dependency container, which are defined to handle Events
.
Events are Plain Old PHP Objects:
The difference between Events and Command is in intention. Commands are meant to trigger an given action and events are information that given action was performed successfully.
EventBus
is available in your Dependency Container by default, just like Command and Query buses. You may use Ecotone's feature to inject it directly into your Command Handler's method.
Just like EventBus is injected directly into your Command Handler, you may inject any other Service. This way you may make is clear what object your Command Handler needs in order to perform his action.
Unlike Command Handlers which points to specific Command Handler, Event Handlers can have multiple subscribing Event Handlers.
Each Event Handler can be defined as Asynchronous. If multiple Event Handlers are marked for asynchronous processing, each of them is handled in isolation. This ensures that in case of failure, we can safely retry, as only failed Event Handler will be performed again.
If your Event Handler is interested in all Events around specific business concept, you may subscribe to Interface or Abstract Class.
And then instead of subscribing to TicketWasCreated
or TicketWasCancelled
, we will subscribe to TicketEvent
.
We can also subscribe to different Events using union type hint. This way we can ensure that only given set of events will be delivered to our Event Handler.
We may subscribe to all Events published within the application. To do it we type hint for generic object
.
Events can also be subscribed by Routing.
And then Event is published with routing key
Ecotone is using message routing for cross application communication. This way applications can stay decoupled from each other, as there is no need to share the classes between them.
There may be situations when we will want to subscribe given method to either routing or class name. Ecotone those subscriptions separately to protect from unnecessary wiring, therefore to handle this case, we can simply add another Event Handler which is not based on routing key.
This way we explicitly state that we want to subscribe by class name and by routing key.
Just like with Command Bus
, we may pass metadata to the Event Bus
:
If you make your Event Handler Asynchronous, Ecotone will ensure your metadata will be serialized and deserialized correctly.
By default Ecotone will ensure that your Metadata is propagated. This way you can simplify your code by avoiding passing around Headers and access them only in places where it matters for your business logic.
To better understand that, let's consider example in which we pass the metadata to the Command.
However in order to perform closing ticket logic, information about the executorId
is not needed, so we don't access that.
However Ecotone will ensure that your metadata is propagated from Handler to Handler. This means that the context is preserved and you will be able to access executorId in your Event Handler.
DDD Aggregates PHP
This chapter will cover the basics on how to implement an Aggregate. We will be using Command Handlers in this section, so ensure reading External Command Handler section first, to understand how Command are sent and handled.
Working with Aggregate Command Handlers is the same as with External Command Handlers.
We mark given method with Command Handler
attribute and Ecotone will register it as Command Handler.
In most common scenarios, Command Handlers are used as boilerplate code, which fetch the aggregate, execute it and then save it.
This is non-business code that is often duplicated wit each of the Command Handler we introduce. Ecotone wants to shift the developer focus on the business part of the system, this is why this is abstracted away in form of Aggregate.
By providing Identifier
attribute on top of property in your Aggregate, we state that this is identifier of this Aggregate (Entity in Symfony/Doctrine world, Model in Laravel world).
This is then used by Ecotone to fetch your aggregate automatically.
However Aggregates need to be fetched from repository in order to be executed. When we will send an Command, Ecotone will use property with same name from the Command instance to fetch the Aggregate.
You may read more about Identifier Mapping and more advanced scenarios in related section.
When identifier is resolved, Ecotone use repository
to fetch the aggregate and then call the method and then save it. So basically do all the boilerplate for you.
To implement repository reference to this section.
You may use inbuilt repositories, so you don't need to implement your own.
Ecotone provides Event Sourcing Repository
, Document Store Repository
, integration with Doctrine ORM or Eloquent.
An Aggregate is a regular object, which contains state and methods to alter that state. It can be described as Entity, which carry set of behaviours. When creating the Aggregate object, you are creating the Aggregate Root.
Aggregate
tells Ecotone, that this class should be registered as Aggregate Root.
Identifier
is the external reference point to Aggregate.
This field tells Ecotone to which Aggregate a given Command is targeted.
CommandHandler
defined on static method acts as factory method. Given command it should return new instance of specific aggregate, in that case new Product.
CommandHandler
defined on non static class method is place where you would make changes to existing aggregate, fetched from repository.
DDD PHP
Read Aggregate Introduction sections first to get more details about Aggregates.
Aggregate actions are defined using public method (non-static). Ecotone will ensure loading in order to execute the query method.
And then we call it from Query Bus
:
You may of course use of Query class or metadata in case of need, which will be passed to your aggregate's method.
DDD PHP
Read Aggregate Introduction sections first to get more details about Aggregates.
New Aggregates are initialized using public factory method (static method).
After calling createTicket
aggregate will be automatically stored.
Factory method is static method in the Aggregate class. You may have multiple factory methods if needed.
Sending Command looks exactly the same like in External Command Handlers scenario.
When factory method is called from Command Bus, then Ecotone will return new assigned identifier.
Aggregate actions are defined using public method (non-static). Ecotone will ensure loading and saving the aggregate after calling action method.
ChangeTicket
should contain the identifier of Aggregate instance on which action method should be called.
And then we call it from Command Bus
:
In fact we don't need to provide identifier in our Commands in order to execute specific Aggregate instance. We may not need a Command class in specific scenarios at all.
In this scenario, if we would add Command Class
, it would only contain of the identifier and that would be unnecessary boilerplate code. To solve this we may use Metadata in order to provide information about instance of the aggregate we want to call.
"aggregate.id"
is special metadata that provides information which aggregate we want to call.
When we avoid creating Command Classes with identifiers only, we decrease amount of boilerplate code that we need to maintain.
There may be a cases where you would like to do conditional logic, if aggregate exists do thing, otherwise this. This may be useful to keep our higher level code clean of "if" statements and to simply API by exposing single method.
Both Command Handlers are registered for same command CreateTicket
, yet one method is factory method
and the second is action method
.
When Command will be sent, Ecotone will try to load the aggregate first,
if it will be found then changeTicket
method will be called, otherwise createTicket
.
Redirected aggregate creation works the same for Event Sourced Aggregates.
DDD PHP
Read Aggregate Introduction sections first to get more details about Aggregates.
To tell Ecotone
to retrieve Events from your Aggregate add trait WithEvents
which contains two methods: recordThat
and getRecordedEvents
.
After importing trait, Events will be automatically retrieved and published after handling Command in your Aggregate.
Using recordThat
will delay sending an event till the moment your Aggregate is saved in the Repository. This way you ensure that no Event Handlers will be called before the state is actually stored.
Sometimes you may have situation, where Event from one Aggregate will actually change another Aggregate. In those situations you may actually subscribe to the Event directly from Aggregate, to avoid creating higher level boilerplate code.
In those situations however you need to ensure event contains of reference id, so Ecotone knows which Aggregate to load from the database.
For more sophisticated scenarios, where there is no direct identifier in corresponding event, you may use of identifier mapping. You can read about it more in Saga related section.
You may subscribe to Events by names, instead of the class itself. This is useful in cases where we want to decoupled the modules more, or we are not interested with the Event Payload at all.
For Events published from your Aggregate, it's enough to provide NamedEvent
attribute with the name of your event.
And then you can subscribe to the Event using name
DDD PHP
There may be a scenario where the creation of an Aggregate is conditioned by the current state of another Aggregate.
Ecotone provides a possibility for that and lets you focus more on domain modeling rather than technical nuances you may face trying to implement an actual use case.
This case is supported by both Event Sourcing and State-based Aggregates.
It is possible to send a command to an Aggregate and expect a State-based Aggregate to be returned.
It is also possible to send a command to an Aggregate and expect the Event Sourcing Aggregate to be returned.
Both of the Aggregates (called and result) can still record their Events using an Internal Recorder. Recorded Events will be published after the operation is persisted in the database.
In the case of an Event Sourcing Aggregate recording an event indicates a state change of that Aggregate.
Also, when calling a State-based Aggregate its state may be changed before returning the newly created Aggregate. E.g. you want to save a reference to the newly created Aggregate.
Ecotone will try to persist both called and returned Aggregates.
When splitting your aggregates into the smallest, independent parts of the domain you have to be aware of transaction boundaries which Aggregate has to protect. In the case where the creation of an Aggregate is the transaction boundary of another Aggregate, it may require a state change of the one that protects that boundary.
This is a very specific scenario where two aggregates will persist at the same time within the same transaction which is covered by Ecotone.
Asynchronous PHP Workers
Ecotone
provides abstractions for asynchronous execution.
We got new requirement:
User should be able to place order for different products.
We will need to build Order
aggregate.
Let's start by creating PlaceOrderCommand
with ordered product Ids
We will need OrderedProduct
value object, which will describe, cost and identifier of ordered product
And our Order
aggregate
placeOrder
- Place order method make use of QueryBus
to retrieve cost of each ordered product.
You could find out, that we are not using application/json
for product.getCost
query, ecotone/jms-converter
can handle array
transformation, so we do not need to use json
.
We do not need to change or add new Repository
, as our exiting one can handle any new aggregate arriving in our system.
Let's change our testing class and run it!
We want to be sure, that we do not lose any order, so we will register our order.place Command Handler
to run asynchronously using RabbitMQ
now.
Let's start by adding extension to Ecotone
, that can handle RabbitMQ:
We also need to add our ConnectionFactory
to our Dependency Container.
Let's add our first AMQP Backed Channel
(RabbitMQ Channel), in order to do it, we need to create our first Application Context.
Application Context is a non-constructor class, responsible for extending Ecotone
with extra configurations, that will help the framework act in a specific way. In here we want to tell Ecotone
about AMQP Channel
with specific name.
Let's create new class App\Infrastructure\MessagingConfiguration.
ServiceContext
- Tell that this method returns configuration. It can return array of objects or a single object.
Now we need to tell our order.place
Command Handler, that it should run asynchronously using our neworders
channel.
We do it by adding Asynchronous
annotation with channelName
used for asynchronous endpoint.
Endpoints using Asynchronous
are required to have endpointId
defined, the name can be anything as long as it's not the same as routing key (order.place)
.
Let's run our command which will tell us what asynchronous endpoints we have defined in our system: ecotone:list
We have new asynchronous endpoint available orders.
Name comes from the message channel name.
You may wonder why it is not place_order_endpoint,
it's because via single asynchronous channel we can handle multiple endpoints, if needed. This is further explained in asynchronous section.
Let's change orderId
in our testing command, so we can place new order.
After running our testing command bin/console ecotone:quickstart
we should get an exception:
That's fine, we have registered order.place
Command Handler to run asynchronously, so we need to run our asynchronous endpoint
in order to handle Command Message
. If you did not received and exception, it's probably because orderId
was not changed and we already registered such order.
Let's run our asynchronous endpoint
Like we can see, it ran our Command Handler and placed the order.
We can change our testing command to run only Query Handler
and check, if the order really exists now.
There is one thing we can change.
As in asynchronous scenario we may not have access to the context of executor to enrich the message,, we can change our AddUserIdService Interceptor
to perform the action before sending it to asynchronous channel.
This Interceptor is registered as Before Interceptor
which is before execution of our Command Handler, but what we want to achieve is, to call this interceptor before message will be send to the asynchronous channel. For this there is Presend
Interceptor available.
Change Before
annotation to Presend
annotation and we are done.
Ecotone will do it best to handle serialization and deserialization of your headers.
Now if non-administrator will try to execute this, exception will be thrown, before the Message will be put to the asynchronous channel. Thanks to Presend
interceptor, we can validate messages, before they will go asynchronous, to prevent sending incorrect messages.
We made it through, Congratulations! We have successfully registered asynchronous Command Handler and safely placed the order. We have finished last lesson. You may now apply the knowledge in real project or check more advanced usages starting here Modelling Overview.
Repository PHP
Read Aggregate Introduction sections first to get more details about Aggregates.
Repositories are used for retrieving and saving the aggregate to persistent storage. Typical flow for calling aggregate method would looks like below:
By setting up Repository
we provide Ecotone with functionality to fetch and store the Aggregate , so we don't need to do it on our own.
If our class is defined as Aggregate, Ecotone will use Repository in order fetch and store it, whenever the Command
is sent via Command Bus
.
There are two types of repositories. One for storing State-Stored Aggregate
and another one for storing Event Sourcing Aggregate
.
Based on which interface is implemented, Ecotone
knows which Aggregate type was selected.
State-Stored Aggregate are normal Aggregates
, which are not Event Sourced
.
canHandle method
informs, which Aggregate Classes
can be handled with this Repository
. Return true, if saving specific aggregate is possible, false otherwise.
findBy method
returns if found, existing Aggregate instance
, otherwise null.
save method
is responsible for storing given Aggregate instance
.
$identifiers
are array of #[Identifier]
defined within aggregate.
$aggregate
is instance of aggregate
$metadata
is array of extra information, that can be passed with Command
$expectedVersion
if version locking by #[Version]
is enabled it will carry currently expected
When your implementation is ready simply mark it with #[Repository]
attribute:
This is example implementation of Standard Repository using Doctrine ORM.
Repository:
When your implementation is ready simply mark it with #[Repository]
attribute:
Ecotone provides inbuilt repositories to get you started quicker. This way you can enable given repository and start implementing higher level code without worrying about infrastructure part.
This provides integration with Doctrine ORM. To enable it read more in Symfony Module Section.
This provides integration with Eloquent ORM. Eloquent support is available out of the box after installing Laravel module.
This provides integration Document Store using relational databases. It will serialize your aggregate to json and deserialize on load using Converters. To enable it read in Dbal Module Section.
Ecotone provides inbuilt Event Sourcing Repository, which will set up Event Store and Event Streams. To enable it read Event Sourcing Section.
By default Ecotone when we have only one Standard and Event Sourcing Repository registered, Ecotone will use them for our Aggregate by default. This comes from simplification, as if there is only one Repository of given type, then there is nothing else to be choose from. However, if we register multiple Repositories, then we need to take over the process and tell which Repository will be used for which Aggregate.
In case of Custom Repositories we do it using canHandle method.
In case of inbuilt Repositories, we should follow configuration section for given type
Custom repository for Event Sourced Aggregates is described in more details under Event Sourcing Repository section.
Let's consider scenario where we want to store new record in Persons table. To make it happen just like with Business Method we will create an Interface, yet this time we will mark it with DbalBusinessMethod.
The first parameter passed to DbalBusinessMethod is actual SQL, where we can provide set of named parameters. Ecotone will automatically bind parameters from method declaration to SQL ones by names.
We may bind parameter name explicitly by using DbalParameter attribute.
This can be used when we want to decouple interface parameter names from binded parameters or when name in database column is not explicit enough for being part of interface.
If we want to return amount of the records that have been changed, we can add int type hint to our Business Method:
We may want to fetch data from the database and for this we will be using DbalQueryBusinessMethod.
The above will return result as associative array with the columns provided in SELECT statement.
To format result differently we may use different fetch modes. The default fetch Mode is associative array.
This will extract the first column from each row, which allows us to return array of person Ids directly.
To get single variable out of Result Set we can use First Column of first row Mode.
This way we can provide simple interfaces for things Aggregate SQLs, like SUM or COUNT.
To fetch first Row of given Result Set, we can use First Row Mode.
This will return array containing person_id and name.
When using First Row Mode, we may end up having no returned row at all. In this situation Dbal will return false, however if Return Type will be nullable, then Ecotone will convert false to null.
For big result set we may want to avoid fetching everything at once, as it may consume a lot of memory. In those situations we may use Iterator Fetch Mode, to fetch one by one.
Each parameter may have different type and Ecotone will try to recognize specific type and set it up accordingly. If we want, we can take over and define the type explicitly.
Special type of Business Interface is Repository. This Interface allows us to simply load and store our Aggregates directly. In situations when we call Command directly in our Aggregates we won't be in need to use it. However for some specific cases, where we need to load Aggregate and store it outside of Aggregate's Command Handler, this business interface becomes useful.
Ecotone will read type hint to understand which Aggregate you would like to fetch or save.
When using Pure Event Sourced Aggregate, instance of Aggregate does not hold recorded Events. Therefore passing aggregate instance would not contain any information about recorded events. For Pure Event Sourced Aggregates, we can use direct event passing to the repository:
Let's take as an example creating new Ticket
We may define interface, that will call this Command Handler whenever will be executed.
This way we don't need to use Command Bus and we can bypass all Bus related interceptors.
The attribute #[BusinessMethod] tells Ecotone that given Interface is meant to be used as entrypoint to Messaging and which Message Handler it should send the Message to. Ecotone will provide implementation of this interface directly in our Dependency Container.
We may also execute given Aggregate directly using Business Interface.
Then we define interface:
We may of course pass Command class if we need to pass more data to our Aggregate's Command Handler.
Defining Query Interface works exactly the same as Command Interface and we may also use it with Aggregates.
Then we may call this Query Handler using Interface
Then we may call this Query Handler using Interface
Ecotone will use defined Converter to convert array
to TicketDTO
.
Such conversion are useful in order to work with objects and to avoid writing transformation code in our business code. We can build generic queries, and transform them to different classes using different business methods.
Then we may call this Query Handler using Interface
Ecotone will use defined Converter to convert array to CreateTicket command class.
This type of conversion is especially useful, when we receive data from external source and we simply want to push it to given Message Handler. We avoid doing transformations ourselves, as we simply push what we receive as array.
We may want to use higher level object within our Interface than simple scalar types. As those can't be understood by our Database, it means we need Conversion. Ecotone provides default conversions and possibility to customize the process.
Ecotone provides inbuilt Conversion for Date Time based objects.
If your Class contains __toString
method, it will be used for doing conversion.
For example database column may be of type JSON or Binary. In those situation we may state what Media Type given parameter should be converted too, and Ecotone will do the conversion before it's executing SQL.
In above example roles will be converted to JSON before SQL will be executed.
Then we will be able to use our Business Method with PersonRole, which will be converted to given Media Type before being saved:
This way we can provide higher level classes, keeping our Interface as close as it's needed to our business model.
We may use Expression Language to dynamically evaluate our parameter.
payload is special parameter in expression, which targets value of given parameter, in this example it will be PersonName. In above example before storing name in database, we will call toLowerCase() method on it.
We may also access any Service from our Dependency Container and run a method on it.
reference is special function within expression which allows us to fetch given Service from Dependency Container. In our case we've fetched Service registered under "converter" id and ran normalize method passing PersonName.
We may use Dbal Parameters on the Method Level, when parameter is not needed.
In case parameter is a static value.
We can also use dynamically evaluated parameters and access Dependency Container to get specific Service.
In case of Method and Class Level Dbal Parameters we get access to passed parameters inside our expression. They can be accessed via method parameters names.
As we can use method level, we can also use class level Dbal Parameters. In case of Class level parameters, they will be applied to all the method within interface.
To make our SQLs more readable we can also use the expression language directly in SQLs.
Suppose we Pagination class
then we could use it like follows:
To enable expression for given parameter, we need to follow structure :(expression)
, so to use limit property from Pagination class we will write :(pagination.limit)
Ecotone will read the Docblock and based on that will deserialize Result Set from database to list of PersonNameDTO.
Using combination of First Row Fetch Mode, we can get first row and then use it for conversion to PersonNameDTO.
For big result set we may want to avoid fetching everything at once, as it may consume a lot of memory. In those situations we may use Iterator Fetch Mode, to fetch one by one. If we want to convert each result to given Class, we may define docblock describing the result:
Each returned row will be automatically convertered to PersonNameDTO.
We may return the result in specific format directly. This is useful when Business Method is used on the edges of our application and we want to return the result directly.
In this example, result will be returned in application/json.
Process Manager Saga PHP
When loading Aggregates or Sagas we need to know what Identifier should be used for that. This depending on the business feature we work may require different approaches. In this section we will dive into different solutions which we can use.
Ecotone resolves the mapping automatically, when Identifier in the Aggregate/Saga is named the same as the property in the Command/Event.
then, if Message has productId, it will be used for Mapping:
You may use multiple aggregate identifiers or identifier as objects (e.g. Uuid) as long as they provide __toString
method
We may also expose identifier over public method by annotating it with attribute IdentifierMethod("productId").
If the property name is different than Identifier in the Aggregate/Saga, we need to give Ecotone
a hint, how to correlate identifiers.
We can do that using TargetIdentifier attribute, which states to which Identifier given property references too:
When there is no property to correlate inside Command or Event, we can use Identifier from Metadata.
When we've the identifier inside Metadata
then we can use identifierMetadataMapping.
Suppose the orderId identifier is available in metadata under key orderNumber, then we can then use this mapping:
We may provide Identifier dynamically using Command Bus. This way we can state explicitly what Aggregate/Saga instance we do refer too. Thanks to we don't need to define Identifier inside the Command and we can skip any kind of mapping.
In some scenario we won't be in deal to create an Command class at all. For example we may provide block user action, which changes the status:
Event so we are using "aggregate.id" in the metadata, this will work exactly the same for Sagas. Therefore if we want to trigger Message Handler on the Saga, we can use "aggregate.id" too.
When using identifierMapping configuration, we get access to the Message fully and to Dependency Container. To access specific part we will be using:
payload -> Represents our Event/Command class
headers -> Represents our Message's metadata
reference('name') -> Allow to access given service from our Dependency Container
Suppose the orderId
identifier is available in metadata under key orderNumber
, then we can tell Message Handler to use this mapping:
Suppose our Identifier is an Email object within Command class and we would like to normalize before it's used for fetching the Aggregate/Saga:
Suppose we receive external order id, however we do have in database our internal order id that should be used as Identifier. We could then have a Service registered in DI under "orderIdExchange":
Then we can make use of it in our identifier Mapping
Ecotone allows to work with Database using DbalBusinessMethod. The goal is to create abstraction which significantly reduce the amount of boilerplate code required to implement data access layers. Thanks to Dbal based Business Methods we are able to avoid writing integration and transformation level code and focus on the Business part of the system. To make use of Dbal based Business Method, .
Above example will use DbalConnectionFactory::class for database Connection, which is the default for . If you want to run Business Method on different connection, you can do it using connectionReferenceName parameter inside the Attribute.
To make use of this Business Interface, we need our being registered.
Be sure to read before diving in this chapter.
Business Interface aims to reduce boierplate code and make your domain actions explicit. In Application we describe an Interface, which executes Business methods. Ecotone will deliver implementation for this interface, which will bind the interface with specific actions. This way we can get rid of delegation level code and focus on the things we want to achieve. For example, if we don't want to trigger action via Command/Query Bus, we can do it directly using our business interface and skip all the Middlewares that would normally trigger during Bus execution. There are different types of Business Interfaces and in this chapter we will discuss the basics of build our own Business Interface, in next sections we will dive into specific types of business interfaces: and .
From lower level API Business Method
is actually a .
If we have registered then we let Ecotone do conversion to
Message Handler specific format:
If we have registered then we let Ecotone do conversion to
Message Handler specific format:
By default Ecotone will convert time using Y-m-d H:i:s.u
format. We may override this using .
We may override this using .
If we are using higher level classes like Value Objects, we will be able to change the type to expected one. For example if we are using we can register Converter for our PersonRole Class and convert it to JSON or XML.
Read more about Ecotone's
In rich business domains we will want to work with higher level objects than associate arrays. Suppose we have PersonNameDTO and defined for it.
Sagas are part of the Ecotone's Workflow support. To read more refer to .
We can make use of Before
or Presend
to enrich event's metadata with required identifiers.
There may be cases where more advanced mapping may be needed. In those cases we can use identifier mapping based on .
Read previous section to find out more about Interceptors.
We may aswell intercept Asynchronous Endpoints pretty easily. We do it by using pointing to AsynchronousRunningEndpoint class.
As part of around intercepting, if we need Message Payload to make the decision we can simply inject that into our interceptor:
We can also inject Message Headers into our interceptor. We could for example inject Message Consumer name in order to decide whatever to start the transaction or not:
Ecotone provides ability to extend any Messaging Gateway using Interceptors. We can hook into the flow and add additional behavior.
For better understanding, please read Interceptors section before going through this chapter.
Suppose we want to add custom logging, whenever any Command is executed. We know that CommandBus is a interface for sending Commands, therefore we need to hook into that Gateway.
Intercepting Gateways, does not differ from intercepting Message Handlers.
We may also want to have different types of Message Buses for given Message Type. For example we could have EventBus with audit which we would use in specific cases. Therefore we want to keep the original EventBus untouched, as for other scenarios we would simply keep using it.
To do this, we will introduce our new EventBus:
That's basically enough to register our new interface. This new Gateway will be automatically registered in our DI container, so we will be able to inject it and use.
It's enough to extend given Gateway with custom interface to register new abstraction in Gateway in Dependency Container. In above example AuditableEventBus will be automatically available in Dependency Container to use, as Ecotone will deliver implementation.
Now as this is separate interface, we can point interceptor specifically on this
We could of course intercept by attributes, if we would like to make audit functionality reusable
and then we pointcut based on the attribute
Gateways can also be extended with asynchronous functionality on which you can read more in Asynchronous section.
Ecotone provides easy way to pass Message Headers (Metadata) with your Message and use it in your Message Handlers or Interceptors. In case of asynchronous scenarios, Message Headers will be automatically mapped and passed to through your Message Broker.
Pass your metadata (headers), as second parameter.
Then you may access them directly in Message Handlers:
If you have defined Converter for given type, then you may type hint for the object and Ecotone will do the conversion:
And then we can use Classes instead of scalar types for our Headers:
Ecotone provides a lot of support for Conversion, so we can work with higher level business class not scalars. Find out more in Conversion section.
Ecotone by default propagate all Message Headers automatically. This as a result preserve context, which can be used on the later stage. For example we may provide executorId header and skip it in our Command Handler, however use it in resulting Event Handler.
This will execute Command Handler:
and then even so, we don't resend this Header when publishing Event, it will still be available for us:
When publishing Events from Aggregates or Sagas, metadata will be propagated automatically too.
When using Messaging we may want to be able to trace where given Message came from, who was the parent how it was correlated with other Messages. In Ecotone all Messages contains of Message Id, Correlation Id and Parent Id within Metadata. Those are automatically assigned and propagated by the Framework, so from application level code we don’t need to deal manage those Messaging level Concepts.
Using Message Id, Correlation Id are especially useful find out what have happened during the flow and if any part of the flow has failed. Using already propagated Headers, we may build our own tracing solution on top of what Ecotone provides or use inbuilt support for OpenTelemetry.
Each Message receives it's own unique Id, which is Uuid generated value. This is used by Ecotone to provide capabilities like Message Deduplication, Tracing and Message identification for Retries and Dead Letter.
"parentId" header refers to Message that was direct ancestor of it. In our case that can be correlation of Command and Event. As a result of sending an Command, we publish an Event Message.
Parent id will always refer to the previous Message. What is important however is that, if we have multiple Event Handlers each of them will receive it's own copy of the Message with same Id.
Correlation Id is useful for longer flows, which can span over multiple Message Handlers. In those situations we may be interested in how our Message flow have branched:
As mentioned earlier, Events are stored in form of a Event Stream. Event Stream is audit of Events, which happened in the past. However to protect our business invariants, we may want to work with current state of the Aggregate to know, if given action is possible or not (business invariants).
Business Invariants in short are our simple "if" statements inside the Command Handler in the Aggregate. Those protect our Aggregate from moving into incorrect state. With State-Stored Aggregates, we always have current state of the Aggregate, therefore we can check the invariants right away. With Event-Sourcing Aggregates, we store them in form of an Events, therefore we need to rebuild our Aggregate, in order to protect the invariants.
Suppose we have Ticket Event Sourcing Aggregate.
For this Ticket we do allow for assigning an Person to handle the Ticket. Let's suppose however, that Business asked us to allow only one Person to be assigned to the Ticket at time. With current code we could assign unlimited people to the Ticket, therefore we need to protect this invariant.
To check if whatever Ticket was already assigned to a Person, our Aggregate need to have state applied which will tell him whatever the Ticket was already assigned. To do this we use EventSourcingHandler attribute passing as first argument given Event class. This method will be called on reconstruction of this Aggregate. So when this Aggregate will be loaded, if given Event was recorded in the Event Stream, method will be called:
Then this state, can be used in the Command Handler to decide whatever we can trigger an action or not:
As you can see, it make sense to only assign to the state attributes that protect our invariants. This way the Aggregate stays readable and clean of unused information.
Ecotone comes with full automation for setting up Event Sourcing for us. This we can we really easily roll out new features with Event Sourcing with just minimal or none setup at all.
Before we will start, let's first install Event Sourcing module, which will provide us with all required components:
We need to configure DBAL Support in order to make use of it.
Ecotone PDO Event Sourcing does provide support for three databases:
PostgreSQL
MySQL
MariaDB
Ecotone provides inbuilt functionality to serialize your Events, which can be customized in case of need. This makes Ecotone take care of Event Serialization/Deserialization, and allows us to focus on the business side of the code.
We can take over this process and set up our own Serialization, however Ecotone JMS Converter can fully do it for us, so we can simply focus on the business side of the code. To make it happen all we need to do, is to install JMS Package and we are ready to go:
Event Sourcing PHP
Ecotone
comes with in-built integration for Event Sourcing which works with databases like Postgres
, MySQL
or MariaDB
for storing Events and allows for building and storing Projected Views in any storage we decide.
Read more in following chapters.
Starting with Event Sourcing in PHP [Article]
Using Event Sourcing in PHP
Before diving into this section be sure to understand how Aggregates works in Ecotone based on previous sections.
Ecotone provides higher level abstraction to work with Event Sourcing, which is based on Event Sourced Aggregates. Event Sourced Aggregate just like normal Aggregates protect our business rules, the difference is in how they are stored.
Normal Aggregates are stored based on their current state:
Yet if we change the state, then our previous history is lost:
Having only the current state may be fine in a lot of cases and in those situation it's perfectly fine to make use of State-Stored Aggregates. This is most easy way of dealing with changes, we change and we forget the history, as we are interested only in current state.
When we actually need to know what was the history of changes, then State-Stored Aggregates are not right path for this. If we will try to adjust them so they are aware of history we will most likely complicate our business code. This is not necessary as there is better solution - Event Sourced Aggregates.
When we are interested in history of changes, then Event Sourced Aggregate will help us. Event Sourced Aggregates are stored in forms of Events. This way we preserve all the history of given Aggregate:
When we change the state the previous Event is preserved, yet we add another one to the audit trail (Event Stream).
This way all changes are preserved and we are able to know what was the historic changes of the Product.
The audit trail of all the Events that happened for given Aggregate is called Event Stream. Event Stream contains of all historic Events for all instance of specific Aggregate type, for example all Events for Product Aggregate
Let's now dive a bit more into Event Streams, and what they actually are.
Just as with Standard Aggregate, ES Aggregates are called by Command Handlers, however what they return are Events and they do not change their internal state.
When this Aggregate will be called via Command Bus with CreateProduct Command, it will then return new ProductWasCreated Event.
Command Handlers may return single events, multiple events or no events at all, if nothing is meant to be changed.
Aggregate Version
Aggregate Id
Aggregate Type
To find out about current version of Aggregate Ecotone will look for property marked with Version Attribute.
We don't to add this property directly, we can use trait instead:
Anyways, this is all we need to do, as Ecotone will take care of reading and writing to this property. This way we can focus on the business logic of the Aggregate, and Framework will take care of tracking the version.
We need to tell to Ecotone what is the Identifier of our Event Sourcing Aggregate. This is done by having property marked with Identifier in the Aggregate:
As Command Handlers are pure and do not change the state of our Event Sourcing Aggregate, this means we need a different way to mutate the state in order to assign the identifier. For changing the state we use EventSourcingHandler attribute, which tell Ecotone that if given Event happens, then trigger this method afterwards:
So when this Command Handler happens:
What actually will happen under the hood is that this Event will be applied to the Event Stream:
Let's start by manually appending Events using Event Store. This will help us understand better the concepts behind the Event Stream and Event Partitioning. After we will understand this part, we will introduce Event Sourcing Aggregates, which will abstract away most of the logic that we will need to do in this chapter.
Working with Event Stream directly may be useful when migrating from existing system where we already had an Event Sourcing solution, which we want to refactor to Ecotone.
After installing Ecotone's Event Sourcing we automatically get access to Event Store abstraction. This abstraction provides an easy to work with Event Streams.
Let's suppose that we do have Ticketing System like Jira with two basic Events "Ticket Was Registered" and "Ticket Was Closed". Of course we need to identify to which Ticket given event is related, therefore will have some Id.
In our code we can define classes for those:
To store those in the Event Stream, let's first declare it using - Event Store abstraction.
Event Store provides few handy methods:
As we want to append some Events, let's first create an new Event Stream
This is basically enough to create new Event Stream. But it's good to understand what actually happens under the hood.
In short Event Stream is just audit of series of Events. From the technical point it's a table in the Database. Therefore when we create an Event Stream we are actually creating new table.
Event Stream table contains:
Event Id - which is unique identifier for Event
Event Name - Is the named of stored Event, which is to know to which Class it should be deserialized to
Payload - is actual Event Class, which is serialized and stored in the database as JSON
Metadata - Is additional information stored alongside the Event
To append Events to the Event Stream we will use "appendTo" method
This will store given Event in Ticket's Event Stream
Above we've stored Events for Ticket with id "123". However we can store Events from different Tickets in the same Event Stream.
We now can load those Events from the Event Stream
This will return iterator of Ecotone's Events
As we can see this maps to what we've been storing in the Event Stream table.
Payload will contains our deserialized form of our event, so for example TicketWasRegistered
.
We could also fetch list of Events without deserializing them. $events = $eventStore->load("ticket", deserialize: false);
In that situations payload will contains an associative array. This may be useful when iterating over huge Event Streams, when there is no need to actually work with Objects. Besides that ability to load in batches may also be handy.
Let's consider what may actually happen during concurrent access to our System. This may be due more people working on same Ticket or simply because our system did allow for double clicking of the same action.
In those situations we may end up storing the same Event twice
Without any protection we will end up with Closing Events in the Event Stream. That's not really ideal, as we will end up with Event Stream having incorrect history:
This is the place where we need to get back to persistence strategy:
We've created this Stream with "simple" persistence strategy. This means we can apply any new Events without guards. This is fine in scenarios where we are dealing with no business logic involved like collecting metrics, statistics. where all we to do is to push push Events into the Event Stream, and duplicates are not really a problem. However simple strategy (which is often the only strategy in different Event Sourcing Frameworks), comes with cost:
We lose linear history of our Event Stream, as we allow for storing duplicates. This may lead to situations which may lead to incorrect state of the System, like Repayments being recorded twice.
As we do allow for concurrent access, we can actually make wrong business decisions. For example we could give to the Customer promotion code twice.
The "simple strategy" is often the only strategy that different Event Sourcing Frameworks provide. However after the solution is released to the production, we often start to recognize above problems, yet now as we don't have other way of dealing with those, we are on mercy of fixing the causes, not the root of the problem. Therefore we need more sophisticated solution to this problem, to solve the cause of it not the side effects. And to solve the cause we will be using different persistence strategy called "partition strategy".
Event Stream can be split in partitions. Partition is just an sub-stream of Events related to given Identifier, in our context related to Ticket.
Partition is linear history for given identifier, where each Event is within partition is assigned with version. This way we now, which event is at which position. Therefore in order to partition the Stream, we need to know the partition key (in our case Ticket Id). By knowing the partition key and last version of given partition, we can apply an Event at the correct position. To create partitioned stream, we would create Event Stream with different strategy:
This will create Event Stream table with constraints, which will require:
Aggregate Id - This will be our partition key
Aggregate Type - This may be used if we would store more Aggregate types within same Stream (e.g. User), as additional partition key
Aggregate Version - This will ensure that we won't apply two Events at the same time to given partition
We append those as part of Event's metadata:
Let's now see, how does it help us ensuring that our history is always correct. Let's assume that currently we do have single Event in the partition
Now let's assume two requests happening at the same time:
This way allows us to be sure that within request we are dealing with latest Event Stream, because if that's not true we will end up in concurent exception. This kind of protection is crucial when dealing with business logic that depends on the previous events, as it ensures that there is no way to bypass it.
PHP Interceptors Middlewares
Stopping or modify usual processing cycle
Calling some shared functionality or adding additional behavior
This all can be done without modifying the code itself, as we hook into the existing flows.
Every interceptor has Pointcut
attribute, which describes for specific interceptor, which endpoints it should intercept.
CLASS_NAME
- indicates intercepting specific class or interface or class containing attribute on method or class level
CLASS_NAME::METHOD_NAME
- indicates intercepting specific method of class
expression || expression
- Indicating one expression or another e.g. Product\*||Order\*
expression && expression
- Indicating one expression and another e.g.
App\Domain\* && App\Attribute\RequireAdministrator
There are four types of interceptors. Each interceptor has it role and possibilities. Interceptors are called in following order:
Before
Around
After
Presend
Before Interceptor is called after message is sent to the channel, before execution of Endpoint.
Let's create our first Before Interceptor.
Now we need to annotate our Command Handler:
Whenever we call our command handler, it will be intercepted by AdminVerificator now.
If return type is not void
new modified based on previous Message will be created from the returned type. If we additionally add changeHeaders: true it will tell Ecotone, that we we want to modify Message headers instead of payload.
Use Message Filter
, to eliminate undesired messages based on a set of criteria.
This can be done by returning null from interceptor, if the flow should proceed, then payload should be returned.
If return type is not void
new modified based on previous Message will be created from the returned type. If we additionally add changeHeaders=true
it will tell Ecotone, that we we want to modify Message headers instead of payload.
The Around Interceptor
have access to actual Method Invocation.
This does allow for starting action before method invocation is done, and finishing it after.
Around interceptor
is a good place for handling actions like Database Transactions or logic that need to access invoked object.
As we used Command Bus
interface as pointcut, we told Ecotone
that it should intercept Command Bus Gateway.
Now whenever we will call any method on Command Bus, it will be intercepted with transaction.
The other powerful use case for Around Interceptor is intercepting Aggregate.
Suppose we want to verify, if executing user has access to the Aggregate.
We have placed @IsOwnerOfPerson
annotation as the top of class. For interceptor pointcut it means, that each endpoint defined in this class should be intercepted. No need to add it on each Command Handler now.
We've passed the executd Aggregate instance - Person to our Interceptor. This way we can get the context of the executed object in order to perform specific logic.
We will intercept all endpoints within Order\ReadModel namespace, by adding result coming from the endpoint under result
key.
Presend Interceptor
can used to verify if data is correct before sending to asynchronous channel, or we may want to check if user has enough permissions to do given action.
This will keep our asynchronous channel free of incorrect messages.
Presend can't intercept Gateways like (Command/Event/Query) buses, however in context of Gateways using Before Interceptor lead to same behaviour, therefore can be used instead.
Aggregates under the hood make use of Partition persistence strategy (Refer to ). This means that we need to know:
We will explore how applying Events works more in .
Aggregate Type will be the same as Aggregate Class. We can decouple the class from the Aggregate Type, more about this can be found in "" section.
As storing in Event Store is abstracted away, the code stays clean and contains only of the business part. We can the Stream Name, Aggregate Type and even Event Names when needed.
In we discussed that Event Sourcing Aggregates are built from Event Streams stored in the data store. Yet it's important to understand how those Events gets to the Event Stream in the first place.
As a result of duplicated Events (Which hold different Message Id) we will trigger side effects twice. Therefore our Event Handlers will need to handle this situation to avoid for example trigger requests to external system twice, or building wrong Read Model using .
Ecotone
provide possibility to handle via Interceptors
.
Interceptor
intercepts the process of handling the message, this means we can do actions like:
Enriching the
If you are familiar with you will find a lot of similarities.
Type of Interceptor more about it
Precedence defines ordering of called interceptors. The lower the value is, the quicker Interceptor will be called. It's safe to stay with range between -1000 and 1000, as numbers bellow -1000 and higher than 1000 are used by Ecotone.
The precedence is done within a specific .
NAMESPACE*
- Indicating all starting with namespace prefix e.g. App\Domain\*
Before interceptor is called before endpoint is executed.
Before interceptors can used in order to stop the flow
, throw an exception
or enrich the
To understand it better, let's follow an example, where we will intercept Command Handler with verification if executor is an administrator.
Let's start by creating Attribute called RequireAdministrator in new namepace.
We are using in here which is looking for #[RequireAdministrator] annotation in each of registered .
The void return type
is expected in here. It tells Ecotone
that, this Before Interceptor is not modifying the Message and message will be passed through. The message flow however can be interrupted by throwing exception.
Our Command Handler
is using ChangePriceCommand
class and our AdminVerificator interceptor
is using array $payload
. They are both referencing payload of the , yet if we define a class as type hint, Ecotone will do the Conversion for us.
If return type is not void
new Message will be created from the returned type.
Let's follow an example, where we will enrich payload with timestamp.
Suppose we want to add executor Id, but as this is not part of our Command, we want add it to our Headers.
After interceptor
is called after endpoint execution has finished.
It does work exactly the same as
After interceptor can used to for example to enrich QueryHandler
result.
Presend Interceptor
is called before Message is actually send to the channel.
In synchronous channel there is no difference between Before
and Presend.
The difference is seen when the channel is .
There are two ways we can configure our Aggregate to record Events.
This way of handling events does allow for pure functions. This means that actions called on the Aggregate returns Events and are not changing internal state of Aggregate.
EventSourcingAggregate
and Identifier
works exactly the same as State-Stored Aggregate.
Event Sourced Aggregate must provide version. You may leave it to Ecotone
using WithAggregateVersioning
or you can implement it yourself.
CommandHandler
for event sourcing returns events generated by specific method. This will be passed to the Repository
to be stored.
EventSourcingHandler
is method responsible for reconstructing Aggregate
from previously created events. At least one event need to be handled in order to provide Identifier
.
This way of handling events allow for similarity with State Stored Aggregates. This convention requires changing internal state of Aggregate to record Events. Therefore Pure ES Aggregate is recommended as it's not require for any internal state changes in most of the scenarios. However ES Aggregate with Internal Recorder may be useful for projects migrating with other solutions, or when our team is heavily used to working this way.
In order to make use of alternative way of handling events, we need to provide trait WithEvents.
Command Handlers instead of returning events are acting the same as State Stored Aggregates.
All events which will be published using recordThat
will be passed to the Repository
to be stored.
In its lifetime events may change. In order to track those changes Ecotone provides possibility of versioning events.
Value given with Revision
attribute will be stored by Ecotone in events metadata. Attribute is used only when event is saved in event store. In order to read it, you can access events metadata, e.g. in event handler.
This feature is available as part of Ecotone Enterprise.
Depending on the version we may actually want to restore our Aggregate a bit differently. This is especially useful when we've changed the way Events are structured and introduced new version of the Event. For this we can use revision header to access the version on which given Event was stored.
We may inject any type of Header that was stored together with the Event. This means inbuilt not only headers like timestamp, id, correlationId are avaiable out of the box, but also custom headers provided by the application (e.g. userId).
Ecotone comes with inbuilt Event Sourcing repository after Event Sourcing package is installed. However you want to roll out your own storage for Events, or maybe you already use some event-sourcing framework and would like to integrate with it. For this you can take over the control by introducing your own Event Sourcing Repository.
Using Custom Event Sourcing Repository will not allow you to make use of inbuilt projection system. Therefore consider configuring your own Event Sourcing Repository only if you want to build your own projecting system.
We do start by implementing EventSourcingRepository interface:
canHandle - Tells whatever given Aggregate is handled by this Repository
findBy - Method returns previously created events for given aggregate. Which Ecotone will use to reconstruct the Aggregate.
save - Stores events recorded by Event Sourced Aggregate
and then we need to mark class which implements this interface as Repository
Ecotone provides enough information to decide how to store provided events.
Identifiers will hold array of identifiers related to given aggregate (e.g. ["orderId" ⇒ 123]). Events will be list of Ecotone's Event classes, which contains of payload and metadata, where payload is your Event class instance and metadata is specific to this event. Metadata as parameter is generic metadata available at the moment of Aggregate execution. Version before handling on other hand is the version of the Aggregate before any action was triggered on it. This can be used to protect from concurrency issues.
The structure of Events is as follows:
It's worth to mention about Ecotone's Events and especially about metadata part of the Event. Each metadata for given Event contains of three core Event attributes:
"_aggregate_id" - This provides aggregate identifier of related Aggregate
"_aggregate_version" - This provides version of the related Event (e.g. 1/2/3/4)
"_aggregate_type" - This provides type of the Aggregate being stored, which can be customized
If our repository stores multiple Aggregates is useful to have the information about the type of Aggregate we are storing. However keeping the class name is not best idea, as simply refactor would break our Event Stream. Therefore Ecotone provides a way to mark our Aggregate type using Attribute
This now will be passed together with Events under _aggregate_type metadata.
In Ecotone we can name the events to avoid storing class names in the Event Stream, to do so we use NamedEvent.
then when events will be passed to save method, they will automatically provide this name under eventName property.
With custom repository we still can use inbuilt Snapshoting mechanism. To use it for customized repository we will use BaseEventSourcingConfiguration.
Ecotone then after fetching snapshot, will load events only from this given moment using `fromAggregateVersion`.
If you want to test out your flow and storing with your custom Event Sourced Repository, you should disable default in memory repository
Changes in the Application will happen. After some time we may want to refactor namespaces, change the name of Aggregate or an Event. However those kind of changes may break our system, if we already have production data which references to any of those. Therefore to make our Application to immune to future changes we need a way to decouple the code from the data in the storage, and this is what Ecotone provides.
Our Event Stream name by default is based on the Aggregate Class name. Therefore to make it immune to changes we may provide custom Stream Name. To do it we will apply Stream
attribute to the aggregate:
Then tell the projection to make use of it:
By default events in the stream will hold Aggregate Class name as AggregateType
.
You may customize this by applying AggregateType
attribute to your Aggregate.
To avoid storing class names of Events in the Event Store
we may mark them with name:
This way Ecotone will do the mapping before storing an Event and when retrieving the Event in order to deserialize it to correct class.
It's worth to remember that if we want test storing Events using provided Event Named, we need to add them under recognized classes, so Ecotone knows that should scan those classes for Attributes:
PHP Event Sourcing Snapshoting
In general having streams in need for snapshots may indicate that our model needs revisiting. We may cut the stream on some specific event and begin new one, like at the end of month from all the transactions we generate invoice and we start new stream for next month. However if cutting the stream off is not an option for any reason, we can use snapshots to avoid loading all events history for given Aggregate. Every given set of events snapshot will be taken, stored and retrieved on next calls, to fetch only events that happened after that.
EventSourcingConfiguration
provides the following interface to set up snapshots.
$aggregateClassToSnapshot
- class name of an aggregate you want Ecotone to save snapshots of
$thresholdTrigger
- amount of events for interval of taking a snapshot
$documentStore
- reference to document store which will be used for saving/retrieving snapshots
To set up snapshots we will define ServiceContext configuration.
Threshold states at which interval snapshots should be done. Therefore with below configuration:
snapshots will be done every 500 events. Then when snapshot will be loaded, it will start loading the events from event number 501 for given Aggregate instance.
All Events may contain additional Metadata. This is especially useful for storing information that are not required for Command to be handled, yet are important for auditing and projecting purposes.
In Ecotone any communication happens via Messages, and Messages contains of Payload and Headers (Metadata).
So far we've discussed only the Payload part, for example ProductWasCreated Event Class is actually an Payload.
What we actually store in the Event Stream is Message, so Payload and Metadata.
Ecotone Framework use the Metadata for Framework related details, which can be used for identifying messages, correlating, and targeting (which Aggregate it's related too). However we can also use the Metadata for additional information in our Application too.
Ecotone provides Metadata propagation, which take cares of passing Metadata between Command and Events without the need for us to do it manually. This way we can keep our business code clean, yet still be able to access the Metadata later.
Even so, the Metadata is not used in our Ticket Aggregate, when the Event will be stored in the Event Stream, it will be stored along side with our provided Metadata. Therefore we will be able to access it in any Event Handlers:
We can also manually add Metadata directly from Command Handler, by packing the our data into Event class.
and then access it from any subflows:
We may access metadata sent from Command Bus in Command Handler when needed:
This feature is available as part of Ecotone Enterprise.
We may also access Metadata inside our Event Sourcing Handler. This may be useful when we need to protect business invariants based on the data, that is stored as part of Metadata.
So let's assume UserCreated Event:
If we would want to change how the Event is serialized, we would define Converter
Then the Event Stream would look like above
This basically means we can serialize the Event in the any format we want.
Having customized Converters for specific Events, is also useful when we need to adjust some legacy Events to new format. We can hook into the deserialization process, and modify the payload to match new structure.
the UserName
would be a simple Class which contains of validation so the name is not empty:
Now if we would serialize it without telling JMS, how to handle this class we would end up with following JSON in the Event Stream:
Now this is fine for short-lived applications and testing, however in the long living application this may become a problem. The problem may come from changes, if we would simply change property name in UserName.value
to UserName.data
it would break deserialization of our previous Events.
As data
does not exists under name
key.
Therefore we want to keep take over the serialization of objects, to ensure stability along the time.
Now with above Converter, whenever we will use UserName
class, we will be actually serializing it to simple string type, and then when deserialize back from simple type to UserName class:
With this, with few lines of code we can ensure consistency across different Events, and keeping our Events bullet proof for code refactor and changes.
However in case of Event Sourced System we rather do not want to delete events, as this is critical operation which is considered dangerous. Deleting Events could affect running Projections, deleting too much may raise inconsistencies in the System, and in some cases we may actually want to drop only part of the data - not everything. Therefore dropping Events from Event Stream is not suitable solution and we need something different.
Solution that we can use, is to change the way we serialize the Event. We can hook into serialization process just as we did for normal serialization, and then customize the process. Converter in reality is an Service registered in Dependency Container, so we may inject anything we want there in order to modify the serialization process.
So let's assume that we want to encrypt UserCreated
Event:
So what we do here, is we hook into serialization/deserialization
process and pass the data to EncryptionService
. As you can see here, we don't store the payload here, we simply store an reference in form o a key.
EncryptionService can as simple as storing this data in database table using key as Primary Key, so we can fetch it easily. It can also be stored with encryption in some cryptographic service, yet it may also be stored as plain text. It all depends on our Domain.
However what is important is that we've provided the resource id to the EncryptionService
Now this could be used to delete related Event's data. When Customer comes to us and say, he wants his data deleted, we simply delete by resource:
That way this Data won't be available in the System anymore. Now we could just allow Converters fails, if those Events are meant to be deserialized, or we could check if given key exists and then return dummy data instead.
If we allow Converters to fail when Serialization happens, we should ensure that related Projections are using simple arrays instead of classes, and handle those cases during Projecting. If we decide to return dummy data, we can keep deserializing those Events for Projections, as they will be able to use them.
Describes how streams with events will be stored. Each Event Stream is separate Database Table, yet how those tables are created and what are constraints do they protect depends on the persistence strategy.
This is the basics Stream Strategy which involves no constraints. This means that we can append any Events to it without providing any additional metadata.
Now as this is free append involves no could re-run this code apply exactly the same Event. This can sounds silly, but it 's make it useful for particular cases. It make it super easy to append new Events. We basically could just add this action in our code and keep applying Events to the Event Stream, we don't need to know context of what happened before.
This is useful for scenarios where we just want to store information without putting any business logic around this. It could be used to continues stream of information like:
Temperature changes
Counting car passing by in traffic jam
Recording clicks and user views.
This the default persistence strategy. It does creates partitioning within Event Stream to ensure that we always maintain correct history within partition. This way we can be sure that each Event contains details on like Aggregate id it does relate to, on which version it was applied, to what Aggregate it references to.
The tricky part here is that we need to know Context in order to apply the Event, as besides the Aggregate Id, we need to provide Version. To know the version we need to be aware of last previous applied Event.
When this persistence strategy is used with Ecotone's Aggregate, Ecotone resolve metadata part on his own, therefore working with this Stream becomes easy. However when working directly with Event Store getting the context may involve extra work.
This Stream Strategy is great whenever business logic is involved that need to be protected. This solves for example the problem of concurrent access on the database level, as we for example can't store Event for same Aggregate Id and Version twice in the Event Stream. We would use it in most of business scenarios where knowing previous state in order to make the decision is needed, like:
Check if we can change Ticket based on status
Performing invocing from previous transactions
Decide if Order can be shipped
This is similar to Partition strategy, however each Partition is actually stored in separate Table, instead of Single One.
This can be used when amount of partitions is really low and volume of events within partition is huge.
You may provide your own Customer Persistence Strategy as long as it implements PersistenceStrategy
.
To set given persistence strategy as default, we can use ServiceContext:
Once set, the persistence strategy will apply to all streams in your application. However, you may face a situation when you need to have a different strategy for one or more of your streams.
The above will make the Simple Stream Strategy as default however, for some_stream
Event Store will use the Aggregate Stream Strategy.
Be aware that we won't be able to set Custom Strategy that way.
Ecotone use in order to convert Events into serializable form. This means we can customize process of serializing and deserializing specific Events, to adjust it to our Application.
When using support, we can even customize how we want to serialize given class, that is used within Events.
For example we could have User Created Event which make use of UserName
class.
In case of storing sensitive data, we may be forced by law to ensure that data should be forgotten (e.g. ). This basically means, if Customer will ask to us to remove his data, we will be obligated by law to ensure that this will happen.
PHP Event Streams
The Projection
is deriving from Event Stream.
There may be situations when we will want to derive the projection from multiple streams however.
Let's see what options do we have:
If we are interested in single stream, we can listen directly for specific aggregate
In here we are handling events from single Basket's Aggregate stream
. It will contain all the events in relation to this aggregate.
There may be situations, when we will want to handle different streams together.
In case if using Stream Per Aggregate Persistence Strategy
we will need to use categories to target.
If we would listen on Domain\Ticket
stream using Stream Per Aggregate
then we would not target any event, as the streams that are created are suffixed by the identifier Domain\Ticket-123
.
In that case we can make use of categories in order to target Domain\Ticket-*.
If you want to avoid storing class names of your events in the Event Store
you may mark them with name.
And tell the projection to make use of it
If projections are handling the events by names, then there is no need to deserialization of the event to the class and simple array can be used. In case of thousands of events during resetting the projection it will speed up the process.
Projections are about deriving current state from the stream of events. Projections can be added in any moment of the application lifetime and be built up from existing history, till the current time. This is powerful concept as it allow for building views quickly and discarding them without pain, when they are not longer needed.
This tells Ecotone
that specific class is Projection. The first parameter is the name of the projection
and the second is name of the stream (the default is the name of the Aggregate
) that this projection subscribes to.
Events that this projection subscribes to
Optional Query Handlers for this projection. They can be placed in different classes depending on preference.
Document Store is a great way to set up your projections. You can freely create DTO objects, or play with simple arrays and Ecotone will serialize/deserialize and store them for you.
PHP Event Sourcing Projections
Before diving into this topic, read the Event Sourcing Introduction first.
The power of Event Sourcing is not only the full history of what happened. As we do have a full history, it's easy to imagine that we may want to use it for different purposes. And one of our purposes will be related to view this data in specific way.
Let's take as an example our Ticket's Event Stream:
In most of the situations besides knowing the history, we would also want to know how all does Tickets looks at present moment. Therefore it means we need to build an view from those events which will represent current state, and for this we use Projections.
So we may be in need to build the list of all the tickets and their current status
For example this list can be stored in the database table with three columns:
id
type
status
The difference between the traditional approach and ES approach is that we will be delivering this view from the Event Stream. Therefore this will be only the representation build up from the past events, and will be used only for reading. Data delivered from Events to shape specific view, we call Read Models.
Ecotone provides abstraction to quickly build new Projections, it does follow Ecotone's declarative configuration. Before we will jump into implementation, let's quickly review how our Ticket Event Sourced Aggregate could look like:
So we do have two Events here, TicketWasRegistered and TicketWasClosed. We will be subscribing to those in order to build our new Ticket List Projection.
Let's first define our new Ticket List Projection
We do start by creating new class, which we mark with Projection attribute. The first argument is the name of our new projection "ticket_list", the second is the related ES Aggregate "Ticket" from which we will be subscribing Events. We will touch on the second argument more in next sections.
Ecotone will take care of creating the Projection for us, therefore we can tell it how to do it
Now we are ready to subscribe to Ticket related Events
This is enough for Ecotone to know that this should be triggered whenever TicketWasRegistered happens. As a result of triggering this Event Handler we store new ticket in the our database table
We also want to change the status, when ticket is closed, so let's add that now:
This is all to make our Projection work. There is no any additional configuration needed as we are working from higher level abstraction. We tell what events we want to have delivered, and Ecotone will take care of delivering and initializing and triggering our Projections.
By default this Projection will be triggered synchronously. This means that after Event Sourced Aggregate is called, Events will first be stored in the Event Stream, and then Projection will be called.
Our Projection subscribe to those Events, therefore it will be triggered as a result
By default projections work synchronously as part of the same process of Command Handler execution. This ensures that our Projection is always consistent with changes in the Event Stream, because it's wrapped in database transaction.
Synchronous projects are done within same transaction as the Command execution. This way Ecotone ensures consistency of the data by default. This behaviour is configurable.
This works well for scenarios when there are no much changes to happening to given instance of Event Sourced Aggregate. How Ecotone handles Concurrency was described in more details in previous section. What is important is that for low writes this solution will work perfectly, for high volume of writes on other hand we may want to trigger Projections asynchronously.
Synchronous projections are simpler in development, as we can immediately fetch the data from Read Model and be sure that is consistent with the changes. With Asynchronous data may be refreshed after some time, therefore if fetched immediately, we may get stale results.
Ecotone provides great abstraction for making the code asynchronous. From development perspective the code stays the same like synchronous, yet under the hood thanks to Messaging abstraction it can be easily switched to work asynchronously via Message Channels.
So to make the Projection asynchronous, the only thing which we need to do, is to mark it as asynchronous.
Ecotone will take care of delivering the triggering Event via given async channel to the Projection. This way we can start with synchronous projections, and when we will feel the need, simply switch them to Asynchronous without any single line of code being changed. You may read more about execution process in next section. We need to touch on one more important topic. Where do we actually get the data from for Projections.
Events that trigger Projections are not actually a source of the data for them. This is because if we would lose Event Message along the way due some failure (For example we don't use Outbox) or it would and in Dead Letter then we would basically skip over an Event.
Let's take as an example Asynchronous Projection, where we want to store Ticket with new "alert-warning" type. However let's suppose we've created column with limited size for type - which is up to 10 characters. Therefore our Projection will fail on storing that, because the type is 13 characters long:
Now if after that we will receive Ticket was closed event, then related Event Handler would update nothing, as there is no this Ticket stored in our Read Model:
So this is obviously not way of ensuring consistency in the system. Ecotone does it differently and treats the incoming Events just as "triggers". It works like information for the Projection to fetch the Events from the Event Stream and start Projecting.
This way if even so Ticket Was Registered failed, when Ticket was Closed would come after it would still get the original event first. So if we would fix the problem with column size, it would basically self-heal automatically.
Each Projection keep track of it's position. Therefore whenever new Event comes in, it knows from which point in Event Stream it should fetch the Events from.
There is one more important reason for building Projections from Event Stream, Projection Rebuilding.
Thanks to Projections ability to build the projection from the Event Stream, we are not bound by time. When we deploy new Projection, it will go over previous Events as part of the projecting process. This way we can ability to be able to ship new Projections at any point of time, yet with ability to use all the previous Events from the past.
Besides that we can rebuild existing Projection, as rebuilding is all about reseting the Projection's position, and start to fetch from scretch. You may read about available actions in Executing and Managing section.
PHP Event Sourcing Projections
By default Ecotone
runs the projections synchronously with your aggregate changes.
This kind of running configuration can be used to avoid eventual consistency problems or for testing purposes.
However when you expect multiple accesses to your Aggregates at the same time, you may consider asynchronous projection to protect yourself from concurrency problems.
This projections are running within same transaction as your Event Store changes. This will ensure atomic consistency between your aggregate and projection.
Then we can run:
The difference between Polling
and Event Driven
projection is the way they are triggered.
The Event Driven
is only triggered when new event comes to the system. This avoid the pitfall of continues database access while using Polling Projection
.
The second strength of Asynchronously Event Driven Projection is possibility of registering multiple projections under same channel (which is same consumer).
You may customize your Event Sourcing configuration with following configuration:
load_count //Default: null
Change load batch size in each run for single projection.
cache_size //Default: 1000
The cache size is how many stream names are cached in memory, the higher the number the less queries are executed and therefore the projection runs faster, but it consumes more memory.
sleep //Default: 100000
The sleep options tells the projection to sleep that many microseconds before querying the event store again when no events were found in the last trip. This reduces the number of cpu cycles without the projection doing any real work.
persist_block_size //Default: 1000
The persist block size tells the projector to persist its changes after a given number of operations. This increases the speed of the projection a lot. When you only persist every 1000 events compared to persist on every event, then 999 write operations are saved. The higher the number, the fewer write operations are made to your system, making the projections run faster. On the other side, in case of an error, you need to redo the last operations again. If you are publishing events to the outside world within a projection, you may think of a persist block size of 1 only.
lock_timeout_ms //Default: 1000
Indicates the time (in milliseconds) the projector is locked. During this time no other projector with the same name can be started. A running projector will update the lock timeout on every loop, except you configure an update lock threshold.
update_lock_threshold //Default: 0
If update lock threshold is set to a value greater than 0 the projection won't update lock timeout until number of milliseconds have passed. Let's say your projection has a sleep interval of 100 ms and a lock timeout of 1000 ms. By default the projector updates lock timeout after each run so basically every 100 ms the lock timeout is set to: now() + 1000 ms
This causes a lot of extra work for your database and in case the database is replicated this can cause a lot of network traffic, too.
gap_detection //Default: new \Prooph\EventStore\Pdo\Projection\GapDetection()
Gap Detection makes projection to wait for upcoming events if any gap occurs in your event stream. To disable Gap Detection you can set value to null
.
You may run your projection in the background. It will query the database within constant time intervals, to look if new events have been registered. Each projection is running as separate process. To register Polling Projection make use of .
You may pass your projections in event driven manner using .
You may configure your projection custom configuration. Take under consideration that some configuration may have sense only in case of .
PHP rebuild and delete projections
As projection can be restarted, deleted and created differently. When the projection knows how to setup it itself, it's easy to rebuild it when change is needed.
And inside the projection we need to implement ProjectionInitialization
to tell Ecotone
what to do:
In order to restart the projection in case we want to provide incompatible change, we can simply reset the projection and it will build up from the beginning.
And inside the projection we need to implement ProjectionReset
to tell Ecotone
what to do:
If we want to delete the projection
And inside the projection we need to implement ProjectionDelete
to tell Ecotone
what to do:
If we want to manually trigger projection
Self-healing the application (, , )
Ensuring Data Consistency (, , )
Recovering (, , )
To find out more about different use-cases, read related section about .
[Article]
[Article]
[Article]
[Article]
In order to optimize projections or to avoid using external storage, we may use of Projection's State
.
State is data that is kept between executions and can be passed to Projection's Event Handler.
In order to pass the state to Projection's Event Handlers we need to mark method parameter with #[ProjectionState]
.
Ecotone will resolve this parameter and pass the state. The returned state from the Event Handler will becomes new state for next execution. We may pass the state between all Event Handlers in given Projection.
The state can be simple array or a class. Whatever you pick, Ecotone will automatically serialize and deserialize it for you.
You may want to fetch the state from outside to return it to the end user.
In that case Ecotone brings ProjectionStateGateway
.
The first parameter of the attribute is the projection name, so Ecotone can know, which state it should look for. This Gateway will automatically convert the state to your defined return type.
Gateways are automatically registered in your Dependency Container, so you can fetch it like any other service.
Ecotone provides instant retries, which triggers automatically, if given Message failed and tries to recover immediately.
Instant retries may be useful in case of temporary issues, like optimistic locking, momentary unavailability of the external service which we want to call or database connection failure.
In order to set up instant retries for Command Bus, you Service Context configuration.
This will retry your synchronous Command Handlers
.
This will retry instantly when your message is handled asynchronously. This applies to Command and Events. Take under consideration that Ecotone isolates handling asynchronous events, so it's safe to retry them.
By using instant retries for asynchronous endpoints we keep message ordering.
Delayed retries are helpful in case, we can't recover instantly. This may happen for example due longer downtime of external service, which we integrate with. In those situations we may try to self-heal the application, by delaying failed Message for given period of time. This way we can retry the call to given service after some time, and if everything is fine, then we will successfully handle the message.
First Error Channel need to be set up for your Application, then you may configure retries.
If you want to use inbuilt Error Retry Strategy and set retry attempts, backoff strategy, initial delay etc, you may configure using ErrorHandlerConfiguration
from ServiceContext.
When we have consumer named "asynchronous_messages", then we can define PollingMetadata with customer error Channel.
There is crucial difference between Ecotone and other PHP Frameworks in a way it enables safe retries.
Let's consider asynchronous scenario, where we want send order confirmation and reserve products in Stock via HTTP call, when Order Was Placed. This could potentially look like this:
Now imagine that sending to Stock fails and we want to retry. If we would retry whole Event, we would retry "notifyAboutNewOrder" method, this would lead to sending an notification twice. It's easy to imagine scenarios where this could lead to even worse situations, where side effect could lead to double booking, trigger an second payment etc. That's is why Ecotone implements Safe Retries.
In Ecotone each of the Handlers will receive it's own copy of the Event and will handle it in full isolation.
This means that under the hood, there would be two messages sent to asynchronous_messages
each targeting specific Event Handler.
This bring safety to retrying events, as in case of failure, we will only retry the Handler that actually failed.
One of the drawback of Event Sourcing is eventual consistency. Whenever event happens we want to do two things, update our projection and inform the end user about change. However user need to be informed about the change after projection is refreshed, as otherwise he will get stale view. In order to solve this drawback Ecotone brings possibility for emitting events directly from projection. So instead of subscribing to Domain Events (Aggregate State Changed), end user may subscribe to change in the projection.
In order to emit the events, we are using EventStreamEmitter
.
Whenever we emit
given events, they are stored in Projection's stream.
After that you may subscribe to given events, just like to any other events.
All the events are stored in the streams, this means that in case of need we may create another projection that will subscribe to those events.
In some cases we may want to emit event to existing stream (for example to provide summary event), or to fresh new stream.
In order to do that we may use linkTo
method on EventStreamEmitter
.
LinkTo
works from any place in the code, however emit
as it stores in projection's stream works only inside projection.
When we rebuild the projection events could be republished and that would affect our end users, plus would link duplicated events to our stream. Luckily Ecotone will handle this scenario and will not republish or store any events that are emitted during reset phase.
This is the biggest difference between using EventBus
versus EventStreamEmitter
.
As EventBus would simple republish the events during rebuild phase.
When projection is deleted, Ecotone will automatically delete projection stream.
In case when custom stream is provided by they will not be automatically deleted.
Ecotone
comes with solution called Error Channel.
Error Channel is a place where unrecoverable Errors can go, this way we can preserve Error Messages even if we can't handle anyhow.
Error Channel may log those Messages, store them in database, push them to some Asynchronous Channel. The what is to be done is flexibile and can be adjusted to Application needs.
Error Channel can be configured per Message Consumer, or globally as default Error Channel for all Message Consumers:
- Symfony
- Laravel
- Lite
config/packages/ecotone.yaml
config/ecotone.php
To handle incoming Error Messages, we can bind to our defined Error Channel using ServiceActivator:
We can also use inbuilt retry mechanism, that will be resend Error Message to it's original Message Channel with delay. If our Default Error Channel is configured for name "errorChannel", then we can connect it like bellow:
If for some cases we want to discard Error Messages, we can set up error channel to default inbuilt one called "nullChannel". That may be used in combination of retries, if after given attempt Message is still not handled, then discard:
Ecotone comes with full support for managing full life cycle of a error message. Read more in next section.
Ecotone comes with full support for managing full life cycle of a error message by using Dbal Module.
Store failed Message with all details about the exception
Allow for reviewing error Messages
Allow for deleting and replaying error Message back to the Asynchronous Message Channels
Install Ecotone's Dbal Module.
Set up Error Channel like discussed at the beginning of the section
If we configure default error channel to point to "dbal_dead_letter" then all Error Messages will land there directly:
config/packages/ecotone.yaml
config/ecotone.php
We may also want to try to recover before we consider Message to be stored in Dead Letter:
config/packages/ecotone.yaml
config/ecotone.php
and then we use inbuilt Retry Strategy:
Get more details about existing commands
Listing current error messages
Get more details about given error message
Replay error message. It will return to previous channel for consumer to pick it up and handle again.
Replaying all the error messages.
Delete given error message
The above solution requires running Console Line Commands. If we want however, we can manage all our Error Messages from one place using Ecotone Pulse.
This is especially useful when we've multiple Applications, so we can go to single place and see if any Application have failed to process Message.
To ensure full level of data consistency, we may decide to store messages along side with data changes. This way we work only with single data storage, avoiding completely problem with persisting Message in two sources at once. To make it happen Ecotone implements so called Outbox pattern.
For critical parts of the systems we may decide to commit Messages to the same database as data changes using Outbox pattern.
In order to use Outbox pattern we need to set up Dbal Module.
By sending asynchronous messages via database, we are storing them together with data changes. This thanks to default transactions for Command Handlers, commits them together.
After this all your messages will be go through your database as a message channel.
With Ecotone's Outbox pattern we set up given Channel to run via Database. This means that we can target specific channels, that are crucial to run under outbox pattern. In other cases where data consistency is not so important to us, we may actually use Message Broker Channels directly and skip the Outbox. As an example, registering payments and payouts may an crucial action in our system, so we use it with Outbox pattern. However sending an "Welcome" notification may be just fine to run directly with Message Broker.
One of the challenges of implementing Outbox pattern is way to scale it. When we start consume a lot of messages, we may need to run more consumers in order to handle the load.
In case of Ecotone, you may safely scale your Messages Consumers that are consuming from your Dbal Message Channel
. Each message will be reserved for the time of being published, thanks to that no duplicates will be sent when we scale.
However we may actually want to avoid scaling our Dbal based Message Consumers to avoid increasing the load on the database.
For this situation Ecotone
allows to make use so called Combined Message Channels
.
In that case we would run Database Channel
only for the outbox
and for actual Message Handler
execution a different one.
This is powerful concept, as we may safely produce messages with outbox and yet be able to handle and scale via RabbitMQ
SQS
Redis
etc.
database_channel
is Dbal Message Channel
rabbit_channel
is our RabbitMQ Message Channel
Then we run one or few Message Consumers for outbox
and we scale Message Consumers for rabbit
.
If we want more convient way as we would like to apply combined message channels
on multiple Message Handlers, we may create an reference
.
And then we use reference
for our Message Handlers
.
Whenever we use more than one storage during single action, storing to first storage may end up with success, yet the second may not. This can happen when we store data in database and then send Messages to Message Broker. If failure happen it can be that we will send some Message to Broker, yet fail to store related data or vice versa. Ecotone provide you with tools to help solve this problem in order to make sending Messages to Message Broker resilient.
Ecotone by default enables Message Collector. Collector collect messages that are about to be send to asynchronous channels in order to send them just before the transaction is committed. This way it help avoids bellow pitfalls:
Message Collector is enabled by default. It works whenever messages are sent via Command Bus
or when message are consumed asynchronously
.
Let's consider example scenario: During order processing, we publish an OrderWasPlaced event, yet we fail to store Order in the database. This means we've published Message that is based on not existing data, which of course will create inconsistency in our system.
When Message Collector is enabled it provides much higher assurance that Messages will be send to Message Broker only when your flow have been successful.
Let's consider example scenario: During order processing, we may publish an OrderWasPlaced event, yet it when we publish it right away, this Message could be consumed and handled before Order is actually committed to the database. In such situations consumer will fail due to lack of data or may produce incorrect results.
Due to Message Collector we gracefully reduce chance of this happening.
In general sending Messages to external broker is composed of three stages:
Serialize Message Payload
Map and prepare Message Headers
Send Message to external Broker
In most of the frameworks those three steps are done together, which may create an issue. Let's consider example scenario: We send multiple Messages, the first one may with success and the second fail on serialization. Due to that transaction will be rolled back, yet we already produced the first Message, which becomes an Ghost Message. To avoid that Ecotone perform first two actions first, then collect all Messages and as a final step iterate over collected Messages and sent them. This way Ecotone ensures that all Messages must have valid serialization before we actually try to send any of them.
As Collector keeps the Messages in memory till the moment they are sent, in case of sending a lot of messages you may consider turning off Message Collector, to avoid memory consumption. This way Messages will be sent instantly to your Message Broker.
Whenever sending to Message Broker fails, Ecotone will retry in order to self-heal the application.
By default Ecotone will do 2
reties when sending to Message Channel fails:
- First after 10
ms
- Second after 400
ms.
You may configure sending retries per asynchronous channel:
After exhausting limit of retries in order to send the Message to the Broker, we know that we won't be able to do this. In this scenario instead of letting our action fail completely, we may decide to push it to Error Channel instead of original targetted channel.
We may decide for example to push it to Dead Letter to store it and later retry:
If you will push Error Messages to Dbal Dead Letter, then they will be stored in your database for later review. You may then delete or replay them after fixing the problem. This way we ensure consistency even if unrecoverable failure happened our system continues to have self-healed.
If you need customization per Message Consumer you may do it using PollableChannelConfiguration
by providing Message Consumer name:
For mission critical scenarios, you may consider using Ecotone's Outbox Pattern.
Concurrency exceptions when multiple processes or threads access and modify shared resources simultaneously. These exceptions happen because two or more operations conflict try to change same piece of data. Ecotone provides built-in support for concurrency handling.
In order to solve concurrent access, Ecotone implements Optimistic Locking.
Each Event Sourcing Aggregate or Event Sourcing Saga has a version property that represents the current version of the resource. When modifications are made, the version is incremented. If two concurrent processes attempt to modify the same resource with different versions, a concurrency exception is raised. This is default behaviour, if we are using inbuilt Event Sourcing support.
In case of Custom Repositories, we may use Ecotone support for optimistic locking to raise the exception in the Repository.
Version will be passed to the repository, based on #[AggregateVersion] property inside the Aggregate/Saga.
We don't need to deal with increasing those on each action. Ecotone will increase it in our Saga/Aggregate automatically. We may also use inbuilt trait to avoid adding property manually.
To handle concurrency exceptions and ensure the system can self-heal, Ecotone offers retry mechanisms.
In synchronous scenarios, like Command Handler being called via HTTP, instant retries can be used to recover. If a concurrency exception occurs, the Command Message will be retried immediately, minimizing any impact on the end user. This immediate retry ensures that the Message Handler can self-heal and continue processing without affecting the user experience. In asynchronous scenarios, you can use still use instant retries, yet you may also provide delayed retries. This means that when concurrency exception will occur, the Message will be retried after a certain delay. This as a result free the system resources from continues retries and allows for recovering after given period of delay.
The role of deduplication is to safely receive same message multiple times, as there is no guarantee from Message Brokers that we will receive the same Message once. In Ecotone all Messages are identifiable and contains of Message Id. Message Id is used for deduplication. If message was already handled, it will be skipped.
This is especially useful when, we receive events from external services e.g. payment or notification events which contains of identifier that we may use deduplication on. For example Sendgrid (Email Service) sending us notifications about user interaction, as there is no guarantee that we will receive same webhook once, we may use "eventId", to deduplicate in case.
paymentId
becomes our deduplication key. Whenever we will receive now Command with same value under paymentId
header, Ecotone will deduplicate that and skip execution of receivePayment method
.
Deduplication happen across given endpointId.
This means that if we would introduce another handler with same deduplication key, it will get it's own deduplication tracking.
As deduplication is tracked within given endpoint id, it means we can change the deduplication key safely without being in risk of receiving duplicates. If we would like to start tracking from fresh, it would be enough to change the endpointId.
To remove expired deduplication history which is kept in database table, Ecotone provides an console command:
By default Ecotone removes message id from deduplication storage after 7 days in batches of 1000. It can be customized in case of need:
It's important to keep removal batch size at small number. As deleting records may result in database index rebuild which will cause locking. Therefore small batch size will ensure our system can continue, while messages are being deleted in background.
As the deduplication is enabled by default, if you want to disable it then make use of DbalConfiguration.
This is where Ecotone Pulse kicks in, as instead of reviewing and replaying the message directly from the application's console, you may do it directly from the UI application. Besides you may connect multiple Ecotone's application to the Pulse Dashboard to have full overview of your whole system.
Ecotone Pulse provide way to control error messages for all your services from one place.
After this you may run docker image with Ecotone Pulse passing the configuration to your services and RabbitMQ connection.
Then run docker image with Ecotone Pulse passing environment variables:
In the dashboard you may check all the connected services. For quick overview, you will find amount of errors within given service there.
To review error messages go to specific service. From there you can review the error message, stacktrace and replay it or delete.
In order to use Deduplication, install .
Deduplication is enabled by default and works whenever message is consumed in .
You may also define given Message Handler for deduplication. This will use and deduplicated base on your customer header key. This allows in synchronous and asynchronous scenarios.
You may find demo implementation .
[Article]
[Article]
[Documentation]
[Article]
Whenever message fails during it will kept repeated till the moment it will succeed. However retry strategy with dead letter queue may be set up in order to retry message given amount of times and then move it to the storage for later review and manual retry.
Enable in your service and with .
Provide array of services with service name and .
DSN to your RabbitMQ instance, which services are connected with .
You may check demo application, where Symfony and Laravel services are connected to Ecotone pulse in .
We may extend Gateways functionality with asynchronocity. This way we can pass any Message via given Message Channel first.
Asynchronous Gateways are available as part of Ecotone Enterprise.
To make Gateway Asynchronous we will use Asynchronous Attribute, just like with Asynchronous Message Handlers. We may can extend any types of Gateways: Command/Event/Query Buses, Business Interfaces or custom Gateways.
To build for example a CommandBus which will send Messages over async channel, we will simply extend a CommandBus interface, and add our method.
then we all Commands that will be triggered via AsynchronousCommandBus will go over async channel.
It's enough to extend given CommandBus with custom interface to register new abstraction in Gateway in Dependency Container.
Having asynchronous CommandBus is especially useful, if given Message Handler is not meant be executed asynchronous by default.
then when using standard CommandBus above Command Handler will be executed synchronous, when using AsynchronousCommandBus it will be done asynchronously.
It's easy to build an Outbox pattern using this Asynchronous Gateways. Just make use of Dbal Message Channel to push Messages over Database Channel.
and then register dbal channel
Then whenever we will send Events within Command Handler (which is wrapped in transaction by default while using Dbal Module). Messages will be commited as part of same transaction.
It's good to know how Ecotone solves the problem of Message Handling Isolation, which is one of the key features that allows us to build Resilient Messaging Systems.
In Message-Based Systems we will have situation that as a result of given Event, we will want to trigger some actions. This can sending notification, but also calling an external Service or starting fully new separate flow etc.
However those actions may actually fail for various of reasons and depending on how Messaging is implemented, it may help us to recover from this safely, or trigger unexpected side effects that may harm the business.
Let's first consider typical implementation of Message Bus, where we send an Event Message which is consumed by more than one Message Handler (Subscriber / Event Handler).
After placing an Order we send Asynchronous Order Was Placed Event Message, which as as a result triggers all related Event Handlers. As we can easily imagine, one of those Event Handlers may fail. However this creates a problem, because it's not only the Event Handler that have failed will be retried, but all the Event Handlers connected to given Event Message.
This of course may produce unexpected side effects, like sending confirmation twice, or delivering goods to the Customers more than once. Idempotency may help here, but it's not always available or implemented correctly, therefore we may try to solve it on higher level code.
To solve it using higher level code we may introduce multiple Messages Queues having single Message Handler connected, or produce Command Messages from Event Handlers in order to provide isolation. However all of those solutions make infrastructure, configuration or application level code more complex. This is because we try to solve Message Handling Isolation in upper levels, instead of having it solved on the foundation level.
Ecotone solves Message Handling Isolation at the foundation level, by delivering a copy of a Message to each of the related Event Handler separately:
Whenever Event Message is sent, a copy of this Message will be delivered to each of the related Event Handlers. This as a result make each Handler consume the Message in complete isolation and enables safe retries.
Handling each Event Handler in complete isolation, creates environment where safe retries are possible, as only Event Handler that have failed will be retried. By solving this on the foundation, the higher level code can stay focused on business part of the system, not solving Message Handling Isolation problems.
There are of course more benefits that this solution enables:
Possibility to safely retry instantly and with delay
Delaying execution of given Event Handler instead of whole Message
Prioritizing execution of given Event Handler instead of whole Message
We may define Time to Live for Messages. This way if Message will not be handled within specific amount of time, it will be automatically discarded. This is useful in scenarios like sending notifications, where given notification like One Time Password may actually have meaning only for 5 minutes.
You may delay handling given asynchronous message by adding #[Delayed]
attribute.
To dynamically calculate expected TTL, we can use expression language.
payload variable in expression language will hold Command/Event object. headers variable will hold all related Mesage Headers.
We could also access any object from our Dependency Container, in order to calculate the delay and pass there our Command:
We may send an Message and tell Ecotone to delay it using deliveryDelay Message Header:
need to support this option to be used
In case of Ecotone we don't prioritize whole Message, but specific Message Handler. This helps in scenarios when we have multiple Event Handlers, and we would like to configure each of them differently. For example may we have a case, where we want to prioritized one notification over another.
In Ecotone the higher the priority, the quicker given Event Handler will be called.
In case we publish an Event, we may want have multiple subscribing Event Handlers. In some situations we may want given action to happen before the other. This may happen for example for example, when one Event Handler updates an data, which the other is using. Therefore we may need to ensure that the Handler modifying data will be called before the one that make use of it.
There may be cases when we will use Synchronous Projections and then we would like to use this data in Event Handler. In that case, if standard Event Handler would be called first, we would lack the data. Therefore by default when Standard Event Handler has same priority as Projection Event Handler, Projection will be called first.
This also works for Aggregates to be prioritized before Standard Event Handlers. Therefore the ordering when same priority level is used, is as follows:
Projection Event Handlers
Aggregate/Sagas Event Handlers
Standard Event Handlers
Ecotone does allow to prioritize an handling of given Message before another one. This way we can handle quicker Messages that have been published to Asynchronous Message Channel later. For example we may send a lot of different notifications, however when Customer asks for One-Time Password we want to deliver it immediately. Therefore for this scenario we would setup higher priority for Authentication Token notification handling, than for other Notifications.
You may prioritize handling given asynchronous message by adding #[Priority]
attribute.
We may send an Message and tell Ecotone to prioritize it using priority Message Header:
Ecotone does allow for easy change from synchronous to asynchronous execution of given Message Handler.
In order to run Command Handler asynchronously we need to mark it as Asynchronous.
The same way we define for Event Handlers:
We need to add endpointId on our endpoint's annotation, this will be used to route the Message in isolation to our Message Handlers.
The "orders" string is actually a name of our Message Channel. This way we reference to specific implementation which we would like to use. To provide specific implementation like for example Database Channel, we would use ServiceContext.
This is basically all we need to configure. Now database channel called orders will be used, whenever we will use Attribute with this name.
There are multiple different implementation which we can use:
At this moment following modules with pollable channels are available:
If you're choose Dbal Message Channel Ecotone
will use Outbox Pattern to atomically store your changes and published messages.
After setting up Pollable Channel we can run the endpoint:
You may set up running configuration for given consumer while running it.
handledMessageLimit
- Amount of messages to be handled before stopping consumer
executionTimeLimit
- How long consumer should run before stopping (milliseconds)
finishWhenNoMessages
- Consumers will be running as long as there will be messages to consume
memoryLimit
- How much memory can be consumed by before stopping consumer (Megabytes)
stopOnFailure
- Stop consumer in case of exception
Using Service Context configuration for statically configuration.
Using single asynchronous channel we may register multiple endpoints. This allow for registering single asynchronous channel for whole Aggregate or group of related Command/Event Handlers.
You may put Asynchronous
on the class, level so all the endpoints within a class will becomes asynchronous.
All asynchronous endpoints are marked with special attributeEcotone\Messaging\Attribute\AsynchronousRunningEndpoint
If you want to intercept all polling endpoints you should make use of annotation related point cut on this.
Each Asynchronous Message Handler requires us to define "endpointId". It's unique identifier of your Message Handler.
Endpoint Id goes in form of Headers to your Message Queue. After Message is consumed from the Queue, Message will be directed to your Message Handler having given endpoint Id. This decouples the Message completely from the Message Handler Class and Method and Command/Event Class.
EndpointId ensures we can freely refactor our code and it will be backward compatible with Messages in the Queues. This means we can move the method and class to different namespaces, change the Command class names and as long as endpointId is kept the same Message will be delivered correctly.
Scheduling PHP
Ecotone
comes with support for running period tasks
or cron jobs.
endpointId
- it's name which identifies process to run
poller
- Configuration how to execute this method read more in next section.
Above configuration tells Ecotone
to execute this method every second.
After setting up Scheduled endpoint we can run the endpoint:
You can run Scheduled
for given Handler.
Right now method return Message which is send to given routing.
requestChannelName
- The channel name to which Message should be send.
When the Message will arrive on the Command Handler it will be automatically converted to ExchangeCommand.
If you want to understand how the conversion works, you may read about it in Conversion section.
You may find demo implementation here.
In case of Ecotone we don't delay whole Message, but specific Message Handler. This helps in scenarios when we have multiple Event Handler and we would like to configure the delay differently. For example may we have a case, where as a result of Order being placed, we would want to delay notification, yet to call Payment Service right away.
You may delay handling given asynchronous message by adding #[Delayed]
attribute.
To dynamically calculate expected delay, we can use expression language.
payload variable in expression language will hold Command/Event object. headers variable will hold all related Mesage Headers.
We could also access any object from our Dependency Container, in order to calculate the delay and pass there our Command:
We may send an Message and tell Ecotone to delay it using deliveryDelay Message Header:
If Message Delay would be send for Event. Then all subscribing Event Handlers would be delayed. For customizing it on the single Handler level, use Message Handler delay.
We may also delay to given date time:
In order for Ecotone
how to route messages you need to register Service Name (Application Name).
The minimum needed for enabling Distributed Bus with Service Map and start consuming is to tell Ecotone, that we do use Service Map within the Service
and then we would define Message Channel, which we would use for for incoming messages:
Register Distributed Bus with given Service Map:
and define implementation of the distributed Message Channel:
For concrete use case, read Main Section or Custom Features section.
Distribution Bus is Message Gateway just like CommandBus or EventBus. It creates smooth and elegant way for explicit communication between Applications (Services) without introducing any hassle and requirements for doing configurations, bindings and mappings. It make it easy for Developers to build integrations and maintain them in the long-term.
Read more in given Module section.
To find out more, read section related to specific implementation of Distributed Bus:
Distributed Bus with Service Map - Works with (RabbitMQ, Amazon SQS, Redis, Dbal, Kafka, Symfony Message Channels, Laravel Queues)
RabbitMQ Distributed Bus - Works with RabbitMQ only
This chapter provides more details about advanced Message Channel functionalities using Dynamic Message Channels. Dynamic Channels can be used to:
Simplify deployment strategy
Optimize system resources
Adjust message consumption or sending process, which is especially useful in SaaS and Multi-Tenant Environments
Dynamic Message Channels are available as part of Ecotone Enterprise.
The default strategy is to have single Message Consumer (Worker process) per Message Channel (Queue). When the volume of Messages is low however, some Consumers may actually be continuously in idle state. In order to not be wasteful about system resources we may want then to join the consumption, so single Message Consumer will poll from multiple Message Channels.
Suppose we do have Message Channels:
To prepare an Message Consumer that will be able to consume from those two Channels in Round Robin manner (meaning each consumption is called after another one) we can set up Dynamic Message Channel.
Dynamic Message Channels can combine multiple channels, so we can treat them as a one.
After that we can consume using "orders_and_notifications" name. We then can run the endpoint:
We can combine as many channels as we want under single Dynamic Channel.
There may be situations when we would like to introduce Message Channel per Client. This if often a case in Multi-Tenant environments when premium Customer does get higher consumption rates. In Ecotone we can keep our code agnostic of Multiple Channels, and yet provide this ability to end users in a simple way. For this we will be using Header Based Strategy.
Taking as an example Order Process:
This code is fully agnostic to the details of Multi-Tenant environment. It does use Message Channel "orders" to process the Command. We can however make the "orders" an Dynamic Channel, which will actually distribute to multiple Channels. To do this we will introduce distribution based on the Metadata from the Command.
Now whenever this Command is sent with tenant metadata, Ecotone will decide to which Message Channel it should proxy the Message.
Above will work exactly the same for Events.
Then we would simply run those as separate Message Consumption processes (Workers):
We may want to introduce separate Message Channels for premium Tenants and just have a shared Message Channel for the rest. For this we would use default channel:
The default channel will be used, when no mapping will be found:
Running Dynamic Channels does not differ from normal channels.
If we will run "orders", which is Dynamic Channel combined of three other Channels, Ecotone will run Message Consumption process which will use round-robin strategy to consume from each of them:
Typically we would also run consumption process for this specific channels, which require extra processing power. This Message Consumer will focus only on Messages within that Channel.
When shared_consumer and tenant_abc will read from same Message Channel at the same time, it will work as Competitive Consumer pattern. Therefore each will get his own unique Message.
By default whatever Message Channels we will define, we will be able to start Message Consumer for it. However if given set of Channels is only meant to be used under Dynamic Channel, we can make it explicit and avoid allowing them to be run separately.
To do so we use Internal Channels which will be available only for the Dynamic Channel visibility.
Internal Channels are only visible for the Dynamic Channel, therefore they can't be used for Asynchronous Message Handlers. What should be used for Async Handlers is the name of Dynamic Message Channel.
Let's take as an example of Multi-Tenant environment where each of our Clients has set limit of 5 orders to be processed within 24 hours. This limit is known to the Client and he may buy extra processing unit to increase his daily capacity.
Often used solution to skip processing is to reschedule Messages with a delay and check after some time if Client's message can now be processed. This solution however will waste resources, as we consume Messages that are not meant to be handled. Therefore Ecotone provides alternative, which skips the consumption completely, so we can avoid wasting resources on polling or rescheduling Messages, as we simply don't consume them at all.
To skip the consumption we will use Skipping Strategy. We will start by defining Message Channel per Client, so we can skip consumption from given Channel when Client have reached the limit.
The "decide_for_client" is the name of our Internal Message Handler that will do the decision.
This function will run in round-robin manner for each defined Message Channel (client_a, client-b).
By using Throttling Strategy we can easily rate limit our Clients. This can work dynamically, if Customer will buy credit credits, we can start returning true from decisioning method, which will kick-off the consumption. This means that we create real-time experience for Customers.
In some scenarios we may actually want to take full control over sending and receiving. In this situations we can make use of custom Strategies that completely replaces the inbuilt ones. This way we can tell which Message Channel we would like to send Message too, and from which Channel we would like to receive Message from.
To roll out custom receiving strategy we will use "withCustomReceivingStrategy":
To set up our Custom Strategy, we will use Internal Handler.
If we want to stop Consumption completely we can return "nullChannel" string. This will skip consuming given Channel. This may be useful in order to turn of given Message Consumer at run time.
To roll out custom receiving strategy we will use "withCustomReceivingStrategy":
To set up our Custom Strategy, we will use Internal Handler.
If we would like to discard given Message, we can return "nullChannel" string.
Implementing Microservices and Event Driven Architecture in PHP
Ecotone comes with Support for integrating Service (Applications) together in decoupled way, for this Ecotone provides Distributed Bus
.
Read more in bellow sections:
Simple demo using Ecotone Lite.
Advanced demo using Ecotone Lite.
Symfony and Laravel application integration.
Starting with Microservices in PHP [Article]
Loosely coupled Microservices in PHP [Article]
Ecotone comes with Message Channel abstraction which allows for easily switching from different Providers like Amazon SQS, RabbitiMQ, Redis, Kafka and more. This abstraction is used for Service (application) level asynchronous communication like Asynchronous Message Handlers. However this abstraction can also be combined with Distributed Bus mechanism to enable cross Service Communication using Service Map.
This functionality is available as part of Ecotone Enterprise.
It happens that communication between Services (Applications) is built using different Message Broker features to set up the topology. Which may require per feature configuration and provisioning, and in-depth knowledge about the Message Broker. This often end up as really complex solution, which becomes hard to understand and follow for Developers. When things becomes hard to understand, they become hard to change, as it raises the risk that potential modification may break something. As a result people try to avoid doing changes and development slows down. Therefore there is a need for different approach which keeps the things simple, easy to understand and change. Changes to the integration should not be scary, they should be straight forward and testable, so Developers can feel confidence in doing so. The best solution does not only make things simple to change, but also make things explicit, so just by looking people get more knowledge about the overal system design. And for this Ecotone comes with approach for Service to Service integration based on Service Map.
Service Map is a map of integrated Services (Applications), and points to specific Message Channels to which Messages for given Service should be sent:
In this approach Message Channels (Pipes) are simple transport layer, and the routing is done on the Application (Endpoint) level using Service Map to make the decision.
Making Service available for integration is matter of adding it to the Service Map:
and defining implementation of the Message Channel:
We may choose any Message Channel Provider we want, or even different providers depending on the integrated Service. This opens possibilities for using right tool for right job, as for example in one integration we could use Redis and for other one RabbitMQ or Kafka.
Having the routing map on the Application level instead of Message Broker level means we avoid vendor-lock. In case of need to switch to different Message Broker Provider, we can simply change Message Channel implementation and our integration will continue to work.
Approach of treating Message Brokers as simple transport layer and doing the routing on the Application level to send to the right Message Channel follows smart endpoint dump pipes approach. This as a result make System easy to reason about and understand. Every developer can simply take look on the Map to understand where the Message will land. Adding new integration does not require specific Message Broker knowledge, as it all comes up to adding an Service to the Map and defining the Message Channel provider. Therefore Developers can add easily maintain and change such integration.
As we understand the concepts behind Distributed Bus with Service Map now, let's dive into practical example.
Let’s suppose User Service wants to create Ticket by sending Command to Ticket Service. In Ticket Service we will explicitly state that we allow given Command Handler to be executed in Distributed way. This makes it clear for everyone that we can't simply delete this Command Handler, as other Services may rely on this integration:
on the side of User Service we would call Distributed Bus for Command to do so:
Our topology will look like this:
As we can see above, we do have our two Services "UserService" and "TicketService". Ticket Service will be consuming incoming messages from "distributed_ticket_service" Messsage Channel. Therefore when we send Messages to ticketService we need to send it to that Message Channel, this will be done automatically by DistributedBus using Service Map.
In User Service let's then define Service Map using ServiceContext configuration:
Now when we will send Command to ticketService it will land in distributed_ticket_service channel.
It's important to understand two level routing which will happen. Let's zoom in how do we call our Distributed Bus:
targetServiceName will be used to target specific Service therefore it will make use Message Channel defined in the Service Map. When Message will land in given Service, it will then use routingKey to target specific Command Handler within the Application.
As you can under the hood before DistributedCommandHandler is executed we actually have so called Distributed Handler. This Handler triggering Command/Event Bus with given routing key, and a result our Distributed Command Handler is executed.
Event distribution is a bit different from Command distribution. In case of Command we do have single Service that will receive the Message, in case of Events however there may be multiple of them. Let's expand our previous example to include Order Service, and our scenario is that whenever new User is registered in User Service, we will publish this event to both Ticket and Order Services.
On the consumption part we will be marking our Event Handlers with Distributed:
That code would exists in both Order and Ticket Service. On the publishing side, we will be using publish event method with Distributed Bus:
Like you can see there is no targetServiceName in the parameters anymore (comparing to distributing Command), this is because Event may land in more than one Service. However we keep routingKey as this the name to which consuming Services subscribe (look EventHandler attribute parameter).
By default Event will be published to all Services in the Service Map, with exception of originating Service that publish this Event, this one will be skipped (to avoid publishing to itself). Therefore the default behaviour broadcast the Event to all Services defined in Service Map.
It's a good practice to share the Service Map between Services. In order to have one single source of truth for your Service (Context) Mapping. This can also serve as reference for Developers to understand bigger picture of the System.
In case given Service is not interested in specific Event, it will simply ignore it. Therefore default publishing can really speed up of development process, and make things clear and simple. However with larger volume of published Events, there may be a lot of them, which will be simply ignored. In that situation we may want to optimize publishing part by using filtered publishing.
Filtered publishing allows for optimalization in publishing. This way we can publish Events only to the Services that are actually interested in those.
To configure map with subscription keys, we will be using subscriptionRoutingKeys parameter in Service Map configuration:
Subscription routing keys is array, therefore we may put multiple subscribition routing keys if needed. Subscrption keys can point to exact event name: "userService.address.changed" Or they may use wild card: "userService.account.*"
By default subscriptionRoutingKeys are null, which means given Service will receive all Events. If we will provide empty array, it means that subscription keys are enabled, yet none are matching, therefore no Events will be send to given Service.
When Service Map is defined as separate shared library. It becomes explicit what Events is given Service interested in. This also makes the process of subscribing to new Event visible for everyone, therefore we avoid hidden coupling that could lead to broken integration.
Integrating with other Languages and Frameworks
When we can install Ecotone in all of our Services (Applications), it's enough to share Service Map between them to start communicating, therefore integration is really quick and smooth. However there may be cases where installing Ecotone in all Services will not possible, for example we do have some legacy Service working on PHP version lower than 8, or we use different programming languages within our Stack.
On the consumption side of non-Ecotone application, we need to understand the context of the Message. So what type of Message we are dealing with, what should actually be executed, what is the content type of Message's payload.
So with each Distributed Message, Ecotone delivers set of headers, which can be used for answering above questions: ecotone.distributed.sourceServiceName: Name of the Service sending the Message
ecotone.distributed.targetServiceName: Name of target Service to which Message is sent
ecotone.distributed.payloadType: Type of Message - command/event/message
ecotone.distributed.routingKey: Routing key to the Handler which should executed
contentType: - Will hold content type of Message's payload (e.g. application/json)
All headers can be found in Ecotone\Modelling\Api\Distribution\DistributedBusHeader
Therefore using Message headers we can find out, what should we execute and how. And in the payload of the Message, we will find serialized Event / Command.
On other side we may want to send Message to be consumed by Ecotone's Distributed Handler. This can also be done by sending Message with correct headers, so Ecotone can understand it and execute specific Handler.
So Message Headers that's need to be delivered are:
ecotone.distributed.payloadType: Type of Message - command/event/message
ecotone.distributed.routingKey: Routing key to the Handler which should executed
contentType: - Will hold content type of Message's payload (e.g. application/json)
routingSlip: This is special header which indicate that we want to execute Distributed Handlers. It's value must be set to "ecotone.distributed.invoke".
Payload of the Message will contain serialized Event or Command, in format given in contentType header. This is enough for Ecotone to resolve and handle given Message.
Distributed Bus with Service Map comes with ability to test using real or in memory Message Channels. We can test run the tests using Ecotone Lite test support.
Suppose we do have two different Services User Service and Ticket Service. From User Service we would like to send Command to create new ticket, whenever something interesting happens.
Therefore using Ecotone Lite we can roll out two different applications and share the channel to test the full flow. Let's start by defining Publishing Service (Application):
When we do have Publishing Service, we can now set up Consumer side. Suppose we would like to test such Distributed Command Handler:
So let's define the Distributed Consumer
As we do have Publishing and Consuming side ready, we can now send Command from User Service.
This as a result will send this Command Message to Distributed Message Channel from which we can consume and verify, if our Ticket was stored correctly:
We may also test out using real production Message Channels, it's simple as switching to different provider:
We can also test serialization using In Memory Message Channels.
We may send Message via Distributed Bus together with other actions, most likely database changing ones. For those scenarios, we may instruct Distributed Bus to first persist Message in given Database Message Channel and then distribute it to external Service.
After that Asynchronous Channel will be the first one to which Message will be send, therefore enabling ability to commit all changes within same transaction.
In case we would like to explicitly separate Service Maps for specific integrations, for example having Service Map for internal Services (which given Team owns), and separate Service Map for cross Team integrations, then we can register more than one Distributed Bus:
referenceName will be the name at which DistributedBus for given Service Map will be registered in Dependency Container. Using that name we will be able to inject that into our Application level classes.
When we register Message Channel it becomes available for consumption in given Service. This means that we may register Message Channel, which we actually do not own.
So from perspective of the Service which just publish to this Message Channel, it does not own and should not consume from it. To ensure no consumption happen, and that this Message Consumer will not be available under ecotone:list command, we can use of Dynamic Message Channel capability.
Now this Message Channel will be able to be used only sending, and won't be even visible for list of Message Consumers to run:
Distributed Bus using Map allows for full customization of how we want to distribute Messages. This way we can for example decide, we would like to distribute Events and Commands separately, or send given Event to some custom Message Channel separately from the rest.
The above configuration use special header which Ecotone adds, when sends Message via Distributed Bus: ecotone.distributed.payloadType. This header hold the type of used Message, whatever it's command or event. Therefore we can make use of it, to route the Message to different Channels. We use Internal Channels here, as those will be only visible for this Dynamic Message Channel.
Whatever we send Command or Event we may pass alongside with it Metadata. Metadata can be any additional information, that brings context yet is not part of the Command or Event itself (e.g. Executor Id, Occurred at time).
then we access metadata on the Distributed Handler
Intercepting Distributed Buses can be useful for example for adding extra metadata or ensuring given set of headers are filtered out.
To configure this up, we need to have our registered and then we can instruct Distributed Bus to make use of it:
When we register our Message Channel we will wrap it with with send only strategy:
Whatever context of our Applications needs, we can customize using Distributed Map with .
As we have created DynamicMessageChannel with createNoStrategy, and provided sending strategy in form of withHeaderSendingStrategy. This means that no receiving strategy was given, therefore this Message Consumer will be hidden from the Message Consumer list (works like ).
To intercept Distributed Bus before Message is send we can use .