Ecotone
SponsorBlogGithubSupport and ContactCommunity Channel
  • About
  • Installation
  • How to use
    • CQRS PHP
    • Event Handling PHP
    • Aggregates & Sagas
    • Scheduling in PHP
    • Asynchronous PHP
    • Event Sourcing PHP
    • Microservices PHP
    • Resiliency and Error Handling
    • Laravel Demos
    • Symfony Demos
      • Doctrine ORM
  • Tutorial
    • Before we start tutorial
    • Lesson 1: Messaging Concepts
    • Lesson 2: Tactical DDD
    • Lesson 3: Converters
    • Lesson 4: Metadata and Method Invocation
    • Lesson 5: Interceptors
    • Lesson 6: Asynchronous Handling
  • Enterprise
  • Modelling
    • Introduction
    • Message Bus and CQRS
      • CQRS Introduction - Commands
        • Query Handling
        • Event Handling
      • Aggregate Introduction
        • Aggregate Command Handlers
        • Aggregate Query Handlers
        • Aggregate Event Handlers
        • Advanced Aggregate creation
      • Repositories Introduction
      • Business Interface
        • Introduction
        • Business Repository
        • Database Business Interface
          • Converting Parameters
          • Converting Results
      • Saga Introduction
      • Identifier Mapping
    • Extending Messaging (Middlewares)
      • Message Headers
      • Interceptors (Middlewares)
        • Additional Scenarios
      • Intercepting Asynchronous Endpoints
      • Extending Message Buses (Gateways)
    • Event Sourcing
      • Installation
      • Event Sourcing Introduction
        • Working with Event Streams
        • Event Sourcing Aggregates
          • Working with Aggregates
          • Applying Events
          • Different ways to Record Events
        • Working with Metadata
        • Event versioning
        • Event Stream Persistence
          • Event Sourcing Repository
          • Making Stream immune to changes
          • Snapshoting
          • Persistence Strategies
          • Event Serialization and PII Data (GDPR)
      • Projection Introduction
        • Configuration
        • Choosing Event Streams for Projection
        • Executing and Managing
          • Running Projections
          • Projection CLI Actions
          • Access Event Store
        • Projections with State
        • Emitting events
    • Recovering, Tracing and Monitoring
      • Resiliency
        • Retries
        • Error Channel and Dead Letter
          • Dbal Dead Letter
        • Idempotent Consumer (Deduplication)
        • Resilient Sending
        • Outbox Pattern
        • Concurrency Handling
      • Message Handling Isolation
      • Ecotone Pulse (Service Dashboard)
    • Asynchronous Handling and Scheduling
      • Asynchronous Message Handlers
      • Asynchronous Message Bus (Gateways)
      • Delaying Messages
      • Time to Live
      • Message Priority
      • Scheduling
      • Dynamic Message Channels
    • Distributed Bus and Microservices
      • Distributed Bus
        • Distributed Bus with Service Map
          • Configuration
          • Custom Features
          • Non-Ecotone Application integration
          • Testing
        • AMQP Distributed Bus (RabbitMQ)
          • Configuration
        • Distributed Bus Interface
      • Message Consumer
      • Message Publisher
    • Business Workflows
      • The Basics - Stateless Workflows
      • Stateful Workflows - Saga
      • Handling Failures
    • Testing Support
      • Testing Messaging
      • Testing Aggregates and Sagas with Message Flows
      • Testing Event Sourcing Applications
      • Testing Asynchronous Messaging
  • Messaging and Ecotone In Depth
    • Overview
    • Multi-Tenancy Support
      • Getting Started
        • Any Framework Configuration
        • Symfony and Doctrine ORM
        • Laravel
      • Different Scenarios
        • Hooking into Tenant Switch
        • Shared and Multi Database Tenants
        • Accessing Current Tenant in Message Handler
        • Events and Tenant Propagation
        • Multi-Tenant aware Dead Letter
      • Advanced Queuing Strategies
    • Document Store
    • Console Commands
    • Messaging concepts
      • Message
      • Message Channel
      • Message Endpoints/Handlers
        • Internal Message Handler
        • Message Router
        • Splitter
      • Consumer
      • Messaging Gateway
      • Inbound/Outbound Channel Adapter
    • Method Invocation And Conversion
      • Method Invocation
      • Conversion
        • Payload Conversion
        • Headers Conversion
    • Service (Application) Configuration
    • Contributing to Ecotone
      • How Ecotone works under the hood
      • Ecotone Phases
      • Registering new Module Package
      • Demo Integration with SQS
        • Preparation
        • Inbound and Outbound Adapters and Message Channel
        • Message Consumer and Publisher
  • Modules
    • Overview
    • Symfony
      • Symfony Configuration
      • Symfony Database Connection (DBAL Module)
      • Doctrine ORM
      • Symfony Messenger Transport
    • Laravel
      • Laravel Configuration
      • Database Connection (DBAL Module)
      • Eloquent
      • Laravel Queues
      • Laravel Octane
    • Ecotone Lite
      • Logging
      • Database Connection (DBAL Module)
    • JMS Converter
    • OpenTelemetry (Tracing and Metrics)
      • Configuration
    • RabbitMQ Support
    • Kafka Support
      • Configuration
      • Message partitioning
      • Usage
    • DBAL Support
    • Amazon SQS Support
    • Redis Support
  • Other
    • Contact, Workshops and Support
Powered by GitBook
On this page
  • Installation
  • Deduplication (Idempotent Consumer)
  • Custom Deduplication
  • Custom Deduplication across Handlers
  • Deduplication clean up
  • Disable Deduplication

Was this helpful?

Export as PDF
  1. Modelling
  2. Recovering, Tracing and Monitoring
  3. Resiliency

Idempotent Consumer (Deduplication)

PreviousDbal Dead LetterNextResilient Sending

Last updated 2 months ago

Was this helpful?

Installation

In order to use Deduplication, install .

Deduplication (Idempotent Consumer)

The role of deduplication is to safely receive same message multiple times, as there is no guarantee from Message Brokers that we will receive the same Message once. In Ecotone all Messages are identifiable and contains of Message Id. Message Id is used for deduplication. If message was already handled, it will be skipped.

Deduplication is enabled by default and works whenever message is consumed in .

Custom Deduplication

You may also define given Message Handler for deduplication. This will use and deduplicated base on your customer header key. This allows in synchronous and asynchronous scenarios.

This is especially useful when, we receive events from external services e.g. payment or notification events which contains of identifier that we may use deduplication on. For example Sendgrid (Email Service) sending us notifications about user interaction, as there is no guarantee that we will receive same webhook once, we may use "eventId", to deduplicate in case.

$this->commandBus->send($command, metadata: ["paymentId" => $paymentId]);
final class PaymentHandler
{
    #[Deduplicated('paymentId')]
    #[CommandHandler(endpointId: "receivePaymentEndpoint")]
    public function receivePayment(ReceivePayment $command): void
    {
        // handle 
    }
}

paymentId becomes our deduplication key. Whenever we will receive now Command with same value under paymentId header, Ecotone will deduplicate that and skip execution of receivePayment method.

We pass endpointId to the Command Handler to indicate that deduplication should happen within Command Handler with this endpoint id. If we would not pass that, then endpointId will be generated and cached automatically. This means deduplication for given Command Handler would be valid as long as we would not clear cache.

Custom Deduplication across Handlers

Deduplication happen across given endpointId.This means that if we would introduce another handler with same deduplication key, it will get it's own deduplication tracking.

final class PaymentHandler
{
    #[Deduplicated('paymentId')]
    #[CommandHandler(endpointId: "receivePaymentChangesEndpoint")]
    public function receivePaymentChanges(ReceivePayment $command): void
    {
        // handle 
    }
}

As deduplication is tracked within given endpoint id, it means we can change the deduplication key safely without being in risk of receiving duplicates. If we would like to start tracking from fresh, it would be enough to change the endpointId.

Deduplication clean up

To remove expired deduplication history which is kept in database table, Ecotone provides an console command:

bin/console ecotone:deduplication:remove-expired-messages
artisan ecotone:deduplication:remove-expired-messages
$messagingSystem->runConsoleCommand("ecotone:deduplication:remove-expired-messages");

This command can be configured to run periodically e.g. using cron jobs.

By default Ecotone removes message id from deduplication storage after 7 days in batches of 1000. It can be customized in case of need:

class DbalConfiguration
{
    #[ServiceContext]
    public function registerTransactions(): DbalConfiguration
    {
        return DbalConfiguration::createWithDefaults()
                // 100000 ms - 100 seconds
                ->withDeduplication(
                    expirationTime: 100000,
                    removalBatchSize: 1000
                );
    }
}

It's important to keep removal batch size at small number. As deleting records may result in database index rebuild which will cause locking. Therefore small batch size will ensure our system can continue, while messages are being deleted in background.

Disable Deduplication

As the deduplication is enabled by default, if you want to disable it then make use of DbalConfiguration.

class DbalConfiguration
{
    #[ServiceContext]
    public function registerTransactions(): DbalConfiguration
    {
        return DbalConfiguration::createWithDefaults()
                ->withDeduplication(false);
    }

}
Ecotone's Dbal Module
asynchronous way
Message Headers