Asynchronous Message Handlers

Running Asynchronously

Ecotone makes it easy to switch a given Message Handler from synchronous to asynchronous execution.

To run a Command Handler asynchronously, we mark it with the Asynchronous attribute.

#[Asynchronous("orders")]
#[CommandHandler("order.place", "place_order_endpoint")]
public function placeOrder(PlaceOrderCommand $command) : void
{
   // do something with $command
}

Event Handlers are defined the same way:

#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed")]
public function when(OrderWasPlaced $event) : void
{
   // do something with $event
}

We need to add an endpointId to our endpoint's attribute. It is used to route the Message, in isolation, to our Message Handler.
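The calling side does not change when a handler becomes asynchronous. The following is a minimal sketch, assuming the CommandBus gateway is injected into a hypothetical OrderController (the controller class and its wiring are illustrative and not part of this page):

```php
<?php

use Ecotone\Modelling\CommandBus;

// Hypothetical caller: the class name and constructor wiring are assumptions.
final class OrderController
{
    public function __construct(private CommandBus $commandBus) {}

    public function placeOrder(PlaceOrderCommand $command): void
    {
        // Same call as for a synchronous handler. With #[Asynchronous("orders")]
        // on the handler, the Message goes to the "orders" channel and is
        // handled later by the consumer.
        $this->commandBus->sendWithRouting("order.place", $command);
    }
}
```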

Message Channel Name

#[Asynchronous("orders")]

The "orders" string is the name of our Message Channel. It references the specific channel implementation we would like to use. To provide a specific implementation, for example a Database Channel, we use ServiceContext.

final readonly class EcotoneConfiguration
{
    #[ServiceContext]
    public function databaseChannel()
    {
        return DbalBackedMessageChannelBuilder::create('orders');
    }
}

This is all we need to configure. The database channel named orders will now be used wherever the Asynchronous attribute references this name.

There are multiple implementations we can use:
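Because handlers reference the channel only by name, the implementation can be swapped without touching them. As a sketch, assuming an in-memory queue is enough (for example in tests), the same name can be backed by SimpleMessageChannelBuilder instead; the class and method names of the configuration class below are illustrative:

```php
<?php

use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder;

// Illustrative configuration class: only the channel name "orders" matters.
final readonly class InMemoryEcotoneConfiguration
{
    #[ServiceContext]
    public function inMemoryChannel()
    {
        // Same channel name, different implementation: messages are kept
        // in memory instead of in the database.
        return SimpleMessageChannelBuilder::createQueueChannel('orders');
    }
}
```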

Available Asynchronous Message Channels

At this moment, the following modules with pollable channels are available:

If you choose the Dbal Message Channel, Ecotone will use the Outbox Pattern to atomically store your changes and published messages.

The currently available Message Channels are integrated via the enqueue library.

bin/console ecotone:list
+--------------------+
| Endpoint Names     |
+--------------------+
| orders             |
+--------------------+

After setting up the Pollable Channel, we can run the endpoint:

bin/console ecotone:run orders -vvv

Running Asynchronous Endpoint (PollingMetadata)

Dynamic Configuration

You may set the running configuration for a given consumer when starting it.

  • handledMessageLimit - Number of messages to handle before stopping the consumer

  • executionTimeLimit - How long the consumer should run before stopping (milliseconds)

  • finishWhenNoMessages - The consumer runs only as long as there are messages to consume

  • memoryLimit - How much memory can be consumed before stopping the consumer (Megabytes)

  • stopOnFailure - Stop the consumer in case of an exception

bin/console ecotone:run orders \
    --handledMessageLimit=5 \
    --executionTimeLimit=1000 \
    --finishWhenNoMessages \
    --memoryLimit=512 \
    --stopOnFailure

Static Configuration

Use Service Context configuration to configure the consumer statically.

class Configuration
{    
    #[ServiceContext]
    public function configuration() : array
    {
        return [
            PollingMetadata::create("orders")
                 ->setErrorChannelName("errorChannel")
                 ->setInitialDelayInMilliseconds(100)
                 ->setMemoryLimitInMegaBytes(100)
                 ->setHandledMessageLimit(10)
                 ->setExecutionTimeLimitInMilliseconds(100)
                 ->withFinishWhenNoMessages(true)
        ];
    }
}

Dynamic configuration overrides static configuration.

Multiple Asynchronous Endpoints

Using a single asynchronous channel, we may register multiple endpoints. This allows registering one asynchronous channel for a whole Aggregate or a group of related Command/Event Handlers.

// The endpointId values below are illustrative.
#[Asynchronous("orders")]
#[EventHandler(endpointId: "on_success")]
public function onSuccess(SuccessEvent $event) : void
{
}

#[Asynchronous("orders")]
#[EventHandler(endpointId: "on_failure")]
public function onFailure(FailureEvent $event) : void
{
}

Asynchronous Class

You may put Asynchronous at the class level, so that all the endpoints within the class become asynchronous.
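A minimal sketch of the class-level form, reusing the events from the previous example. The class name OrderNotifications and the endpointId values are hypothetical:

```php
<?php

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;

// Hypothetical class: every handler below runs through the "orders" channel.
#[Asynchronous("orders")]
final class OrderNotifications
{
    // endpointId values are illustrative; each one must be unique.
    #[EventHandler(endpointId: "notify_on_success")]
    public function onSuccess(SuccessEvent $event): void
    {
    }

    #[EventHandler(endpointId: "notify_on_failure")]
    public function onFailure(FailureEvent $event): void
    {
    }
}
```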

Intercepting asynchronous endpoint

All asynchronous endpoints are marked with a special attribute, Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint. If you want to intercept all polling endpoints, use an attribute-based pointcut on it.
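As a sketch of such a pointcut, the Around interceptor below wraps every asynchronous endpoint with log entries. The class name, the log messages, and the use of a PSR-3 logger are assumptions made for illustration:

```php
<?php

use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint;
use Ecotone\Messaging\Attribute\Interceptor\Around;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;
use Psr\Log\LoggerInterface;

// Hypothetical interceptor class.
final class AsynchronousEndpointLogger
{
    public function __construct(private LoggerInterface $logger) {}

    // The pointcut targets the attribute carried by all asynchronous endpoints.
    #[Around(pointcut: AsynchronousRunningEndpoint::class)]
    public function log(MethodInvocation $methodInvocation): mixed
    {
        $this->logger->info("Asynchronous endpoint started");

        try {
            // Continue with the intercepted endpoint.
            return $methodInvocation->proceed();
        } finally {
            $this->logger->info("Asynchronous endpoint finished");
        }
    }
}
```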

Endpoint Id

Each Asynchronous Message Handler requires us to define an "endpointId". It is the unique identifier of your Message Handler.

#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed")] // Your important endpoint Id
public function when(OrderWasPlaced $event) : void {}

The Endpoint Id is sent to your Message Queue in the Message Headers. After the Message is consumed from the Queue, it is directed to the Message Handler with the given endpoint Id. This completely decouples the Message from the Message Handler class and method, and from the Command/Event class.

The endpointId ensures we can freely refactor our code while staying backward compatible with Messages already in the Queues. We can move the method and class to different namespaces or change the Command class names, and as long as the endpointId stays the same, the Message will be delivered correctly.
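The routing idea can be illustrated with a small plain-PHP simulation. This is not Ecotone's internal code: the header key "endpointId", the array-based message, and the dispatch function are all assumptions chosen to show why only the identifier has to stay stable.

```php
<?php

// Plain-PHP illustration only; none of this is Ecotone's implementation.

// Handlers are registered by endpointId, not by class or method name,
// so the code behind an id can be moved or renamed freely.
$handlers = [
    "order_was_placed" => fn (array $payload): string => "handled order " . $payload["orderId"],
];

// A Message as it might sit in the queue: payload plus headers.
// The header key "endpointId" is a hypothetical name.
$queuedMessage = [
    "headers" => ["endpointId" => "order_was_placed"],
    "payload" => ["orderId" => 123],
];

// On consumption, the endpointId header alone selects the handler.
function dispatch(array $message, array $handlers): string
{
    $endpointId = $message["headers"]["endpointId"];

    if (!isset($handlers[$endpointId])) {
        throw new RuntimeException("No handler registered for endpointId {$endpointId}");
    }

    return $handlers[$endpointId]($message["payload"]);
}

echo dispatch($queuedMessage, $handlers), PHP_EOL; // prints "handled order 123"
```

Renaming or relocating the closure behind "order_was_placed" does not affect messages that were queued earlier, because they carry only the identifier.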
