Asynchronous Handling

Asynchronous PHP

Running Asynchronously

Ecotone makes it easy to switch a Message Handler from synchronous to asynchronous execution.
To run an Endpoint asynchronously, mark it with the Asynchronous attribute.
#[Asynchronous("orders")]
#[CommandHandler("order.place", "place_order_endpoint")]
public function placeOrder(PlaceOrderCommand $command) : void
{
    // do something with $command
}

#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed")]
public function when(OrderWasPlaced $event) : void
{
    // do something with $event
}
We need to add an endpointId to the endpoint's attribute; it is used to route the Message in isolation to your Message Handler. The Asynchronous attribute references the channel name orders, so we need to register this channel with the implementation of our choice. To do that, use one of the Modules that provide pollable channels. At the moment, the following modules with pollable channels are available:
If you choose the Dbal Message Channel, Ecotone will use the Outbox Pattern to atomically store your changes and published messages.
The currently available Message Channels are integrated via the enqueue library.
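As a minimal sketch of registering the orders channel, assuming the Dbal module is installed and exposes DbalBackedMessageChannelBuilder in the Ecotone\Dbal namespace, a Service Context method could return the channel builder. The class name OrdersChannelConfiguration and the method name ordersChannel are hypothetical.

```php
<?php

use Ecotone\Dbal\DbalBackedMessageChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;

// Hypothetical configuration class; the class and method names are illustrative.
final class OrdersChannelConfiguration
{
    #[ServiceContext]
    public function ordersChannel(): DbalBackedMessageChannelBuilder
    {
        // Assumption: the channel name must match the one used in #[Asynchronous("orders")].
        return DbalBackedMessageChannelBuilder::create("orders");
    }
}
```

Other modules would be registered the same way with their own channel builder; only the builder class changes, not the handler code.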
Symfony
bin/console ecotone:list
+--------------------+
| Endpoint Names     |
+--------------------+
| orders             |
+--------------------+
Laravel
artisan ecotone:list
+--------------------+
| Endpoint Names     |
+--------------------+
| orders             |
+--------------------+
Lite
$consumers = $messagingSystem->list();
After setting up the Pollable Channel, we can run the endpoint:
Symfony
bin/console ecotone:run orders -vvv
Laravel
artisan ecotone:run orders -vvv
Lite
$messagingSystem->run("orders");

Running Asynchronous Endpoint (PollingMetadata)

Dynamic Configuration

You may configure how a given consumer runs at the moment you start it.
  • handledMessageLimit - Number of messages to handle before the consumer stops
  • executionTimeLimit - How long the consumer should run before stopping (milliseconds)
  • finishWhenNoMessages - The consumer runs only as long as there are messages to consume
  • memoryLimit - How much memory the consumer may use before stopping (megabytes)
  • stopOnFailure - Stop the consumer when an exception occurs
Symfony
bin/console ecotone:run orders \
    --handledMessageLimit=5 \
    --executionTimeLimit=1000 \
    --finishWhenNoMessages \
    --memoryLimit=512 \
    --stopOnFailure
Laravel
artisan ecotone:run orders \
    --handledMessageLimit=5 \
    --executionTimeLimit=1000 \
    --finishWhenNoMessages \
    --memoryLimit=512 \
    --stopOnFailure
Lite
$messagingSystem->run(
    "orders",
    ExecutionPollingMetadata::createWithDefault()
        ->withHandledMessageLimit(5)
        ->withMemoryLimitInMegabytes(100)
        ->withExecutionTimeLimitInMilliseconds(1000)
        ->withFinishWhenNoMessages(true)
        ->withStopOnError(true)
);

Static Configuration

Use Service Context configuration to configure a consumer statically.
class Configuration
{
    #[ServiceContext]
    public function configuration() : array
    {
        return [
            PollingMetadata::create("orders")
                ->setErrorChannelName("errorChannel")
                ->setInitialDelayInMilliseconds(100)
                ->setMemoryLimitInMegaBytes(100)
                ->setHandledMessageLimit(10)
                ->setExecutionTimeLimitInMilliseconds(100)
                ->withFinishWhenNoMessages(true)
        ];
    }
}
Dynamic configuration overrides static configuration.

Multiple Asynchronous Endpoints

We may register multiple endpoints on a single asynchronous channel. This allows one asynchronous channel to serve a whole Aggregate or a group of related Command/Event Handlers.
#[Asynchronous("orders")]
#[EventHandler]
public function onSuccess(SuccessEvent $event) : void
{
}

#[Asynchronous("orders")]
#[EventHandler]
public function onFailure(FailureEvent $event) : void
{
}

Asynchronous Class

You may put the Asynchronous attribute at the class level, so that all endpoints within the class become asynchronous.
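A minimal sketch of the class-level form, assuming the attribute classes live in the namespaces imported below. The class OrderNotifier, both event classes, and the endpointId values are hypothetical and exist only to keep the example self-contained.

```php
<?php

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;

// Hypothetical events, defined here only to keep the sketch self-contained.
final class SuccessEvent {}
final class FailureEvent {}

// The attribute is declared once on the class instead of on each handler method.
#[Asynchronous("orders")]
final class OrderNotifier
{
    #[EventHandler(endpointId: "order_notifier.on_success")]
    public function onSuccess(SuccessEvent $event): void
    {
        // handle the success event
    }

    #[EventHandler(endpointId: "order_notifier.on_failure")]
    public function onFailure(FailureEvent $event): void
    {
        // handle the failure event
    }
}
```

Each handler keeps its own endpointId, since that identifier is what routes the Message to the right handler.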

Delaying Message

You may delay the handling of a given asynchronous message by adding the #[Delayed] attribute.

Static Delay

#[Delayed(50000)]
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
{
    // handle welcome notification
}

Dynamic Delay

$commandBus->sendWithRouting(
"askForOrderReview",
"userId",
metadata: ["deliveryDelay" => 3 * 24 * 60 * 60 * 1000]
);
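The delay value in the call above is plain arithmetic. Assuming, as the final multiplication by 1000 suggests, that deliveryDelay is expressed in milliseconds, the conversion can be sketched with a small helper. The function daysToMilliseconds is hypothetical and not part of Ecotone.

```php
<?php

// Hypothetical helper (not part of Ecotone): converts whole days to milliseconds.
function daysToMilliseconds(int $days): int
{
    // days -> hours -> minutes -> seconds -> milliseconds
    return $days * 24 * 60 * 60 * 1000;
}

// Metadata array in the same shape as the sendWithRouting call above.
$metadata = ["deliveryDelay" => daysToMilliseconds(3)];

echo $metadata["deliveryDelay"], PHP_EOL; // prints 259200000
```

Naming the conversion keeps the unit visible at the call site, which helps avoid passing seconds where milliseconds are expected.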

Priority Message

You may want to prioritize a given handler over another one.

Static Priority

#[Priority(1)]
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
{
    // handle welcome notification
}

#[Priority(5)]
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
{
    // handle welcome notification
}
In Ecotone, every asynchronous handler receives its own copy of a message. This means each handler processes the message separately, in isolation, which allows for safe retries in case of failure.

Dynamic Priority

$commandBus->sendWithRouting(
"askForOrderReview",
"userId",
metadata: ["priority" => 100]
);

Intercepting asynchronous endpoint

All asynchronous endpoints are marked with the special attribute Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint. If you want to intercept all polling endpoints, use an attribute-based pointcut that targets it.
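A minimal sketch of such an interceptor, assuming the Around attribute and the MethodInvocation interface are available in the namespaces imported below. The class AsynchronousEndpointTimer and its method measure are hypothetical.

```php
<?php

use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint;
use Ecotone\Messaging\Attribute\Interceptor\Around;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;

// Hypothetical interceptor; the class and method names are illustrative.
final class AsynchronousEndpointTimer
{
    // The pointcut targets the attribute that marks asynchronous endpoints.
    #[Around(pointcut: AsynchronousRunningEndpoint::class)]
    public function measure(MethodInvocation $methodInvocation): mixed
    {
        $startedAt = microtime(true);

        try {
            // Continue with the intercepted endpoint and pass its result through.
            return $methodInvocation->proceed();
        } finally {
            // Runs whether the endpoint returned or threw.
            error_log(sprintf("Asynchronous endpoint took %.3f s", microtime(true) - $startedAt));
        }
    }
}
```

The same pointcut could carry cross-cutting concerns such as transactions or logging without touching the individual handlers.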

Materials

Demo implementation

You may find a demo implementation here.