Asynchronous Message Handlers

Running Asynchronously

Ecotone makes it easy to switch a given Message Handler from synchronous to asynchronous execution.

To run a Command Handler asynchronously, we mark it with the Asynchronous attribute.

#[Asynchronous("orders")]
#[CommandHandler(endpointId: "place_order_endpoint")]
public function placeOrder(PlaceOrderCommand $command) : void
{
   // do something with $command
}

We do the same for Event Handlers:

#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed")]
public function when(OrderWasPlaced $event) : void
{
   // do something with $event
}

We need to set endpointId in the handler's attribute. It is used to route the Message, in isolation, to the correct Message Handler.

Message Channel

The Asynchronous attribute states which Channel reference we want to use, for example #[Asynchronous("orders")].

The "orders" string is the name of our Message Channel. We use this name to reference which implementation we want to use—whether it's an in-memory channel for testing, a database queue, or RabbitMQ. This naming approach keeps our business code clean and independent from infrastructure choices.

To configure a specific implementation like a database channel, we use a ServiceContext class.
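A minimal sketch of such a configuration, assuming the ecotone/dbal package is installed and a database connection is already set up. The class name MessagingConfiguration and the method name ordersChannel are illustrative and not taken from the original:

```php
use Ecotone\Dbal\DbalBackedMessageChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;

// Hypothetical class name; it only needs to be a class that Ecotone scans.
final class MessagingConfiguration
{
    #[ServiceContext]
    public function ordersChannel(): DbalBackedMessageChannelBuilder
    {
        // Registers a database-backed Message Channel under the name "orders",
        // the same name that the Asynchronous attribute refers to.
        return DbalBackedMessageChannelBuilder::create('orders');
    }
}
```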

That's all the configuration we need! Now whenever we reference "orders" in our handler attributes, Ecotone automatically uses this database channel. Our handlers stay exactly the same whether we're using in-memory channels for testing or database channels for production—the only difference is this single configuration change.

Running Message Consumer

We can first list all of the Message Consumers available for running with the ecotone:list console command.

Then, to run our Message Consumer, we use the ecotone:run console command, passing the consumer name (here, orders).

Dynamic Configuration

You may set the running configuration for a given consumer when starting it.

  • handledMessageLimit - Amount of messages to be handled before stopping consumer

  • executionTimeLimit - How long consumer should run before stopping (milliseconds)

  • finishWhenNoMessages - Consumer runs only as long as there are messages to consume, then stops

  • memoryLimit - How much memory the consumer may use before it is stopped (megabytes)

  • stopOnFailure - Stop consumer in case of exception

Static Configuration

Use Service Context configuration to define the running configuration statically.

Dynamic configuration overrides static configuration.
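A minimal sketch of a static configuration, assuming the consumer is named after the "orders" channel. The class name ConsumerConfiguration, the method name, and the limit values are illustrative only:

```php
use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Endpoint\PollingMetadata;

// Hypothetical class name; the limits below are example values, not recommendations.
final class ConsumerConfiguration
{
    #[ServiceContext]
    public function ordersConsumer(): PollingMetadata
    {
        return PollingMetadata::create('orders')
            // Stop after handling 100 messages.
            ->setHandledMessageLimit(100)
            // Stop after running for 60 seconds.
            ->setExecutionTimeLimitInMilliseconds(60000)
            // Stop once memory usage exceeds 256 MB.
            ->setMemoryLimitInMegabytes(256);
    }
}
```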

Available Providers (Types)

There are multiple implementations we can use, such as an in-memory channel, a database queue, or RabbitMQ.

Multiple Asynchronous Endpoints

We may register multiple endpoints on a single asynchronous channel. This allows us to use one asynchronous channel for a whole Aggregate or for a group of related Command/Event Handlers.
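As a sketch, two handlers can share the "orders" channel while keeping distinct endpoint IDs. The class name OrderService and the empty message classes are placeholders for illustration:

```php
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\EventHandler;

// Placeholder message classes so the sketch is self-contained.
final class PlaceOrderCommand {}
final class OrderWasPlaced {}

// Hypothetical service class grouping related handlers.
final class OrderService
{
    // Both handlers publish to and consume from the same "orders" channel.
    #[Asynchronous("orders")]
    #[CommandHandler(endpointId: "place_order_endpoint")]
    public function placeOrder(PlaceOrderCommand $command): void
    {
        // do something with $command
    }

    #[Asynchronous("orders")]
    #[EventHandler(endpointId: "order_was_placed")]
    public function when(OrderWasPlaced $event): void
    {
        // do something with $event
    }
}
```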

Asynchronous Class

You may put Asynchronous at the class level, so that all the endpoints within the class become asynchronous.
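A minimal sketch of the class-level form. The class name OrderNotifier, the method names, and the endpoint IDs are hypothetical:

```php
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;

// Placeholder event class so the sketch is self-contained.
final class OrderWasPlaced {}

// The attribute on the class applies to every handler declared inside it.
#[Asynchronous("orders")]
final class OrderNotifier
{
    #[EventHandler(endpointId: "notify_customer")]
    public function notifyCustomer(OrderWasPlaced $event): void
    {
        // runs asynchronously through the "orders" channel
    }

    #[EventHandler(endpointId: "notify_warehouse")]
    public function notifyWarehouse(OrderWasPlaced $event): void
    {
        // also asynchronous; each handler still needs its own endpointId
    }
}
```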

Intercepting asynchronous endpoint

All asynchronous endpoints are marked with the special attribute Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint. If you want to intercept all polling endpoints, use an attribute-based pointcut that targets it.
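One way this might look, assuming Ecotone's Before interceptor attribute accepts an attribute class name as its pointcut. The class name, method name, and log message are hypothetical:

```php
use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint;
use Ecotone\Messaging\Attribute\Interceptor\Before;

// Hypothetical interceptor class; assumes it is registered as a service.
final class AsynchronousEndpointLogger
{
    // The pointcut targets every endpoint marked with AsynchronousRunningEndpoint.
    #[Before(pointcut: AsynchronousRunningEndpoint::class)]
    public function logBeforeHandling(): void
    {
        // Illustrative side effect only.
        error_log('About to handle a message on an asynchronous endpoint');
    }
}
```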

Endpoint Id

Each Asynchronous Message Handler requires us to define an "endpointId". It is the unique identifier of the Message Handler.

The Endpoint ID travels with your message as part of the headers to your message channel. Once we consume the message from the Message Channel, Ecotone uses this ID to route it to the correct Message Handler. This completely decouples our messages from specific handler classes and methods—we can refactor, rename, or move our handlers around without breaking message routing.
