Ecotone
Extending Message Buses (Gateways)



Ecotone lets us extend any Messaging Gateway with Interceptors, so we can hook into the message flow and add behavior of our own.

For better understanding, please read the Interceptors section before going through this chapter.

Intercepting Gateways

Suppose we want to add custom logging whenever any Command is executed. CommandBus is the interface (Gateway) for sending Commands, so that is the Gateway we need to hook into.

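use Ecotone\Messaging\Attribute\Interceptor\Before;
use Ecotone\Modelling\CommandBus;

class LoggerInterceptor
{
    // Runs before every message sent through CommandBus
    #[Before(pointcut: CommandBus::class)]
    public function log(object $command, array $metadata) : void
    {
        // log Command message
    }
}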

Intercepting Gateways does not differ from intercepting Message Handlers.

Building customized Gateways

We may also want different Message Buses for a given Message type. For example, we could have an EventBus with auditing that we use only in specific cases. The original EventBus stays untouched, and we keep using it everywhere else.

To do this, we will introduce our new EventBus:

interface AuditableEventBus extends EventBus {}

That is enough to register the new interface. Extending an existing Gateway with a custom interface registers a new Gateway: Ecotone delivers the implementation and makes AuditableEventBus available in the Dependency Container, so we can inject it and use it.

#[CommandHandler]
public function placeOrder(PlaceOrder $command, AuditableEventBus $eventBus)
{
    // place order
    
    $eventBus->publish(new OrderWasPlaced());
}

Because this is a separate interface, we can point an interceptor specifically at it:

class Audit
{
    #[Before(pointcut: AuditableEventBus::class)]
    public function log(object $event, array $metadata) : void
    {
        // store audit
    }
}

Pointcut by attributes

If we want to make the audit functionality reusable, we can also intercept by attribute:

#[Auditable]
interface AuditableEventBus extends EventBus {}

and then we define the pointcut based on the attribute:

class Audit
{
    #[Before(pointcut: Auditable::class)]
    public function log(object $event, array $metadata) : void
    {
        // store audit
    }
}
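The examples above use #[Auditable] without showing its definition. A minimal sketch of what such a marker attribute could look like is given here; the definition itself is an assumption, and EventBusStandIn is a hypothetical placeholder for Ecotone's EventBus so that the snippet runs without the library installed.

```php
<?php

declare(strict_types=1);

// Hypothetical marker attribute. The name matches the example above,
// but the source does not show how it is defined.
#[Attribute(Attribute::TARGET_CLASS)]
final class Auditable
{
}

// Stand-in for Ecotone\Modelling\EventBus (assumption, for illustration only).
interface EventBusStandIn
{
    public function publish(object $event, array $metadata = []): void;
}

// The custom Gateway interface carries the marker attribute.
#[Auditable]
interface AuditableEventBus extends EventBusStandIn
{
}

// Reflection shows that the marker is attached to the Gateway interface,
// which is the information an attribute-based pointcut relies on.
$attributes = (new ReflectionClass(AuditableEventBus::class))->getAttributes(Auditable::class);
$isAuditable = count($attributes) === 1;
```

Because the pointcut targets the attribute instead of a concrete interface, any other Gateway marked with the same attribute would be intercepted by the same Audit class.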

Asynchronous Gateways

#[Asynchronous("async")]
interface OutboxCommandBus extends CommandBus
{

}

Gateways can also be extended with asynchronous functionality, as in the OutboxCommandBus above. You can read more in the Asynchronous section.
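The "async" name in the attribute refers to a message channel that has to be registered. A minimal sketch of such a registration follows, assuming an in-memory queue channel is good enough (suitable mainly for tests; an outbox would more likely use a database-backed channel). The class name MessagingConfiguration and the method name asyncChannel are hypothetical.

```php
<?php

use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\Messaging\Channel\SimpleMessageChannelBuilder;

// Hypothetical configuration class; only the channel name "async"
// comes from the OutboxCommandBus example.
final class MessagingConfiguration
{
    #[ServiceContext]
    public function asyncChannel(): SimpleMessageChannelBuilder
    {
        // In-memory queue channel, an assumption chosen to keep the sketch small.
        return SimpleMessageChannelBuilder::createQueueChannel('async');
    }
}
```

With a channel registered under that name, Commands sent through OutboxCommandBus would be placed on the channel first and handled once a consumer for "async" is running.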
