Ecotone
SponsorBlogGithubSupport and ContactCommunity Channel
  • About
  • Installation
  • How to use
    • CQRS PHP
    • Event Handling PHP
    • Aggregates & Sagas
    • Scheduling in PHP
    • Asynchronous PHP
    • Event Sourcing PHP
    • Microservices PHP
    • Resiliency and Error Handling
    • Laravel Demos
    • Symfony Demos
      • Doctrine ORM
  • Tutorial
    • Before we start tutorial
    • Lesson 1: Messaging Concepts
    • Lesson 2: Tactical DDD
    • Lesson 3: Converters
    • Lesson 4: Metadata and Method Invocation
    • Lesson 5: Interceptors
    • Lesson 6: Asynchronous Handling
  • Enterprise
  • Modelling
    • Introduction
    • Message Bus and CQRS
      • CQRS Introduction - Commands
        • Query Handling
        • Event Handling
      • Aggregate Introduction
        • Aggregate Command Handlers
        • Aggregate Query Handlers
        • Aggregate Event Handlers
        • Advanced Aggregate creation
      • Repositories Introduction
      • Business Interface
        • Introduction
        • Business Repository
        • Database Business Interface
          • Converting Parameters
          • Converting Results
      • Saga Introduction
      • Identifier Mapping
    • Extending Messaging (Middlewares)
      • Message Headers
      • Interceptors (Middlewares)
        • Additional Scenarios
      • Intercepting Asynchronous Endpoints
      • Extending Message Buses (Gateways)
    • Event Sourcing
      • Installation
      • Event Sourcing Introduction
        • Working with Event Streams
        • Event Sourcing Aggregates
          • Working with Aggregates
          • Applying Events
          • Different ways to Record Events
        • Working with Metadata
        • Event versioning
        • Event Stream Persistence
          • Event Sourcing Repository
          • Making Stream immune to changes
          • Snapshoting
          • Persistence Strategies
          • Event Serialization and PII Data (GDPR)
      • Projection Introduction
        • Configuration
        • Choosing Event Streams for Projection
        • Executing and Managing
          • Running Projections
          • Projection CLI Actions
          • Access Event Store
        • Projections with State
        • Emitting events
    • Recovering, Tracing and Monitoring
      • Resiliency
        • Retries
        • Error Channel and Dead Letter
          • Dbal Dead Letter
        • Idempotent Consumer (Deduplication)
        • Resilient Sending
        • Outbox Pattern
        • Concurrency Handling
      • Message Handling Isolation
      • Ecotone Pulse (Service Dashboard)
    • Asynchronous Handling and Scheduling
      • Asynchronous Message Handlers
      • Asynchronous Message Bus (Gateways)
      • Delaying Messages
      • Time to Live
      • Message Priority
      • Scheduling
      • Dynamic Message Channels
    • Distributed Bus and Microservices
      • Distributed Bus
        • Distributed Bus with Service Map
          • Configuration
          • Custom Features
          • Non-Ecotone Application integration
          • Testing
        • AMQP Distributed Bus (RabbitMQ)
          • Configuration
        • Distributed Bus Interface
      • Message Consumer
      • Message Publisher
    • Business Workflows
      • The Basics - Stateless Workflows
      • Stateful Workflows - Saga
      • Handling Failures
    • Testing Support
      • Testing Messaging
      • Testing Aggregates and Sagas with Message Flows
      • Testing Event Sourcing Applications
      • Testing Asynchronous Messaging
  • Messaging and Ecotone In Depth
    • Overview
    • Multi-Tenancy Support
      • Getting Started
        • Any Framework Configuration
        • Symfony and Doctrine ORM
        • Laravel
      • Different Scenarios
        • Hooking into Tenant Switch
        • Shared and Multi Database Tenants
        • Accessing Current Tenant in Message Handler
        • Events and Tenant Propagation
        • Multi-Tenant aware Dead Letter
      • Advanced Queuing Strategies
    • Document Store
    • Console Commands
    • Messaging concepts
      • Message
      • Message Channel
      • Message Endpoints/Handlers
        • Internal Message Handler
        • Message Router
        • Splitter
      • Consumer
      • Messaging Gateway
      • Inbound/Outbound Channel Adapter
    • Method Invocation And Conversion
      • Method Invocation
      • Conversion
        • Payload Conversion
        • Headers Conversion
    • Service (Application) Configuration
    • Contributing to Ecotone
      • How Ecotone works under the hood
      • Ecotone Phases
      • Registering new Module Package
      • Demo Integration with SQS
        • Preparation
        • Inbound and Outbound Adapters and Message Channel
        • Message Consumer and Publisher
  • Modules
    • Overview
    • Symfony
      • Symfony Configuration
      • Symfony Database Connection (DBAL Module)
      • Doctrine ORM
      • Symfony Messenger Transport
    • Laravel
      • Laravel Configuration
      • Database Connection (DBAL Module)
      • Eloquent
      • Laravel Queues
      • Laravel Octane
    • Ecotone Lite
      • Logging
      • Database Connection (DBAL Module)
    • JMS Converter
    • OpenTelemetry (Tracing and Metrics)
      • Configuration
    • RabbitMQ Support
    • Kafka Support
      • Configuration
      • Message partitioning
      • Usage
    • DBAL Support
    • Amazon SQS Support
    • Redis Support
  • Other
    • Contact, Workshops and Support
Powered by GitBook
On this page
  • Outbox Distributed Bus
  • Multiple Service Maps
  • Hiding consumption on the sending side
  • Separate Event and Command and other customizations
  • Passing Metadata
  • Intercepting Sending Messages

Was this helpful?

Export as PDF
  1. Modelling
  2. Distributed Bus and Microservices
  3. Distributed Bus
  4. Distributed Bus with Service Map

Custom Features

PreviousConfigurationNextNon-Ecotone Application integration

Last updated 3 months ago

Was this helpful?

Outbox Distributed Bus

We may send Message via Distributed Bus together with other actions, most likely database changing ones. For those scenarios, we may instruct Distributed Bus to first persist Message in given Database Message Channel and then distribute it to external Service.

#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
    return DistributedServiceMap::initialize()
              ->withServiceMapping(serviceName: "ticketService", channelName: "distributed_ticket_service")
              ->withAsynchronousChannel('database_channel')
}

After that Asynchronous Channel will be the first one to which Message will be send, therefore enabling ability to commit all changes within same transaction.

Multiple Service Maps

In case we would like to explicitly separate Service Maps for specific integrations, for example having Service Map for internal Services (which given Team owns), and separate Service Map for cross Team integrations, then we can register more than one Distributed Bus:

#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
    return [
        DistributedServiceMap::initialize(referenceName: 'internalDistributedBus')
           ->withServiceMapping(serviceName: "ticketService", channelName: "distributed_ticket_service"),
        DistributedServiceMap::initialize(referenceName: 'externalDistributedBus')
            ->withServiceMapping(serviceName: "orderService", channelName: "distributed_order_service")
    ];
}

referenceName will be the name at which DistributedBus for given Service Map will be registered in Dependency Container. Using that name we will be able to inject that into our Application level classes.

The default name under which Distributed Bus is registered is the class name of Distributed Bus. Therefore it works out of the box with auto-wire.

Hiding consumption on the sending side

When we register Message Channel it becomes available for consumption in given Service. This means that we may register Message Channel, which we actually do not own.

bin/console ecotone:list
+-----------------------------+
| Endpoint Names              |
+-----------------------------+
| distributed_ticket_service  |
+-----------------------------+
artisan ecotone:list
+----------------------------+
| Endpoint Names             |
+----------------------------+
| distributed_ticket_service |
+----------------------------+
$consumers = $messagingSystem->list();

// distributed_ticket_service

So from perspective of the Service which just publish to this Message Channel, it does not own and should not consume from it. To ensure no consumption happen, and that this Message Consumer will not be available under ecotone:list command, we can use of Dynamic Message Channel capability.

#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
    return DynamicMessageChannelBuilder::createWithSendOnlyStrategy(
       DbalBackedMessageChannelBuilder::createChannel('distributed_ticket_service')
    );
}

Now this Message Channel will be able to be used only sending, and won't be even visible for list of Message Consumers to run:

bin/console ecotone:list
+-----------------------------+
| Endpoint Names              |
+-----------------------------+
artisan ecotone:list
+----------------------------+
| Endpoint Names             |
+----------------------------+
$consumers = $messagingSystem->list();

// ---

Separate Event and Command and other customizations

Distributed Bus using Map allows for full customization of how we want to distribute Messages. This way we can for example decide, we would like to distribute Events and Commands separately, or send given Event to some custom Message Channel separately from the rest.

#[ServiceContext]
public function serviceMap(): DistributedServiceMap
{
    return DynamicMessageChannelBuilder::createNoStrategy('distributed_ticket_channel')
            ->withHeaderSendingStrategy(
                headerName: 'ecotone.distributed.payloadType',
                headerMapping: [
                    'command' => 'distributed_ticket_command',
                    'event' => 'distributed_ticket_event'
                ]
            )
            ->withInternalChannels([
                SqsBackedMessageChannelBuilder::create("distributed_ticket_command"),
                SqsBackedMessageChannelBuilder::create("distributed_ticket_event")
            ]);
}

The above configuration use special header which Ecotone adds, when sends Message via Distributed Bus: ecotone.distributed.payloadType. This header hold the type of used Message, whatever it's command or event. Therefore we can make use of it, to route the Message to different Channels. We use Internal Channels here, as those will be only visible for this Dynamic Message Channel.

Passing Metadata

Whatever we send Command or Event we may pass alongside with it Metadata. Metadata can be any additional information, that brings context yet is not part of the Command or Event itself (e.g. Executor Id, Occurred at time).

$distributedBus->convertAndSendCommand(
    targetServiceName: "ticketService",
    routingKey: "ticketService.createTicket",
    command: new CreateTicket($personId, "Call Customer to collect more details"),
    metadata: [
        "executorId" => $executorId
    ]
);

then we access metadata on the Distributed Handler

#[Distributed]
#[CommandHandler("ticketService.createTicket")]
public function changeBillingDetails(
        CreateTicket $command, 
        #[Header("executorId")] string $executorId
): void
{
    // create new Ticket
}

Intercepting Sending Messages

#[Before(pointcut: DistributedBus::class, changeHeaders: true)]
public function addHeaders(#[Reference] $authenticationService): array
{
    return [
        "executorId" => $authenticationService->getCurrentUser(),
    ];
}

Intercepting Distributed Buses can be useful for example for adding extra metadata or ensuring given set of headers are filtered out.

Take under consideration that Presend interceptors do not work with Distributed Bus, however Before Interceptor works exactly the same in this context, and can be used instead.

To configure this up, we need to have our registered and then we can instruct Distributed Bus to make use of it:

When we register our Message Channel we will wrap it with with send only strategy:

Whatever context of our Applications needs, we can customize using Distributed Map with .

As we have created DynamicMessageChannel with createNoStrategy, and provided sending strategy in form of withHeaderSendingStrategy. This means that no receiving strategy was given, therefore this Message Consumer will be hidden from the Message Consumer list (works like ).

To intercept Distributed Bus before Message is send we can use .

Database Message Channel
DynamicMessageChannel
Dynamic Message Channels
Interceptors
"Hiding consumption on the sending side"
Send first through database channel and then forward to distributed one