Idempotency (Deduplication)

Installation

In order to use Deduplication, install Ecotone's Dbal Module.

Default Idempotent Message Consumer

The role of deduplication is to make it safe to receive the same Message multiple times, as Message Brokers give no guarantee that a Message will be delivered exactly once. In Ecotone all Messages are identifiable and contain a Message Id. When the Dbal Module is installed, the Message Id is used for deduplication by default. If a Message was already handled, it will be skipped.
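
To make the idea concrete, here is a minimal sketch of an idempotent consumer that skips Messages whose Message Id was already handled. It is only an illustration under stated assumptions: the class InMemoryDeduplicatingConsumer and its methods are hypothetical and not part of Ecotone, and it keeps handled ids in a PHP array, whereas Ecotone keeps deduplication history in a database table.

```php
<?php

declare(strict_types=1);

// Hypothetical illustration only; this is not Ecotone's implementation.
final class InMemoryDeduplicatingConsumer
{
    /** @var array<string, true> Message Ids that were already handled */
    private array $handledMessageIds = [];

    /** @var list<string> payloads that actually reached the handler logic */
    public array $handledPayloads = [];

    /** Returns true when the message was handled, false when it was skipped as a duplicate. */
    public function consume(string $messageId, string $payload): bool
    {
        if (isset($this->handledMessageIds[$messageId])) {
            return false; // already handled, skip execution
        }

        $this->handledPayloads[] = $payload;         // stands in for the real handler
        $this->handledMessageIds[$messageId] = true; // remember the id for future deliveries

        return true;
    }
}

$consumer = new InMemoryDeduplicatingConsumer();
$first = $consumer->consume('a1b2', 'order placed');
$second = $consumer->consume('a1b2', 'order placed'); // redelivery of the same Message
```

In this sketch the second call returns false and the payload is recorded only once, which is the behaviour the default deduplication aims for.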

Custom Deduplication

You may also mark a given Message Handler for deduplication. Ecotone will then read the Message Headers and deduplicate based on your custom header key. This works in both synchronous and asynchronous scenarios.

This is especially useful when we receive events from external services, e.g. payment or notification events, which contain an identifier that we can deduplicate on. For example, Sendgrid (Email Service) sends us notifications about user interaction. As there is no guarantee that we will receive the same webhook only once, we may use "eventId" to deduplicate it.

$this->commandBus->send($command, metadata: ["paymentId" => $paymentId]);

final class PaymentHandler
{
    #[Deduplicated('paymentId')]
    #[CommandHandler(endpointId: "receivePaymentEndpoint")]
    public function receivePayment(ReceivePayment $command): void
    {
        // handle 
    }
}

paymentId becomes our deduplication key. Whenever we receive a Command with the same value under the paymentId header, Ecotone will deduplicate it and skip execution of the receivePayment method.

We pass endpointId to the Command Handler to indicate that deduplication should be tracked for the Command Handler with this endpoint id. If we do not pass it, the endpointId is generated and cached automatically, which means deduplication for that Command Handler stays valid only as long as the cache is not cleared.

Custom Deduplication across Handlers

Deduplication is tracked per endpointId. This means that if we introduce another handler with the same deduplication key, it will get its own deduplication tracking.

final class PaymentHandler
{
    #[Deduplicated('paymentId')]
    #[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
    public function receivePaymentChanges(ReceivePayment $command): void
    {
        // handle 
    }
}
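
The scoping described above can be pictured as tracking each deduplication key together with the endpoint it was seen on. The following is a minimal sketch of that idea, assuming an in-memory store. ScopedDeduplicationTracker and markIfNew are hypothetical names and not part of Ecotone's API.

```php
<?php

declare(strict_types=1);

// Hypothetical illustration; these names are not part of Ecotone's API.
final class ScopedDeduplicationTracker
{
    /** @var array<string, array<string, true>> scope (e.g. endpointId) => handled keys */
    private array $handled = [];

    /** Records the key for the scope; returns false if it was already recorded there. */
    public function markIfNew(string $scope, string $deduplicationKey): bool
    {
        if (isset($this->handled[$scope][$deduplicationKey])) {
            return false;
        }

        $this->handled[$scope][$deduplicationKey] = true;

        return true;
    }
}

$tracker = new ScopedDeduplicationTracker();
$results = [
    $tracker->markIfNew('receivePaymentEndpoint', 'payment-123'),        // new for this endpoint
    $tracker->markIfNew('receivePaymentChangesEndpoint', 'payment-123'), // same key, other endpoint
    $tracker->markIfNew('receivePaymentEndpoint', 'payment-123'),        // duplicate for this endpoint
];
```

Only the third call is treated as a duplicate, because the same key was already seen under the same scope.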

Deduplication with Expression language

We can also resolve the deduplication value dynamically. For this we can use expression language.

final class PaymentHandler
{
    #[Deduplicated(expression: 'payload.paymentId')]
    #[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
    public function receivePaymentChanges(ReceivePayment $command): void
    {
        // handle 
    }
}

We can also access any object from our Dependency Container in order to calculate the deduplication value:

final class PaymentHandler
{
    #[Deduplicated(expression: 'reference("paymentIdMapper").map(payload.paymentId)')]
    #[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
    public function receivePaymentChanges(ReceivePayment $command): void
    {
        // handle 
    }
}
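
For illustration, the service behind reference("paymentIdMapper") could look like the sketch below. The PaymentIdMapper class and its normalization rule are assumptions made for this example. The only requirement implied by the expression is a service available under the "paymentIdMapper" name that exposes a map method.

```php
<?php

declare(strict_types=1);

// Hypothetical service; assumed to be registered in the container as "paymentIdMapper".
final class PaymentIdMapper
{
    /** Normalizes a payment id so differently formatted ids yield the same deduplication key. */
    public function map(string $paymentId): string
    {
        return strtolower(trim($paymentId));
    }
}

$mapper = new PaymentIdMapper();
$key = $mapper->map('  PAY-123 ');
```

With such a mapper, Commands carrying "PAY-123" and "pay-123" would resolve to the same deduplication value.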

Deduplication with Command Bus

To reuse the same deduplication mechanism across different Message Handlers, we may use Deduplication on the level of the Command Bus. For this, it's enough to extend the Command Bus interface with our custom one:

#[Deduplicated(expression: "headers['paymentId']")]
interface PaymentCommandBus extends CommandBus
{
}

Then all Commands sent over this Command Bus will be deduplicated using "paymentId" header.

Command Bus name

By default, Command Buses share deduplication tracking, so a Message whose deduplication key was already used on one Command Bus will be discarded when sent over another. To make each Command Bus track its deduplication separately, we can add a tracking name:

#[Deduplicated("paymentId", trackingName: 'payment_tracker')]
interface PaymentCommandBus extends CommandBus
{
}

Deduplication clean up

To remove expired deduplication history, which is kept in a database table, Ecotone provides a console command:

bin/console ecotone:deduplication:remove-expired-messages

This command can be configured to run periodically, e.g. using cron jobs.

By default Ecotone removes message ids from the deduplication storage after 7 days, in batches of 1000. This can be customized if needed:

// named differently from Ecotone's DbalConfiguration to avoid a class name clash
class EcotoneConfiguration
{
    #[ServiceContext]
    public function registerTransactions(): DbalConfiguration
    {
        return DbalConfiguration::createWithDefaults()
                // 100000 ms - 100 seconds
                ->withDeduplication(
                    expirationTime: 100000,
                    removalBatchSize: 1000
                );
    }
}
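
Since the expiration time in the example above is given in milliseconds, a small helper can make larger values easier to read. This is a sketch only: daysToMilliseconds is a hypothetical helper and not part of Ecotone, and it assumes the value is expressed in milliseconds, as the comment in the example indicates.

```php
<?php

declare(strict_types=1);

/** Converts whole days to milliseconds. Hypothetical helper, not part of Ecotone. */
function daysToMilliseconds(int $days): int
{
    return $days * 24 * 60 * 60 * 1000;
}

$sevenDays = daysToMilliseconds(7); // 604800000, the 7-day default expressed in milliseconds
$oneDay = daysToMilliseconds(1);    // 86400000
```

The result could then be passed as the expirationTime argument instead of a hand-written number.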

Disable Deduplication

Deduplication is enabled by default. To disable it, use DbalConfiguration:

class EcotoneConfiguration
{
    #[ServiceContext]
    public function registerTransactions(): DbalConfiguration
    {
        return DbalConfiguration::createWithDefaults()
                ->withDeduplication(false);
    }
}
