Asynchronous Handling
Asynchronous PHP

Running Asynchronously

Ecotone makes it easy to switch an endpoint between synchronous and asynchronous execution.
To run an endpoint asynchronously, mark it with the Asynchronous attribute.
#[Asynchronous("orders")]
#[CommandHandler("order.place", "place_order_endpoint")]
public function placeOrder(PlaceOrderCommand $command) : void
{
    // do something with $command
}

#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed")]
public function when(OrderWasPlaced $event) : void
{
    // do something with $event
}
We need to add an endpointId to the endpoint's attribute, in this case CommandHandler. The Asynchronous attribute refers to a channel named orders, so we need to register a channel with that name. To do so, use one of the modules that provide pollable channels.
Currently available Message Channels are integrated with the enqueue library.
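As a minimal sketch of registering the orders channel, the configuration below uses DbalBackedMessageChannelBuilder, the builder shown in the Outbox Pattern section of this page, and assumes the Dbal Module is installed. The class name OrdersChannelConfiguration, the method name and the namespaces in the use statements are assumptions made for illustration; check them against your installed Ecotone version.

```php
<?php

use Ecotone\Dbal\DbalBackedMessageChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;

// Hypothetical configuration class; the class and method names are illustrative.
final class OrdersChannelConfiguration
{
    #[ServiceContext]
    public function ordersChannel(): DbalBackedMessageChannelBuilder
    {
        // The channel name must match the one used in #[Asynchronous("orders")].
        return DbalBackedMessageChannelBuilder::create("orders");
    }
}
```

Any other module that provides a pollable channel can be swapped in the same way, as long as the channel name stays orders.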
To list the available endpoints:
Symfony
bin/console ecotone:list
+--------------------+
| Endpoint Names     |
+--------------------+
| orders             |
+--------------------+
Laravel
artisan ecotone:list
+--------------------+
| Endpoint Names     |
+--------------------+
| orders             |
+--------------------+
Lite
$consumers = $messagingSystem->list();
After setting up the pollable channel, we can run the endpoint:
Symfony
bin/console ecotone:run orders -vvv
Laravel
artisan ecotone:run orders -vvv
Lite
$messagingSystem->run("orders");

Demo

Read more about asynchronous handling in the following blog post.

Running Asynchronous Endpoint

Dynamic Configuration

You may set up the running configuration for a given consumer while running it.
  • handledMessageLimit - Number of messages to handle before stopping the consumer
  • executionTimeLimit - How long the consumer should run before stopping (milliseconds)
  • memoryLimit - How much memory the consumer may use before stopping (megabytes)
  • stopOnFailure - Stop the consumer in case of an exception
Symfony
bin/console ecotone:run orders \
    --handledMessageLimit=5 \
    --executionTimeLimit=1000 \
    --memoryLimit=512 \
    --stopOnFailure
Laravel
artisan ecotone:run orders \
    --handledMessageLimit=5 \
    --executionTimeLimit=1000 \
    --memoryLimit=512 \
    --stopOnFailure
Lite
$messagingSystem->run(
    "orders",
    ExecutionPollingMetadata::createWithDefaults()
        ->withHandledMessageLimit(5)
        ->withMemoryLimitInMegabytes(512)
        ->withExecutionTimeLimitInMilliseconds(1000)
        ->withStopOnError(true)
);

Static Configuration

Use Service Context configuration to configure a consumer statically.
class Configuration
{
    #[ServiceContext]
    public function configuration() : array
    {
        return [
            PollingMetadata::create("orders")
                ->setErrorChannelName("errorChannel")
                ->setInitialDelayInMilliseconds(100)
                ->setMemoryLimitInMegaBytes(100)
                ->setHandledMessageLimit(10)
                ->setExecutionTimeLimitInMilliseconds(100)
        ];
    }
}
Dynamic configuration overrides static configuration.

Multiple Asynchronous Endpoints

We may register multiple endpoints on a single asynchronous channel. This allows registering one asynchronous channel for a whole Aggregate or for a group of related Command/Event Handlers.
#[Asynchronous("orders")]
#[EventHandler]
public function onSuccess(SuccessEvent $event) : void
{
}

#[Asynchronous("orders")]
#[EventHandler]
public function onFailure(FailureEvent $event) : void
{
}

Asynchronous Class

You may put the Asynchronous attribute at the class level, so that all endpoints within the class become asynchronous.
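A minimal sketch of the class-level form, reusing the SuccessEvent and FailureEvent handlers from the previous section. The class name OrderNotifications, the endpointId values and the namespaces in the use statements are assumptions made for illustration.

```php
<?php

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;

// Hypothetical handler class: the attribute on the class applies to every
// endpoint declared inside it, so neither method repeats #[Asynchronous].
#[Asynchronous("orders")]
final class OrderNotifications
{
    // Illustrative endpointId values; they only need to be unique.
    #[EventHandler(endpointId: "order_notifications_on_success")]
    public function onSuccess(SuccessEvent $event): void
    {
    }

    #[EventHandler(endpointId: "order_notifications_on_failure")]
    public function onFailure(FailureEvent $event): void
    {
    }
}
```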

Delaying Message

You may delay the handling of a given asynchronous message by adding the #[Delayed] attribute.

Static Delay

#[Delayed(50000)]
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
{
    // handle welcome notification
}

Dynamic Delay

$commandBus->sendWithRouting(
    "askForOrderReview",
    "userId",
    metadata: ["deliveryDelay" => 3 * 24 * 60 * 60 * 1000]
);
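The deliveryDelay value above is expressed in milliseconds, so 3 * 24 * 60 * 60 * 1000 amounts to three days. The plain-PHP helper below is a hypothetical convenience, not part of Ecotone, that makes this arithmetic explicit.

```php
<?php

declare(strict_types=1);

// Hypothetical helper (not part of Ecotone): converts whole days to milliseconds.
function daysToMilliseconds(int $days): int
{
    // days -> hours -> minutes -> seconds -> milliseconds
    return $days * 24 * 60 * 60 * 1000;
}

echo daysToMilliseconds(3), PHP_EOL; // prints 259200000
```

The result can be passed as the deliveryDelay metadata value in place of the inline multiplication.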

Priority Message

You may want to prioritize a given handler over another one.

Static Priority

#[Priority(1)]
#[Asynchronous("notifications")]
#[EventHandler(endpointId: "welcomeEmail")]
public function sendWelcomeNotificationWhen(UserWasRegistered $event): void
{
    // handle welcome notification
}

#[Priority(5)]
#[Asynchronous("notifications")]
// endpointId must be unique, so the second handler uses its own (illustrative) identifier
#[EventHandler(endpointId: "welcomeSms")]
public function sendWelcomeSmsWhen(UserWasRegistered $event): void
{
    // handle welcome SMS (illustrative second handler)
}
In Ecotone, every asynchronous handler receives its own copy of the message. Each handler is therefore processed separately and in isolation, which makes it safe to retry a handler that failed.

Dynamic Priority

$commandBus->sendWithRouting(
    "askForOrderReview",
    "userId",
    metadata: ["priority" => 100]
);

Intercepting asynchronous endpoint

All asynchronous endpoints are marked with the special attribute Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint. If you want to intercept all polling endpoints, use a pointcut on this attribute.
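A rough sketch of such an interceptor follows. It assumes that Ecotone's Before interceptor attribute lives in Ecotone\Messaging\Attribute\Interceptor and accepts an attribute class name as its pointcut; the class name AsynchronousEndpointLogger and the method body are hypothetical. Verify the attribute and its parameters against your Ecotone version before relying on it.

```php
<?php

use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint;
use Ecotone\Messaging\Attribute\Interceptor\Before;

// Hypothetical interceptor class; the name and the logging are illustrative.
final class AsynchronousEndpointLogger
{
    // Assumption: the pointcut may name an attribute class, which targets
    // every endpoint marked with that attribute.
    #[Before(pointcut: AsynchronousRunningEndpoint::class)]
    public function beforeHandling(): void
    {
        error_log("Asynchronous endpoint is about to handle a message");
    }
}
```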

Testing

Running with In Memory implementation

You may register a channel as an In Memory Queue Channel in order to run it within a single process, without external providers. This is useful in testing scenarios where we want to test the whole flow in a simple manner.
class MessagingConfiguration
{
    #[ServiceContext]
    #[Environment(['test'])]
    public function orderChannel()
    {
        return SimpleMessageChannelBuilder::createQueueChannel("orders");
    }
}
You can then run it like any other asynchronous endpoint.

Dropping messages coming to the channel

You may register a channel as a null channel, which means it will drop any message it receives. This is useful if we want to turn off specific functionality.
class MessagingConfiguration
{
    #[ServiceContext]
    #[Environment(['test'])]
    public function orderChannel()
    {
        return SimpleMessageChannelBuilder::createNullableChannel("orders");
    }
}

How to run consumer for testing

You may run it by executing the CLI command from your test, or you may fetch ConfiguredMessagingSystem from your DI container. If you fetched it from the container, you can run it like this to limit the consumption process:
$messagingSystem = $this->getService(ConfiguredMessagingSystem::class);
$messagingSystem->run(
    "orders",
    ExecutionPollingMetadata::createWithDefaults()
        ->withHandledMessageLimit(1)
        ->withExecutionTimeLimitInMilliseconds(100)
        ->withStopOnError(true)
);

Handling Error Messages

Ecotone comes with a solution that allows you to retry failed messages and push them to an error channel for later investigation.

Error Channel

The error channel is a channel defined for handling failed messages. By default it is turned off. We may set up an error channel for a specific consumer.
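For a single consumer, one option is the PollingMetadata configuration from the Static Configuration section. The sketch below sets only the error channel for the orders consumer; the class name OrdersErrorChannelConfiguration, the method name and the namespaces in the use statements are assumptions made for illustration.

```php
<?php

use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Endpoint\PollingMetadata;

// Hypothetical configuration class; the class and method names are illustrative.
final class OrdersErrorChannelConfiguration
{
    #[ServiceContext]
    public function ordersPolling(): PollingMetadata
    {
        // Failed messages of the "orders" consumer are sent to "errorChannel".
        return PollingMetadata::create("orders")
            ->setErrorChannelName("errorChannel");
    }
}
```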
Setting up an error channel means that the consumer can continue handling the next messages. Once a message has been sent to the error channel, it is considered handled.
After setting it up, all error messages will go to this channel, so you may listen for them:
#[ServiceActivator("errorChannel")]
public function handle(ErrorMessage $errorMessage): void
{
    // do something with ErrorMessage
}
Service Activators are endpoints like Command Handlers, but they are not exposed through the Command/Event/Query Buses. You may use them for internal handling.

Managing full life cycle of Error Message

Ecotone comes with built-in support for managing the full life cycle of an error message.
By default, a message that has failed 3 times is stored in the database for later investigation. From there you may review, replay or delete it. You need to enable the Dbal Module to make this work.
All you need to do after installing the Dbal Module is to point your defaultErrorChannel to errorChannel.

Custom Configuration

If you want to customize error handling (retry attempts, backoff strategy, initial delay, etc.), you may configure it using ErrorHandlerConfiguration from ServiceContext.
#[ServiceContext]
public function errorConfiguration()
{
    return ErrorHandlerConfiguration::createWithDeadLetterChannel(
        "errorChannel",
        RetryTemplateBuilder::exponentialBackoff(1000, 10)
            ->maxRetryAttempts(3),
        "finalErrorChannel"
    );
}
------
#[ServiceActivator("finalErrorChannel")]
public function handle(ErrorMessage $errorMessage): void
{
    // do something with ErrorMessage
}
If you want to keep the Dbal Dead Letter working, your final error channel must be dbal_dead_letter.
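To get a feel for what exponentialBackoff(1000, 10) with maxRetryAttempts(3) might mean in practice, the plain-PHP sketch below computes a retry schedule. It assumes the first argument is the initial delay in milliseconds, the second is a multiplier, and the delay is multiplied after every attempt. That is an assumption about the semantics, not Ecotone's confirmed implementation, and exponentialBackoffDelays is a hypothetical helper.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of Ecotone).
 * Returns the assumed delay in milliseconds before each retry attempt,
 * keyed by attempt number, where each delay is the previous one times $multiplier.
 *
 * @return array<int, int>
 */
function exponentialBackoffDelays(int $initialDelayMs, int $multiplier, int $maxRetryAttempts): array
{
    $delays = [];
    $delay = $initialDelayMs;

    for ($attempt = 1; $attempt <= $maxRetryAttempts; $attempt++) {
        $delays[$attempt] = $delay;
        $delay *= $multiplier;
    }

    return $delays;
}

// Under the stated assumption: 1000 ms, 10000 ms, 100000 ms.
print_r(exponentialBackoffDelays(1000, 10, 3));
```

Under this assumption the third and last retry would happen roughly 111 seconds after the first failure, after which the message moves to the dead letter channel.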

No retries, store directly in Dead Letter

If you don't want to perform retries, but would still like to store the message immediately in the dead letter for later review or manual retry, set your main error channel to dbal_dead_letter.

Discarding Error Messages immediately

If you are not interested in keeping or retrying a given error message, for example because you depend on something unreliable that you expect to fail often, you can set your error channel or dead letter channel to nullChannel.
You may set up the above configurations for a given consumer instead of setting them globally.
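A per-consumer setup could look like the sketch below, which combines the PollingMetadata configuration from the Static Configuration section with the dbal_dead_letter and nullChannel channel names described above. The class name ConsumerErrorHandlingConfiguration, the pairing of consumers with channels, and the namespaces in the use statements are assumptions made for illustration.

```php
<?php

use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Endpoint\PollingMetadata;

// Hypothetical configuration class; the class and method names are illustrative.
final class ConsumerErrorHandlingConfiguration
{
    #[ServiceContext]
    public function errorHandlingPerConsumer(): array
    {
        return [
            // "orders": no retries, failed messages go straight to the dead letter.
            PollingMetadata::create("orders")
                ->setErrorChannelName("dbal_dead_letter"),
            // "notifications": failed messages are discarded immediately.
            PollingMetadata::create("notifications")
                ->setErrorChannelName("nullChannel"),
        ];
    }
}
```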

Outbox Pattern

Whenever you use more than one storage during an action, whether it is an HTTP request or asynchronous message handling, one of the storages may fail. If that happens, your state will be only partially correct: you may end up with records in the database and no message published to the Message Broker, or the opposite.
For critical parts of the system, we want assurance that no message will be lost.
To solve this, Ecotone implements an Outbox-pattern-like solution. To make sure your messages are stored together with your changes, set up the Dbal Module for the database connection you use for storing data, and then set up a dbal asynchronous channel for your handlers.
Dbal Message Channel
#[ServiceContext]
public function enableDbalChannel()
{
    return DbalBackedMessageChannelBuilder::create("async");
}
Asynchronous Event Handler
#[Asynchronous("async")]
#[EventHandler(endpointId: "notifyAboutNewOrder")]
public function notifyAboutNewOrder(OrderWasPlaced $event) : void
{
    // notify about new order
}
After this, all your messages will go through your database as a message channel. They will be stored together with your other changes, wrapped in the same transaction.
If you need to scale, you may run as many consumers as you want for a given dbal channel.

Demo implementation

You may find demo implementation here.