Resilient Sending

Whenever a single action writes to more than one storage, the write to the first storage may succeed while the write to the second fails. This can happen when we store data in the database and then send Messages to a Message Broker. If a failure happens, we may send a Message to the Broker yet fail to store the related data, or vice versa. Ecotone provides tools that help solve this problem and make sending Messages to the Message Broker resilient.

Sending Retries

Whenever sending to the Message Broker fails, Ecotone will retry in order to self-heal the application.
By default Ecotone will do 2 retries when sending to a Message Channel fails:
  • First after 10ms
  • Second after 400ms
You may configure sending retries per asynchronous channel:
#[ServiceContext]
public function asyncChannelConfiguration()
{
    return GlobalPollableChannelConfiguration::create(
        RetryTemplateBuilder::exponentialBackoff(initialDelay: 10, multiplier: 2)
            ->maxRetryAttempts(3)
            ->build()
    );
}
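
To get a feel for what such a configuration means in practice, the sketch below computes a retry schedule in plain PHP. It assumes the conventional meaning of exponential backoff (each delay is the previous one multiplied by the multiplier, in milliseconds); `backoffDelays` is a hypothetical helper written for illustration and is not part of Ecotone, whose exact timing may differ.

```php
<?php

declare(strict_types=1);

/**
 * Returns the delay (in milliseconds) before each retry attempt,
 * assuming the delay is multiplied by $multiplier after every attempt.
 * Hypothetical helper for illustration only, not an Ecotone API.
 *
 * @return int[]
 */
function backoffDelays(int $initialDelay, int $multiplier, int $maxRetryAttempts): array
{
    $delays = [];
    $delay = $initialDelay;

    for ($attempt = 1; $attempt <= $maxRetryAttempts; $attempt++) {
        $delays[] = $delay;
        $delay *= $multiplier;
    }

    return $delays;
}

// Same numbers as in the configuration above.
echo implode(', ', backoffDelays(initialDelay: 10, multiplier: 2, maxRetryAttempts: 3)), PHP_EOL;
// prints "10, 20, 40"
```

Under these assumptions, three retries add at most 70ms of waiting before the failure is reported.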

Message Collector

Ecotone enables Message Collector by default. The Collector collects Messages that are about to be sent to asynchronous channels and sends them just before the transaction is committed. It works whenever Messages are sent via the Command Bus or when Messages are consumed asynchronously. This helps avoid the pitfalls described below.

Ghost Messages

Let's consider an example scenario: during order processing we publish an OrderWasPlaced event, yet we fail to store the Order in the database. This means we have published a Message that is based on data that does not exist, which will create inconsistency in our system.
When Message Collector is enabled, it provides much higher assurance that Messages will be sent to the Message Broker only when your flow has been successful.

Eager Consumption

Let's consider an example scenario: during order processing we publish an OrderWasPlaced event. If we publish it right away, this Message could be consumed and handled before the Order is actually committed to the database. In such situations the consumer will fail due to lack of data, or may produce incorrect results.
Thanks to Message Collector we significantly reduce the chance of this happening.

Failure on Sending next Message

In general, sending Messages to an external Broker is composed of three stages:
  • Serialize Message Payload
  • Map and prepare Message Headers
  • Send Message to external Broker
In most frameworks those three steps are done together, which may create an issue. Let's consider an example scenario: we send multiple Messages, the first one is sent with success and the second fails on serialization. Because of that the transaction will be rolled back, yet we have already produced the first Message, which becomes a Ghost Message. To avoid that, Ecotone performs the first two steps first, then collects all Messages, and as a final step iterates over the collected Messages and sends them. This way Ecotone ensures that all Messages have valid serialization before we actually try to send any of them.
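
The idea of preparing every Message before sending any of them can be sketched in plain PHP. This is a minimal illustration and not Ecotone's implementation: `InMemoryBroker` and `sendAllOrNothing` are hypothetical names, and `json_encode` stands in for whatever serializer is configured.

```php
<?php

declare(strict_types=1);

/** Hypothetical stand-in for a broker client; it only records what was sent. */
final class InMemoryBroker
{
    /** @var string[] */
    public array $sent = [];

    public function send(string $serializedMessage): void
    {
        $this->sent[] = $serializedMessage;
    }
}

/**
 * Phase 1: serialize every payload (throws JsonException on the first invalid one).
 * Phase 2: send, which is reached only when all payloads were serialized.
 *
 * @param array<int, mixed> $payloads
 */
function sendAllOrNothing(array $payloads, InMemoryBroker $broker): void
{
    $prepared = [];
    foreach ($payloads as $payload) {
        $prepared[] = json_encode($payload, JSON_THROW_ON_ERROR);
    }

    foreach ($prepared as $serialized) {
        $broker->send($serialized);
    }
}

$broker = new InMemoryBroker();

try {
    // The second payload contains invalid UTF-8, so json_encode() rejects it.
    sendAllOrNothing([['orderId' => 1], ['note' => "\xB1\x31"]], $broker);
} catch (JsonException $exception) {
    // Nothing was sent, so no Ghost Message exists after the rollback.
}

echo count($broker->sent), PHP_EOL; // prints "0"
```

Had serialization and sending been interleaved in a single loop, the first Message would already be on the Broker when the second one failed.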

Disable Message Collector

As the Collector keeps Messages in memory until the moment they are sent, you may consider turning off Message Collector when sending a lot of Messages, in order to reduce memory consumption. Messages will then be sent to your Message Broker instantly.
#[ServiceContext]
public function asyncChannelConfiguration()
{
    return GlobalPollableChannelConfiguration::createWithDefaults()
        ->withCollector(false);
}

Error Channel

After exhausting the limit of retries for sending the Message to the Broker, we know that we won't be able to send it. In this scenario the exception will be rethrown and the transaction will be rolled back. Yet imagine a customer making a huge Order in our E-Commerce shop: in that case a rollback may be the last thing that we want. To avoid failing and to be able to recover consistency, Ecotone provides an Error Channel for Messages that failed during sending. This way we can add custom handling for those situations or store the Message in the Dbal Dead Letter.
#[ServiceContext]
public function asyncChannelConfiguration()
{
    return GlobalPollableChannelConfiguration::createWithDefaults()
        ->withErrorChannel("dbal_dead_letter");
}
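
The control flow described above (retry, then hand the Message over to an Error Channel instead of rethrowing) can be sketched in plain PHP. This is an illustration under stated assumptions, not Ecotone's implementation: `sendWithFallback` and both callables are hypothetical, and the delays between retries are left out for brevity.

```php
<?php

declare(strict_types=1);

/**
 * Tries to send the message; after $maxRetryAttempts failed retries the
 * message is passed to the error channel instead of rethrowing.
 * Hypothetical helper for illustration only.
 *
 * @param callable(string): void            $send         throws on failure
 * @param callable(string, Throwable): void $errorChannel receives the failed message
 */
function sendWithFallback(string $message, callable $send, callable $errorChannel, int $maxRetryAttempts): void
{
    $retries = 0;

    while (true) {
        try {
            $send($message);
            return;
        } catch (Throwable $failure) {
            if ($retries >= $maxRetryAttempts) {
                // Retries exhausted: keep the transaction alive and store the failure.
                $errorChannel($message, $failure);
                return;
            }
            $retries++;
        }
    }
}

$sendCalls = 0;
$deadLetter = [];

sendWithFallback(
    'order-123',
    function (string $message) use (&$sendCalls): void {
        $sendCalls++;
        throw new RuntimeException('Broker unavailable');
    },
    function (string $message, Throwable $failure) use (&$deadLetter): void {
        $deadLetter[] = $message;
    },
    maxRetryAttempts: 2
);

echo $sendCalls, ' ', count($deadLetter), PHP_EOL; // prints "3 1"
```

The initial attempt plus two retries all fail, so the Message ends up in the dead letter list and the caller never sees an exception.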

Custom handling

#[ServiceContext]
public function asyncChannelConfiguration()
{
    return GlobalPollableChannelConfiguration::createWithDefaults()
        ->withErrorChannel("failure_channel");
}
---
#[ServiceActivator('failure_channel')]
public function doSomething(ErrorMessage $errorMessage): void
{
    // Handle failure message on your own terms :)
}

Dbal Dead Letter

#[ServiceContext]
public function asyncChannelConfiguration()
{
    return GlobalPollableChannelConfiguration::createWithDefaults()
        ->withErrorChannel("dbal_dead_letter");
}
If you push Error Messages to the Dbal Dead Letter, they will be stored in your database for later review. You may then delete or replay them after fixing the problem. This way we preserve consistency: even if an unrecoverable failure happens, our system can still heal itself.

Customized configuration per Message Consumer

If you need customization per Message Consumer, you may use PollableChannelConfiguration and provide the Message Consumer name:
#[ServiceContext]
public function asyncChannelConfiguration()
{
    return PollableChannelConfiguration::createWithDefaults('notifications')
        ->withCollector(false)
        ->withErrorChannel("dbal_dead_letter");
}

Outbox Pattern

To ensure a high degree of consistency, we may decide to store Messages alongside the data changes. This way we work with a single data storage only, completely avoiding the problem of persisting to two storages at once. To make this happen, Ecotone implements the so-called Outbox pattern.
For critical parts of the system, we may decide to commit Messages to the same database as the data changes using the Outbox pattern.

Installation

In order to use the Outbox pattern we need to set up the Dbal Module.

Dbal Message Channel

By sending asynchronous Messages via the database, we store them together with the data changes. Thanks to the default transactions for Command Handlers, both are committed together.
#[ServiceContext]
public function databaseChannel()
{
    return DbalBackedMessageChannelBuilder::create("async");
}

Asynchronous Event Handler

#[Asynchronous("async")]
#[EventHandler(endpointId:"notifyAboutNeworder")]
public function notifyAboutNewOrder(OrderWasPlaced $event) : void
{
    // notify about new order
}
After this, all Messages for this handler will go through your database as a Message Channel.

Setup Outbox where it's needed

With Ecotone's Outbox pattern we set up a given Channel to run via the database. This means that we can target the specific channels that are crucial to run under the Outbox pattern. In other cases, where data consistency is not so important to us, we may use Message Broker Channels directly and skip the Outbox. As an example, registering payments and payouts may be a crucial action in our system, so we run it with the Outbox pattern. However, sending a "Welcome" notification may be just fine to run directly with the Message Broker.

Scaling the solution

One of the challenges of implementing the Outbox pattern is finding a way to scale it. When we start to consume a lot of Messages, we may need to run more consumers in order to handle the load.

Publishing deduplication

With Ecotone, you may safely scale the Message Consumers that consume from your Dbal Message Channel. Each Message is reserved for the time it is being published; thanks to that, no duplicates will be sent when we scale.
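
The reservation idea can be illustrated with a small in-memory sketch. It is not how Ecotone implements it: `OutboxTable` and `reserveNext` are hypothetical names, and in a real database the reservation would have to be an atomic operation (for example a row lock), which a single-process array only imitates.

```php
<?php

declare(strict_types=1);

/** Hypothetical in-memory stand-in for an outbox table. */
final class OutboxTable
{
    /** @var array<int, array{payload: string, reservedBy: ?string}> */
    private array $rows = [];

    public function add(int $id, string $payload): void
    {
        $this->rows[$id] = ['payload' => $payload, 'reservedBy' => null];
    }

    /** Reserves the first free row for the consumer and returns its id, or null when none is free. */
    public function reserveNext(string $consumer): ?int
    {
        foreach ($this->rows as $id => $row) {
            if ($row['reservedBy'] === null) {
                $this->rows[$id]['reservedBy'] = $consumer;

                return $id;
            }
        }

        return null;
    }
}

$outbox = new OutboxTable();
$outbox->add(1, 'OrderWasPlaced');
$outbox->add(2, 'OrderWasPaid');

$first = $outbox->reserveNext('consumer-a');  // 1
$second = $outbox->reserveNext('consumer-b'); // 2, because row 1 is already reserved
$third = $outbox->reserveNext('consumer-a');  // null, nothing left to reserve
```

Because a reserved row is skipped by every other consumer, each Message is published by exactly one of them, however many consumers are running.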

Handling via different Message Broker

However, we may want to avoid scaling our Dbal based Message Consumers in order not to increase the load on the database. For this situation Ecotone allows us to use so-called Combined Message Channels. In that case we run the Database Channel only for the outbox, and a different channel for the actual Message Handler execution. This is a powerful concept, as we may safely produce Messages with the outbox and still handle and scale via RabbitMQ, SQS, Redis, etc.
#[Asynchronous(["database_channel", "rabbit_channel"])]
#[EventHandler(endpointId: 'orderWasPlaced')]
public function handle(OrderWasPlaced $event): void
{
    /** Do something */
}
  • database_channel is the Dbal Message Channel
  • rabbit_channel is our RabbitMQ Message Channel
Then we run one or a few Message Consumers for the outbox, and we scale the Message Consumers for RabbitMQ.

Combined Message Channels with reference

If we want a more convenient way to apply combined Message Channels to multiple Message Handlers, we may create a reference.
#[ServiceContext]
public function combinedMessageChannel(): CombinedMessageChannel
{
    return CombinedMessageChannel::create(
        'outbox_sqs', // Reference name
        ['database_channel', 'amazon_sqs_channel'], // list of combined message channels
    );
}
And then we use the reference in our Message Handlers.
#[Asynchronous(["outbox_sqs"])]
#[EventHandler(endpointId: 'orderWasPlaced')]
public function handle(OrderWasPlaced $event): void
{
    /** Do something */
}