Asynchronous Handling

Asynchronous PHP

Running Asynchronously

Ecotone makes it easy to switch an endpoint between synchronous and asynchronous execution, depending on the currently running process.

To run an Endpoint asynchronously, we need to mark it with the Asynchronous attribute.

#[Asynchronous("orders")]
#[CommandHandler("order.place", "place_order_endpoint")]
public function placeOrder(PlaceOrderCommand $command) : void
{
    // do something with $command
}

#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed_endpoint")]
public function when(OrderWasPlaced $event) : void
{
    // do something with $event
}

We need to add an endpointId to the endpoint's attribute, in this case CommandHandler. The Asynchronous attribute names its channel orders, so we need to register a channel with that name. To do so, we use one of the Modules that provide pollable channels. At the moment, the following modules with pollable channels are available:

The currently available Message Channels are integrated with the enqueue library.

Symfony
bin/console ecotone:list
+--------------------+
| Endpoint Names |
+--------------------+
| orders |
+--------------------+
Laravel
artisan ecotone:list
+--------------------+
| Endpoint Names |
+--------------------+
| orders |
+--------------------+
Lite
$consumers = $messagingSystem->list();

After setting up the Pollable Channel, we can run the endpoint:

Symfony
bin/console ecotone:run orders -vvv
Laravel
artisan ecotone:run orders -vvv
Lite
$messagingSystem->run("orders");

Running configuration

Dynamic Configuration

You may set up the running configuration for a given consumer when running it.

  • handledMessageLimit - Number of messages to handle before stopping the consumer

  • executionTimeLimit - How long the consumer should run before stopping (milliseconds)

  • memoryLimit - How much memory the consumer may use before it is stopped (megabytes)

  • stopOnFailure - Stop the consumer when an exception is thrown

Symfony
bin/console ecotone:run orders \
    --handledMessageLimit=5 \
    --executionTimeLimit=1000 \
    --memoryLimit=512 \
    --stopOnFailure
Laravel
artisan ecotone:run orders \
    --handledMessageLimit=5 \
    --executionTimeLimit=1000 \
    --memoryLimit=512 \
    --stopOnFailure
Lite
$messagingSystem->run(
    "orders",
    ExecutionPollingMetadata::createWithDefault()
        ->withHandledMessageLimit(5)
        ->withMemoryLimitInMegabytes(100)
        ->withExecutionTimeLimitInMilliseconds(1000)
        ->withStopOnError(true)
);

Static Configuration

Use a Service Context to configure a consumer statically.

class Configuration
{
    #[ServiceContext]
    public function configuration() : array
    {
        return [
            PollingMetadata::create("orders")
                ->setErrorChannelName("errorChannel")
                ->setInitialDelayInMilliseconds(100)
                ->setMemoryLimitInMegaBytes(100)
                ->setHandledMessageLimit(10)
                ->setExecutionTimeLimitInMilliseconds(100)
        ];
    }
}

Dynamic configuration overrides static configuration.

Multiple Asynchronous Endpoints

We may register multiple endpoints on a single asynchronous channel. This allows one asynchronous channel to serve a whole Aggregate or a group of related Command/Event Handlers.

#[Asynchronous("orders")]
#[EventHandler]
public function onSuccess(SuccessEvent $event) : void
{
}

#[Asynchronous("orders")]
#[EventHandler]
public function onFailure(FailureEvent $event) : void
{
}

Asynchronous Class

You may put Asynchronous at the class level, so that all the endpoints within the class become asynchronous.
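As a minimal sketch of class-level usage: the class name, event classes, and endpointId values below are hypothetical and exist only for illustration. The attribute namespaces are assumed to be Ecotone\Messaging\Attribute\Asynchronous and Ecotone\Modelling\Attribute\EventHandler.

```php
<?php

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;

// Hypothetical events, declared here only to keep the sketch self-contained.
final class OrderWasPlaced {}
final class OrderWasCancelled {}

// The attribute sits on the class, so both handlers below
// are expected to consume from the "orders" channel.
#[Asynchronous("orders")]
final class OrderNotifications
{
    // endpointId values are illustrative; each one must be unique.
    #[EventHandler(endpointId: "notify_about_placed_order")]
    public function whenPlaced(OrderWasPlaced $event): void
    {
        // react to the placed order
    }

    #[EventHandler(endpointId: "notify_about_cancelled_order")]
    public function whenCancelled(OrderWasCancelled $event): void
    {
        // react to the cancelled order
    }
}
```

This keeps the channel name in one place instead of repeating it on every method.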

Intercepting asynchronous endpoint

All asynchronous endpoints are marked with a special attribute, Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint. If you want to intercept all polling endpoints, use an attribute-based pointcut on it.
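A rough sketch of such an interceptor follows. It assumes Ecotone's Around attribute (Ecotone\Messaging\Attribute\Interceptor\Around) accepts a pointcut argument and that MethodInvocation exposes proceed(); check both against the version you use. The class name and the timing logic are hypothetical.

```php
<?php

use Ecotone\Messaging\Attribute\AsynchronousRunningEndpoint;
use Ecotone\Messaging\Attribute\Interceptor\Around;
use Ecotone\Messaging\Handler\Processor\MethodInvoker\MethodInvocation;

// Hypothetical interceptor: logs how long each asynchronous endpoint took.
final class AsynchronousEndpointTimer
{
    // Assumption: pointing the pointcut at the marker attribute
    // selects every asynchronously running endpoint.
    #[Around(pointcut: AsynchronousRunningEndpoint::class)]
    public function measure(MethodInvocation $methodInvocation): mixed
    {
        $startedAt = microtime(true);

        try {
            // Continue with the intercepted endpoint and pass its result through.
            return $methodInvocation->proceed();
        } finally {
            error_log(sprintf(
                "Asynchronous endpoint finished in %.3f s",
                microtime(true) - $startedAt
            ));
        }
    }
}
```

The interceptor class still has to be registered as a service in your container for the framework to pick it up.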

Running synchronously

You may register the channel as a synchronous one, so that handlers run as soon as the event is published. This can be useful in testing scenarios, where we want to test the whole flow in a simple manner.

class MessagingConfiguration
{
    #[ServiceContext]
    public function orderChannel()
    {
        return SimpleMessageChannelBuilder::createPublishSubscribeChannel("orders");
    }
}

Dropping messages coming to the channel

You may register the channel as a null channel, which means it will drop any message it receives. This can be useful in testing scenarios, if we want to turn off specific functionality.

class MessagingConfiguration
{
    #[ServiceContext]
    public function orderChannel()
    {
        return SimpleMessageChannelBuilder::createNullableChannel("orders");
    }
}

Handling Error Messages

Ecotone comes with a solution that allows retrying failed messages and pushing them to an error channel for later investigation.

Error Channel

The error channel is a channel defined for handling failed messages. By default it is turned off. We may set up an error channel for a specific consumer.

Setting up an Error Channel means that the consumer can continue with handling the next messages. After a message is sent to the error channel, it is considered handled.

After setting it up, all error messages will go to this channel, so you may listen on it:

#[ServiceActivator("errorChannel")]
public function handle(ErrorMessage $errorMessage): void
{
    // do something with ErrorMessage
}

Service Activators are endpoints like Command Handlers; however, they are not exposed through the Command/Event/Query Buses. You may use them for internal handling.

Retries

If you want to retry a message before it is finally handled, you may do so by setting up ErrorHandlerConfiguration from a ServiceContext.

#[ServiceContext]
public function errorConfiguration()
{
    return ErrorHandlerConfiguration::createWithDeadLetterChannel(
        "errorChannel",
        RetryTemplateBuilder::exponentialBackoff(1000, 10)
            ->maxRetryAttempts(3),
        "finalErrorChannel"
    );
}
------
#[ServiceActivator("finalErrorChannel")]
public function handle(ErrorMessage $errorMessage): void
{
    // do something with ErrorMessage
}
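To get a feel for what an exponential backoff with these numbers implies, the sketch below computes the delay before each retry. It assumes the two arguments of exponentialBackoff(1000, 10) are the initial delay in milliseconds and the multiplier, and that the delay follows the generic formula initialDelay * multiplier^(attempt - 1). This is not Ecotone's internal implementation, and backoffDelayInMilliseconds is a hypothetical helper.

```php
<?php

/**
 * Hypothetical helper: delay before a given retry attempt (1-based),
 * using the generic formula initialDelay * multiplier^(attempt - 1).
 */
function backoffDelayInMilliseconds(int $initialDelay, int $multiplier, int $attempt): int
{
    if ($attempt < 1) {
        throw new InvalidArgumentException("Attempt numbers start at 1");
    }

    return $initialDelay * ($multiplier ** ($attempt - 1));
}

$maxRetryAttempts = 3;
$delays = [];

for ($attempt = 1; $attempt <= $maxRetryAttempts; $attempt++) {
    $delays[] = backoffDelayInMilliseconds(1000, 10, $attempt);
}

// Prints: 1000, 10000, 100000
echo implode(", ", $delays), PHP_EOL;
```

Under these assumptions a message would wait 1 s, 10 s, and 100 s between attempts before being moved to finalErrorChannel, so a large multiplier stretches the total retry window quickly.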

Storing, Replaying, Deleting Failed Messages

To make full use of the support for storing, replaying, and deleting error messages, check: