Introduction

Message Driven System with Domain Driven Design principles in PHP

The foundation idea

The roots of Object Oriented Programming were mainly about communication using Messages and logic encapsulation. The aim was to focus on the flows and communication, not on the objects themselves. Objects were meant to encapsulate logic and expose clear interfaces describing what they do and what they have done.

If you know concepts like Events, Commands and Aggregates, then what was written above should feel familiar. This is because those concepts are built around the same principles of early OOP, where communication is done through Messages and Objects are meant to encapsulate logic. Ecotone is about returning to those roots of Object Oriented Programming. It's about explicit System design where communication happens through Messages, in a way that is clear to follow and understand.

Message based communication

It is not possible to fully immerse ourselves in Message based communication as long as the foundation is not fully Message Driven. This means that all communication within the Application (not only between Applications) has to happen through Messages. This way it becomes a natural part of how the system is designed.

Ecotone follows this by making Messaging the core of the Framework. It introduces Message based communication as the underlying foundation. This way even communication between PHP Objects can be done through Messages in a seamless way.

As Ecotone follows Enterprise Integration Patterns, communication between Objects happens through Message Channels. We can think of a Message Channel as a pipe, where one side sends Messages into it and the other side consumes from it. As communication goes through Message Channels, it becomes really easy to switch the pipes. This means we can switch Message Channel implementations to use a synchronous or asynchronous Channel, or different Message Brokers, and our Objects will not be affected by that in any way.
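To make the pipe analogy concrete, here is a minimal plain PHP sketch. It is not Ecotone's implementation: the MessageChannel interface, DirectChannel, QueueChannel and registerUser are hypothetical names chosen for illustration. It only shows that a sender depending on the channel abstraction is unaffected when a synchronous pipe is swapped for a queued one.

```php
<?php
declare(strict_types=1);

// Hypothetical names for illustration; this is not Ecotone's internal API.
interface MessageChannel
{
    public function send(string $message): void;
}

// Synchronous pipe: the consumer runs immediately, inside the sender's call.
final class DirectChannel implements MessageChannel
{
    public function __construct(private \Closure $consumer) {}

    public function send(string $message): void
    {
        ($this->consumer)($message);
    }
}

// Asynchronous-style pipe: messages wait until a worker asks for them.
final class QueueChannel implements MessageChannel
{
    /** @var string[] */
    private array $queue = [];

    public function send(string $message): void
    {
        $this->queue[] = $message;
    }

    public function receive(): ?string
    {
        return array_shift($this->queue);
    }
}

// The sender depends only on the interface, so the pipe can be swapped freely.
function registerUser(MessageChannel $channel): void
{
    $channel->send('user.registered');
}

$handled = [];
$direct = new DirectChannel(function (string $message) use (&$handled): void {
    $handled[] = $message;
});
registerUser($direct); // handled right away

$queue = new QueueChannel();
registerUser($queue);  // stays in the queue until a worker calls receive()
```

The registerUser function is identical in both calls; only the channel passed to it changes.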

Application level code

Ecotone provides different levels of abstraction, which we can choose from. Each abstraction is described in more detail in the related sections. In this Introduction section we will go over, at a high level, how things can be used, to show what Message based communication is about.

Command Handlers

Let's discuss an example where we want to register a User and trigger a Notification Sender. In Ecotone flows, we would introduce a Command Handler responsible for user registration:

class UserService
{
    #[CommandHandler]
    public function register(RegisterUser $command, EventBus $eventBus): void
    {
        // store user and obtain its identifier as $userId

        $eventBus->publish(new UserWasRegistered($userId));
    }
}

As you can see, we also inject the Event Bus, which will publish our User Was Registered Event Message.

Commands and Events are the soul of higher level Messaging. On the low level everything is a Message, yet each Message can be understood either as a Command (an intention to do something) or as an Event (a fact that happened). This makes a clear distinction between what we want to happen and what has actually happened.

In our Controller we would inject Command Bus to send the Command Message:

public function registerAction(Request $request, CommandBus $commandBus): Response
{
    $command = new RegisterUser(/* built from the data in $request */);
    $commandBus->send($command);
}

After the Command Message is sent, our Command Handler will be executed. The Command Bus and Event Bus are available in our Dependency Container out of the box after Ecotone is installed.

What is important here is that Ecotone never forces us to implement or extend Framework specific classes. This means that our Command and Event Messages are POPOs (plain PHP objects). In most scenarios we will simply mark a given method with an Attribute and Ecotone will glue things together for us.

Event Handlers

We mentioned that the Notification Sender should be executed when the User Was Registered Event happens. For this we follow the same convention of using Attributes:

class NotificationSender
{
    #[EventHandler]
    public function when(UserWasRegistered $event): void
    {
        // send notification
    }
}

This Event Handler will be triggered automatically when the related Event is published. This way we can easily build decoupled flows, which hook into existing business events.

Even though Commands and Events are both Messages at the fundamental level, Ecotone distinguishes them because they carry different semantics. By design a Command can have only a single related Command Handler, while an Event can have multiple subscribing Event Handlers. This makes the system easier for Developers to reason about and follow, as the difference between Messages is built into the architecture itself.
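The difference in semantics can be illustrated with a small plain PHP sketch. The HandlerRegistry class and its method names are assumptions made for this example and are not part of Ecotone; the sketch only shows the rule of one handler per Command and any number of handlers per Event.

```php
<?php
declare(strict_types=1);

final class RegisterUser {}
final class UserWasRegistered {}

// Illustrative registry only; class and method names are assumptions.
final class HandlerRegistry
{
    /** @var array<string, \Closure> */
    private array $commandHandlers = [];
    /** @var array<string, \Closure[]> */
    private array $eventHandlers = [];

    public function registerCommandHandler(string $commandClass, \Closure $handler): void
    {
        // A Command expresses one intention, so a second handler is rejected.
        if (isset($this->commandHandlers[$commandClass])) {
            throw new \LogicException("Command {$commandClass} already has a handler");
        }
        $this->commandHandlers[$commandClass] = $handler;
    }

    public function subscribe(string $eventClass, \Closure $handler): void
    {
        // An Event is a fact, so any number of handlers may react to it.
        $this->eventHandlers[$eventClass][] = $handler;
    }

    public function send(object $command): void
    {
        $handler = $this->commandHandlers[$command::class]
            ?? throw new \LogicException('No handler for ' . $command::class);
        $handler($command);
    }

    public function publish(object $event): void
    {
        foreach ($this->eventHandlers[$event::class] ?? [] as $handler) {
            $handler($event);
        }
    }
}

$log = [];
$registry = new HandlerRegistry();
$registry->registerCommandHandler(
    RegisterUser::class,
    function (RegisterUser $command) use ($registry, &$log): void {
        $log[] = 'registered';
        $registry->publish(new UserWasRegistered());
    }
);
$registry->subscribe(UserWasRegistered::class, function () use (&$log): void {
    $log[] = 'notification';
});
$registry->subscribe(UserWasRegistered::class, function () use (&$log): void {
    $log[] = 'audit';
});
$registry->send(new RegisterUser());
```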

Command Handlers with Routing

From here we could decide to make use of Message routing functionality to decouple Controllers from constructing Command Messages.

#[CommandHandler(routingKey: "user.register")]
public function register(RegisterUser $command, EventBus $eventBus): void

With this in mind, we can now use CommandBus with routing and even let Ecotone deserialize the Command, so our Controller does not even need to be aware of the transformation:

public function registerAction(Request $request, CommandBus $commandBus): Response
{
    $commandBus->sendWithRouting(
        routingKey: "user.register", 
        command: $request->getContent(), 
        commandMediaType: "application/json"
    );
}

When controllers simply pass incoming data through to the Command Bus via routing, there is not much logic left in them. We could even have a single controller, as long as we are able to obtain the routing key. It's really up to us to decide what works best in the context of our system.
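As a rough sketch of what routing plus deserialization amounts to, consider the plain PHP below. The RoutingCommandBus class is hypothetical and much simpler than what Ecotone does; in particular, the JSON to object mapping via named arguments is an assumption of this example, not Ecotone's conversion mechanism.

```php
<?php
declare(strict_types=1);

final class RegisterUser
{
    public function __construct(public readonly string $email) {}
}

// Hypothetical router; in Ecotone, handlers and converters are resolved for us.
final class RoutingCommandBus
{
    /** @var array<string, array{0: string, 1: \Closure}> */
    private array $routes = [];

    public function register(string $routingKey, string $commandClass, \Closure $handler): void
    {
        $this->routes[$routingKey] = [$commandClass, $handler];
    }

    public function sendWithRouting(string $routingKey, string $json): void
    {
        [$commandClass, $handler] = $this->routes[$routingKey]
            ?? throw new \InvalidArgumentException("Unknown routing key: {$routingKey}");

        // Deserialize the raw payload into the Command the handler expects.
        $data = json_decode($json, true, 512, JSON_THROW_ON_ERROR);
        $handler(new $commandClass(...$data));
    }
}

$registered = [];
$bus = new RoutingCommandBus();
$bus->register(
    'user.register',
    RegisterUser::class,
    function (RegisterUser $command) use (&$registered): void {
        $registered[] = $command->email;
    }
);

// A controller would pass the request body through untouched.
$bus->sendWithRouting('user.register', '{"email":"jane@example.com"}');
```

The caller only knows the routing key and the raw payload; the Command class stays a detail of the handling side.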

Interceptors

We could also decide to add so called Interceptors (middlewares) to our Command Bus, to add additional data or to perform validation or access checks.

#[Before(pointcut: CommandBus::class)]
public function validateAccess(
    RegisterUser $command,
    #[Reference] AuthorizationService $authorizationService
): void
{
    if (!$authorizationService->isAdmin) {
        throw new AccessDenied();
    }
}

The pointcut defines the point at which this interceptor should trigger. In the above scenario it triggers when the Command Bus is called, before the Message is sent to the given Command Handler. The Reference attribute states that the given parameter is a Service from the Dependency Container and that Ecotone should inject it.

There are multiple different interceptors that can hook in at different moments of the flow. We could hook in before a Message is sent to an Asynchronous Channel, or before a Message Handler is executed. We could also state that we want to hook into all Command Handlers or Event Handlers. In each step we can decide what we want to do, for example modify the Message, stop the flow, or enforce security checks.
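The idea of hooking in before a handler can be sketched in plain PHP as follows. The InterceptedHandler class, the array based message and the fixed timestamp are assumptions made for illustration; Ecotone wires interceptors through Attributes and pointcuts instead.

```php
<?php
declare(strict_types=1);

final class AccessDenied extends \RuntimeException {}

// Illustrative only: wraps a handler with "before" interceptors.
final class InterceptedHandler
{
    /** @param \Closure[] $before */
    public function __construct(private array $before, private \Closure $handler) {}

    public function handle(array $message): mixed
    {
        foreach ($this->before as $interceptor) {
            // An interceptor may return a modified message, return null to
            // leave it unchanged, or throw to stop the flow.
            $message = $interceptor($message) ?? $message;
        }

        return ($this->handler)($message);
    }
}

$checkAccess = function (array $message): ?array {
    if (($message['role'] ?? null) !== 'admin') {
        throw new AccessDenied();
    }

    return null;
};
$addTimestamp = fn (array $message): array => $message + ['checkedAt' => 1700000000];

$handler = new InterceptedHandler(
    [$checkAccess, $addTimestamp],
    fn (array $message): string => "registered at {$message['checkedAt']}"
);

$result = $handler->handle(['role' => 'admin']);
```

Here the first interceptor enforces a security check and the second enriches the message, matching two of the uses described above.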

Message Metadata

When we send a Command using the Command Bus, Ecotone constructs a Message under the hood. A Message consists of two things: payload and metadata. The payload is our Command, and metadata is any additional information we would like to carry.

public function registerAction(Request $request, CommandBus $commandBus): Response
{
    $commandBus->sendWithRouting(
        routingKey: "user.register", 
        command: $request->getContent(), 
        commandMediaType: "application/json",
        metadata: [
            "executorId" => $this->currentUser()->getId()
        ]
    );
}

Metadata can then be easily accessed from our Command Handler or Interceptors:

#[CommandHandler(routingKey: "user.register")]
public function register(
    RegisterUser $command, 
    EventBus $eventBus,
    #[Header("executorId")] string $executorId,
): void

Besides the metadata that we provide ourselves, Ecotone provides additional metadata that we can use whenever needed, like Message Id, Correlation Id, Timestamp etc.

Ecotone takes care of automatic Metadata propagation, no matter if the execution is synchronous or asynchronous. Therefore we can easily access any given metadata in the targeted Message Handler, and also in any sub-flows like Event Handlers. This makes it really easy to carry additional information, which can be used not only in the first executed Message Handler, but also in any flow triggered as a result of it.
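A simplified picture of propagation, written as plain PHP, might look like this. The Message class and the propagate function are hypothetical, and the rule that a follow-up Message receives a fresh id while keeping the other headers is an assumption of this sketch rather than a statement about Ecotone's internals.

```php
<?php
declare(strict_types=1);

// Hypothetical Message shape: a payload plus a map of metadata (headers).
final class Message
{
    public function __construct(
        public readonly object $payload,
        public readonly array $headers = []
    ) {}
}

// Build a follow-up Message (for example an Event published by a Command
// Handler) that inherits the metadata of the Message that caused it.
function propagate(Message $parent, object $payload): Message
{
    $headers = $parent->headers;
    // Assumption of this sketch: each Message gets its own identifier.
    $headers['id'] = bin2hex(random_bytes(8));

    return new Message($payload, $headers);
}

$command = new Message(new stdClass(), ['id' => 'cmd-1', 'executorId' => 'user-42']);
$event = propagate($command, new stdClass());
```

An Event Handler receiving $event could read executorId even though only the original Command was sent with it.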

Asynchronous Processing

As we mentioned at the beginning of this introduction, communication happens through Message Channels, and thanks to that it's really easy to switch code from synchronous to asynchronous execution. For that we simply state that the given Message Handler should be executed asynchronously:

#[Asynchronous("async")]
#[EventHandler]
public function when(UserWasRegistered $event): void

Now, before this Event Handler is executed, the Message will first land in the Asynchronous Message Channel named "async", and from there it will be consumed asynchronously by a Message Consumer (Worker process).

Aggregates

If we are using Eloquent, Doctrine ORM, or Models with a custom storage implementation, we could push our implementation even further and send the Command directly to our Model.

#[Aggregate]
class User
{
    use WithEvents;

    #[Identifier]
    private UserId     $userId;
    private UserStatus $status;

    #[CommandHandler(routingKey: "user.register")]
    public static function register(RegisterUser $command): self
    {
        $user = new self(/* build the user from $command */);
        $user->recordThat(new UserWasRegistered($user->userId));
        
        return $user;
    }
    
    #[CommandHandler(routingKey: "user.block")]
    public function block(): void
    {
        $this->status = UserStatus::blocked;
    }
}

We mark our model as an Aggregate. This is a concept from Domain Driven Design, which describes a model that encapsulates business logic.

Ecotone will take care of loading the Aggregate and storing it after the method is called. Therefore all we need to do is send a Command.

As you can see, we also added a "block()" method, which blocks the given user. Yet it does not take any Command as a parameter. In this scenario we don't even need a Command Message, because the logic is nicely encapsulated inside, and passing a status from outside could actually allow for bugs (e.g. passing UserStatus::active). All we need to know is that there is an intention to block the user; the rest happens within the method.

To execute our block method we would call Command Bus this way:

public function blockAction(Request $request, CommandBus $commandBus): Response
{
    $commandBus->sendWithRouting(
        routingKey: "user.block", 
        metadata: [
            "aggregate.id" => $request->get('userId'),
        ]
    );
}

There is one special metadata entry here, "aggregate.id". It tells Ecotone which instance of User it should fetch from storage and execute this method on. There is no need to create a Command Class at all, because there is no data we need to pass. This way we can build features with ease and protect the internal state of our Models, so they are not modified in an incorrect way.
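Conceptually, the load, call and store cycle driven by "aggregate.id" resembles the plain PHP sketch below. InMemoryUserRepository and dispatchToAggregate are hypothetical helpers written for this example; Ecotone performs the equivalent steps for us with the configured repository.

```php
<?php
declare(strict_types=1);

final class User
{
    public string $status = 'active';

    public function __construct(public readonly string $userId) {}

    public function block(): void
    {
        $this->status = 'blocked';
    }
}

final class InMemoryUserRepository
{
    /** @var array<string, User> */
    private array $users = [];

    public function save(User $user): void
    {
        $this->users[$user->userId] = $user;
    }

    public function findBy(string $userId): User
    {
        return $this->users[$userId]
            ?? throw new \RuntimeException("User {$userId} not found");
    }
}

// Hypothetical dispatcher: load the instance named in the metadata,
// call the requested method on it, then store it back.
function dispatchToAggregate(InMemoryUserRepository $repository, string $method, array $metadata): void
{
    $user = $repository->findBy($metadata['aggregate.id']);
    $user->{$method}();
    $repository->save($user);
}

$repository = new InMemoryUserRepository();
$repository->save(new User('user-1'));

dispatchToAggregate($repository, 'block', ['aggregate.id' => 'user-1']);
```

No Command object is involved: the identifier in the metadata and the name of the action are enough.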

Workflows

One of the powers that Message Driven Architecture brings is the ability to build even the most sophisticated workflows with ease. This is possible because each Message Handler is considered an Endpoint that can be connected to input and output Channels. This is often referred to as pipes and filters architecture, but in general it is a characteristic of truly message-driven systems.

Let's suppose that our registered user can apply for a credit card, and for this we need to pass their application through a series of steps to verify whether it's safe to issue a credit card to them:

class CreditCardApplicationProcess
{
    #[CommandHandler(
        routingKey: "apply_for_card",
        outputChannelName: "application.verify_identity"
    )]
    public function apply(CardApplication $application): CardApplication
    {
        // store card application
        
        return $application;
    }
}

We are using outputChannelName here to indicate where the Message should be passed after it's handled by our Command Handler. Here we could enrich our CardApplication with some additional data, or create a new object. However, it's perfectly fine to pass the same object to the next step if there is no need to modify it.

Ecotone provides the ability to pass the same object between workflow steps. This simplifies the flow a lot, as we do not need to create custom objects just for the framework's needs, so we stick to what is actually needed from the business perspective.

Let's now define the location where our Message will land next:

#[Asynchronous("async")]
#[InternalHandler(
    inputChannelName: "application.verify_identity",
    outputChannelName: "application.send_result"
)]
public function verifyIdentity(CardApplication $application): ApplicationResult
{
    // do the verification
    
    return new ApplicationResult($result);
}

Here we are using InternalHandler. Internal Handlers are not connected to any Command or Event Bus, therefore we can use them for workflow steps which we don't want to expose outside.

It's really up to us whether we define Message Handlers in separate classes or not. In general, thanks to declarative configuration in the form of Attributes, we could define the whole flow within a single class, e.g. "CreditCardApplicationProcess". Workflows can also be started from Command or Event Handlers, and also directly through Business Interfaces. This makes it easy to build and connect different flows, and even reuse steps when needed.

Our Internal Handler defines an inputChannelName which points to the same channel as our Command Handler's outputChannelName. This way we bind Message Handlers together to create workflows. As you can see, we also added the Asynchronous attribute: as the process of identity verification can take a bit of time, we would like it to happen in the background.

Let's define our last step in Workflow:

#[InternalHandler(
    inputChannelName: "application.send_result"
)]
public function sendResult(ApplicationResult $application): void
{
    // send result
}

We've made this step synchronous, which is the default if no Asynchronous attribute is defined. Therefore it will be called directly after identity verification.

Workflows in Ecotone are fully under our control and defined in PHP. There is no need to use 3rd party tools, or to define the flows in XML or YAML files. This makes it a really maintainable solution, which we can change and test easily, as we fully own the process from within the code. It's worth mentioning that workflows are in general stateless, as they pass Messages from one pipe to another. However, if we want to introduce a stateful Workflow, we can do that using Ecotone's Sagas.
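The way the three steps are bound together by channel names can be sketched in plain PHP. The $steps layout, the run helper and the age based verification rule are assumptions made purely for illustration, and everything runs synchronously here, whereas in Ecotone the identity verification step was marked as asynchronous.

```php
<?php
declare(strict_types=1);

// Sketch of pipes and filters: each step names the channel its result goes to.
$trace = [];
$steps = [
    'apply_for_card' => [
        'handler' => function (array $application) use (&$trace): array {
            $trace[] = 'stored';

            return $application; // same object passed on to the next step
        },
        'output' => 'application.verify_identity',
    ],
    'application.verify_identity' => [
        'handler' => function (array $application) use (&$trace): array {
            $trace[] = 'verified';

            return ['approved' => $application['age'] >= 18];
        },
        'output' => 'application.send_result',
    ],
    'application.send_result' => [
        'handler' => function (array $result) use (&$trace): ?array {
            $trace[] = $result['approved'] ? 'sent: approved' : 'sent: rejected';

            return null;
        },
        'output' => null, // no output channel, the flow ends here
    ],
];

// Hypothetical runner: follow output channels until a step has none.
function run(array $steps, ?string $channel, ?array $payload): void
{
    while ($channel !== null) {
        $payload = $steps[$channel]['handler']($payload);
        $channel = $steps[$channel]['output'];
    }
}

run($steps, 'apply_for_card', ['age' => 30]);
```

Reordering or reusing a step is a matter of changing a channel name, which is the property the workflow attributes rely on.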

Inbuilt Resiliency

Ecotone handles failures at the architecture level to keep the Application free of those concerns. As Messages are the main component of communication between Applications, Modules or even Classes in Ecotone, this creates space for recoverability in all parts of the Application, because Messages can be retried instantly or with a delay without blocking other processes from continuing their work.

As Messages are basically data records which carry the intention, this opens the possibility to store that "intention" in case an unrecoverable failure happens. This means that when there is no point in delayed retries, because we encountered an unrecoverable error, we can move that Message into a persistent store. This way we don't lose the information, and when the bug is fixed, we can simply retry that Message to resume the flow from the place where it failed.
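The two recovery paths described above, retrying and then storing the Message, can be sketched in plain PHP. The handleWithRecovery function, the attempt limit and the in-memory dead letter array are assumptions of this example; a real setup would wait between attempts and use a persistent store.

```php
<?php
declare(strict_types=1);

// Illustrative retry loop; returns true when the handler eventually succeeds.
function handleWithRecovery(\Closure $handler, array $message, int $maxAttempts, array &$deadLetter): bool
{
    $lastError = null;
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $handler($message);

            return true;
        } catch (\Throwable $exception) {
            // A real consumer would wait here (delayed retry) before retrying.
            $lastError = $exception->getMessage();
        }
    }

    // Retries exhausted: keep the Message so it can be replayed after a fix.
    $deadLetter[] = ['message' => $message, 'error' => $lastError];

    return false;
}

$deadLetter = [];

// Fails twice, then succeeds: a temporary problem that retries can heal.
$attempts = 0;
$flaky = function (array $message) use (&$attempts): void {
    if (++$attempts < 3) {
        throw new \RuntimeException('temporary failure');
    }
};
$recovered = handleWithRecovery($flaky, ['orderId' => 1], 3, $deadLetter);

// Always fails: a bug that no amount of retrying will fix.
$broken = fn (array $message) => throw new \LogicException('bug in handler');
$stored = handleWithRecovery($broken, ['orderId' => 2], 3, $deadLetter);
```

After the bug is fixed, the entries kept in $deadLetter hold everything needed to run the handler again.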

There are of course more resiliency patterns that are part of Ecotone, like:

  • Automatic retries to send Messages to Asynchronous Message Channels

  • Reconnection of Message Consumers (Workers) if they lose the connection to the Broker

  • Inbuilt functionalities like Message Outbox, Error Channels with Dead Letter, and Deduplication of Messages to avoid double processing

  • and many many more.

Basing the flow on Messages makes it possible for the Application to handle failures at the architecture level. By communicating via Messages we open the way for the application to self-heal without our intervention and, in case of unrecoverable failures, to be robust enough not to lose any information and to recover quickly from the point of failure once the bug is fixed.

Business Oriented Architecture

Ecotone shifts the focus from technical details to the actual business processes, using Resilient Messaging as the foundation on which everything else is built. It provides seamless communication using Messages between Applications, Modules or even different Classes.

Together with that we will be using Declarative Configuration with attributes to avoid writing and maintaining configuration files. We will be stating the intention of what we want to achieve instead of wiring things ourselves; as a result we will regain a huge amount of time, which can be invested in more important parts of the System. On top of that, we will be able to use higher level Building Blocks like Command and Event Handlers, Aggregates and Sagas, which connect to the messaging seamlessly and help encapsulate our business logic. All of the above serve as pillars for creating so called Business Oriented Architecture:

  1. Resilient Messaging - At the heart of Ecotone lies a resilient messaging system that enables loose coupling, fault tolerance, and self-healing capabilities.

  2. Declarative Configuration - Introduces declarative programming with Attributes. It simplifies development, reduces boilerplate code, and promotes code readability. It empowers developers to express their intent clearly, resulting in more maintainable and expressive codebases.

  3. Building Blocks - Building blocks like Message Handlers, Aggregates, Sagas, facilitate the implementation of the business logic. By making it possible to bind Building Blocks with Resilient Messaging, Ecotone makes it easy to build and connect even the most complex business workflows.

Materials

The Ecotone blog provides articles which describe Ecotone's architecture and related features in more detail. If you want to find out more, follow the links below:

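Links

  • Robust and Developer Friendly Architecture in PHP
  • Practical Domain Driven Design
  • Reactive and Message Driven Systems in PHP
  • Enterprise Integration Patterns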

There may be situations where multiple Asynchronous Event Handlers subscribe to the same Event. We can easily imagine that one of them may fail, and then things like retries become problematic (as they may trigger successful Event Handlers a second time). That's why Ecotone introduces Message Handling isolation, which delivers a copy of the Message to each related Event Handler separately. As a result each Asynchronous Event Handler handles its own Message in full isolation, and in case of failure only that Handler will be retried.
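The effect of delivering one copy per Event Handler can be sketched in plain PHP. The array based queue, the handler names and the simulated failure are assumptions made for this example and do not reflect how Ecotone stores or addresses Messages.

```php
<?php
declare(strict_types=1);

// Sketch: one queued copy per handler, so a retry re-runs only the handler
// that failed.
$calls = ['email' => 0, 'audit' => 0];
$handlers = [
    'email' => function (string $event) use (&$calls): void {
        $calls['email']++;
        if ($calls['email'] === 1) {
            throw new \RuntimeException('mail server unavailable');
        }
    },
    'audit' => function (string $event) use (&$calls): void {
        $calls['audit']++;
    },
];

// Publishing: enqueue a separate copy addressed to each handler.
$queue = [];
foreach (array_keys($handlers) as $name) {
    $queue[] = ['handler' => $name, 'event' => 'UserWasRegistered'];
}

// Consuming: a failed copy goes back on the queue, the others are unaffected.
while ($message = array_shift($queue)) {
    try {
        $handlers[$message['handler']]($message['event']);
    } catch (\RuntimeException) {
        $queue[] = $message;
    }
}
```

The email handler runs twice because its first attempt failed, while the audit handler runs exactly once.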

Having this foundational knowledge and understanding of how Ecotone works at a high level, it's a good moment to dive into the Tutorial section, which will provide hands-on experience and a deeper understanding.

Figure captions (images not included):

  • Communication between Objects using Messages
  • Message failed and will be retried with delay
  • Storing Message for later review, and replaying when bug is fixed
  • When all three pillars are solved by Ecotone, what is left to write is Business Oriented Code