Ecotone

Dynamic Message Channels

This chapter covers advanced Message Channel functionality provided by Dynamic Message Channels. Dynamic Channels can be used to:

  • Simplify deployment strategy

  • Optimize system resources

  • Adjust message consumption or sending process, which is especially useful in SaaS and Multi-Tenant Environments

Dynamic Message Channels are available as part of Ecotone Enterprise.

Message Consumption from Multiple Channels

The default strategy is to have a single Message Consumer (Worker process) per Message Channel (Queue). When the volume of Messages is low, however, some Consumers may sit idle most of the time. To avoid wasting system resources, we can join the consumption so that a single Message Consumer polls from multiple Message Channels.

Suppose we have the following Message Channels:

final readonly class EcotoneConfiguration
{
    #[ServiceContext]
    public function asyncChannels()
    {
        return [
            DbalBackedMessageChannelBuilder::create('orders'),
            DbalBackedMessageChannelBuilder::create('notifications'),
        ];
    }
}

Dynamic Message Channels can combine multiple channels, so we can treat them as one.

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DynamicMessageChannelBuilder::createRoundRobin(
            'orders_and_notifications', // channel name to consume
            ['orders', 'notifications']
        ),
    ];
}

After that we can consume using the "orders_and_notifications" name by running the endpoint:

# Symfony
bin/console ecotone:run orders_and_notifications -vvv

# Laravel
artisan ecotone:run orders_and_notifications -vvv

# Ecotone Lite
$messagingSystem->run("orders_and_notifications");

We can combine as many channels as we want under a single Dynamic Channel.
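To make the round-robin idea concrete, here is a minimal conceptual sketch in plain PHP. It is not Ecotone's implementation: `RoundRobinSelector` is a hypothetical class that only shows how a single consumer could cycle through the combined channel names, polling one after another.

```php
<?php

declare(strict_types=1);

/**
 * Conceptual sketch only: cycles through channel names in round-robin order.
 * RoundRobinSelector is a hypothetical name, not an Ecotone class.
 */
final class RoundRobinSelector
{
    private int $position = 0;

    /** @param list<string> $channelNames */
    public function __construct(private array $channelNames)
    {
        if ($channelNames === []) {
            throw new InvalidArgumentException('At least one channel name is required.');
        }
    }

    /** Returns the channel to poll next and advances the cursor. */
    public function next(): string
    {
        $channelName = $this->channelNames[$this->position];
        $this->position = ($this->position + 1) % count($this->channelNames);

        return $channelName;
    }
}

$selector = new RoundRobinSelector(['orders', 'notifications']);

// Three consecutive polls: orders, notifications, then orders again.
$polled = [$selector->next(), $selector->next(), $selector->next()];
```

Because the cursor wraps around, no channel is starved as long as the consumer keeps polling.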

Distribute based on Context

There may be situations where we would like to introduce a Message Channel per Client. This is often the case in Multi-Tenant environments, where premium Customers get higher consumption rates. In Ecotone we can keep our code agnostic of multiple Channels, and yet provide this ability to end users in a simple way. For this we will use the Header Based Strategy.

Taking an Order Process as an example:

#[Asynchronous("orders")]
#[CommandHandler]
public function placeOrder(PlaceOrderCommand $command) : void
{
   // do something with $command
}

This code is fully agnostic of the details of the Multi-Tenant environment. It uses the Message Channel "orders" to process the Command. We can, however, make "orders" a Dynamic Channel, which will actually distribute to multiple Channels. To do this we will introduce distribution based on the Metadata of the Command.

#[ServiceContext]
public function dynamicChannel()
{
    return [
        // normal Message Channels
        DbalBackedMessageChannelBuilder::create('tenant_a_channel'),
        DbalBackedMessageChannelBuilder::create('tenant_b_channel'),
    
        // our Dynamic Channel used in Command Handler
        DynamicMessageChannelBuilder::createWithHeaderBasedStrategy(
            thisMessageChannelName: 'orders',
            headerName: 'tenant',
            headerMapping: [
                'tenant_a' => 'tenant_a_channel',
                'tenant_b' => 'tenant_b_channel',
            ]
        ),
    ];
}

Now whenever this Command is sent with tenant metadata, Ecotone will decide to which Message Channel it should proxy the Message.

$this->commandBus->send(
   new PlaceOrderCommand(),
   metadata: [
      'tenant' => 'tenant_a'
   ]
);

The above works exactly the same for Events.

Then we would simply run those as separate Message Consumption processes (Workers):

# Symfony
bin/console ecotone:run tenant_a_channel -vvv
bin/console ecotone:run tenant_b_channel -vvv

# Laravel
artisan ecotone:run tenant_a_channel -vvv
artisan ecotone:run tenant_b_channel -vvv

# Ecotone Lite
$messagingSystem->run("tenant_a_channel");
$messagingSystem->run("tenant_b_channel");

Distribute with Default Channel

We may want to introduce separate Message Channels for premium Tenants and have one shared Message Channel for the rest. For this we would use a default channel:

#[ServiceContext]
public function dynamicChannel()
{
    return [
        // normal Message Channels
        DbalBackedMessageChannelBuilder::create('tenant_a_channel'),
        DbalBackedMessageChannelBuilder::create('tenant_b_channel'),
        DbalBackedMessageChannelBuilder::create('shared_channel'),
    
        DynamicMessageChannelBuilder::createWithHeaderBasedStrategy(
            'orders',
            'tenant',
            [
                'tenant_a' => 'tenant_a_channel',
                'tenant_b' => 'tenant_b_channel',
            ],
            'shared_channel' // the default channel, when above mapping does not apply
        ),
    ];
}

The default channel is used when no mapping is found:

$this->commandBus->send(
   new PlaceOrderCommand(),
   metadata: [
      'tenant' => 'tenant_c' //no mapping for this tenant, go to default channel
   ]
);
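The header-based distribution with a default channel can be pictured as a simple lookup. The sketch below is plain PHP and purely illustrative: `resolveChannel()` is a hypothetical helper, not part of Ecotone, and throwing an exception when nothing matches is an assumption made for the sketch (this chapter does not describe what Ecotone does in that case).

```php
<?php

declare(strict_types=1);

/**
 * Conceptual sketch only: resolves a target channel from a header value.
 * resolveChannel() is a hypothetical helper, not part of Ecotone.
 *
 * @param array<string, mixed>  $headers       message metadata
 * @param array<string, string> $headerMapping header value => channel name
 */
function resolveChannel(
    array $headers,
    string $headerName,
    array $headerMapping,
    ?string $defaultChannelName = null
): string {
    $headerValue = $headers[$headerName] ?? null;

    // A mapped header value wins.
    if (is_string($headerValue) && isset($headerMapping[$headerValue])) {
        return $headerMapping[$headerValue];
    }

    // Otherwise fall back to the default channel, when one is configured.
    if ($defaultChannelName !== null) {
        return $defaultChannelName;
    }

    // Assumption for this sketch: fail loudly when nothing matches.
    throw new RuntimeException("No channel mapped for header '{$headerName}'.");
}

$mapping = [
    'tenant_a' => 'tenant_a_channel',
    'tenant_b' => 'tenant_b_channel',
];

$premium = resolveChannel(['tenant' => 'tenant_a'], 'tenant', $mapping, 'shared_channel');
$other   = resolveChannel(['tenant' => 'tenant_c'], 'tenant', $mapping, 'shared_channel');
```

Here `$premium` resolves to the dedicated channel of tenant_a, while `$other` falls through to the shared channel because tenant_c has no mapping.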

Running Dynamic Channels and sub-channels

Running Dynamic Channels does not differ from normal channels.

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DbalBackedMessageChannelBuilder::create('tenant_a_orders'),
        DbalBackedMessageChannelBuilder::create('tenant_b_orders'),
        DbalBackedMessageChannelBuilder::create('tenant_c_orders'),
    
        DynamicMessageChannelBuilder::createRoundRobin(
            'orders',
            ['tenant_a_orders', 'tenant_b_orders', 'tenant_c_orders']
        ),
    ];
}

If we run "orders", which is a Dynamic Channel combining three other Channels, Ecotone will start a Message Consumption process that uses the round-robin strategy to consume from each of them:

# Symfony
bin/console ecotone:run orders -vvv

# Laravel
artisan ecotone:run orders -vvv

# Ecotone Lite
$messagingSystem->run("orders");

Typically we would also run a dedicated consumption process for specific channels that require extra processing power. This Message Consumer will focus only on Messages within that Channel.

# Symfony
bin/console ecotone:run tenant_a_orders -vvv

# Laravel
artisan ecotone:run tenant_a_orders -vvv

# Ecotone Lite
$messagingSystem->run("tenant_a_orders");

Using Internal Channels

By default, we are able to start a Message Consumer for any Message Channel we define. However, if a given set of Channels is only meant to be used under a Dynamic Channel, we can make this explicit and prevent them from being run separately.

To do so we use Internal Channels, which are visible only to the Dynamic Channel.

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DynamicMessageChannelBuilder::createRoundRobin(
            'orders_and_notifications', // channel name to consume
            ['orders', 'notifications']
        )
            // these channels are visible only to the Dynamic Channel
            ->withInternalChannels([
                DbalBackedMessageChannelBuilder::create('orders'),
                DbalBackedMessageChannelBuilder::create('notifications'),
            ]),
    ];
}

Internal Channels are only visible to the Dynamic Channel, therefore they can't be used for Asynchronous Message Handlers. Asynchronous Handlers should use the name of the Dynamic Message Channel instead.

Throttling strategy

Let's take as an example a Multi-Tenant environment where each of our Clients has a limit of 5 orders to be processed within 24 hours. This limit is known to the Client, who may buy extra processing units to increase their daily capacity.

A commonly used solution to skip processing is to reschedule Messages with a delay and check after some time whether the Client's Message can now be processed. This, however, wastes resources, as we consume Messages that are not meant to be handled yet. Ecotone therefore provides an alternative that skips the consumption completely: we avoid spending resources on polling or rescheduling Messages, because we simply don't consume them at all.

To skip the consumption we will use the Throttling Strategy. We start by defining a Message Channel per Client, so we can skip consumption from a given Channel when its Client has reached the limit.

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DbalBackedMessageChannelBuilder::create('client_a'),
        DbalBackedMessageChannelBuilder::create('client_b'),
    
        DynamicMessageChannelBuilder::createThrottlingStrategy(
            thisMessageChannelName: 'orders',
            requestChannelName: 'decide_for_client',
            channelNames: [
                'client_a',
                'client_b',
            ],
        ),
    ];
}

#[InternalHandler('decide_for_client')]
public function decide(
    string $channelNameToCheck, // this will be called in round robin with client_a / client_b
): bool
{
    // by returning true we start the consumption process, by returning false we skip,
    // so we consume only when the limit has NOT been reached
    return ! $this->checkIfReachedConsumptionLimit($channelNameToCheck);
}

This function runs in a round-robin manner for each defined Message Channel (client_a, client_b).

By using the Throttling Strategy we can easily rate limit our Clients. This works dynamically: if a Customer buys extra credits, we can start returning true from the decision method, which will kick off the consumption. This creates a real-time experience for Customers.
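As an illustration of what the decision method could delegate to, below is a minimal sketch of a limit check in plain PHP. Everything in it is an assumption made for the example: `DailyOrderLimiter` is a hypothetical class, counters are kept in memory, and the window is a sliding 24 hours. A real application would more likely read processed-order counts from a database or cache.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical in-memory limiter used to illustrate the decision method.
 * A real system would keep counters in a database or cache.
 */
final class DailyOrderLimiter
{
    /** @var array<string, list<int>> channel name => timestamps of processed orders */
    private array $processedAt = [];

    /** @param array<string, int> $limits channel name => orders allowed per 24 hours */
    public function __construct(private array $limits = [], private int $defaultLimit = 5)
    {
    }

    public function recordProcessed(string $channelName, int $timestamp): void
    {
        $this->processedAt[$channelName][] = $timestamp;
    }

    /** Returns true when the channel may be consumed, false when it should be skipped. */
    public function shouldConsume(string $channelName, int $now): bool
    {
        $windowStart = $now - 86400; // sliding 24 hour window
        $recent = array_filter(
            $this->processedAt[$channelName] ?? [],
            static fn (int $timestamp): bool => $timestamp > $windowStart
        );

        return count($recent) < ($this->limits[$channelName] ?? $this->defaultLimit);
    }
}

// client_b bought extra capacity, client_a stays on the default limit of 5.
$limiter = new DailyOrderLimiter(['client_b' => 10]);
$now = 1_700_000_000;

for ($i = 0; $i < 5; $i++) {
    $limiter->recordProcessed('client_a', $now - $i);
    $limiter->recordProcessed('client_b', $now - $i);
}

$clientA = $limiter->shouldConsume('client_a', $now);                    // false: limit of 5 reached
$clientB = $limiter->shouldConsume('client_b', $now);                    // true: 5 of 10 used
$clientANextDay = $limiter->shouldConsume('client_a', $now + 86400 + 1); // true: window has passed
```

The boolean returned by `shouldConsume()` has the same meaning as the return value of the decision method: true starts the consumption, false skips it.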

Using custom Strategies

In some scenarios we may want to take full control over sending and receiving. In these situations we can use custom Strategies that completely replace the built-in ones. This way we can decide which Message Channel a Message is sent to, and which Channel a Message is received from.

Using custom Receiving Strategy

To roll out a custom receiving strategy we will use "withCustomReceivingStrategy":

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DynamicMessageChannelBuilder::createRoundRobin(
            'orders',
            ['tenant_a', 'tenant_b', 'tenant_c']
        )
            // we change round robin receiving strategy to our own customized one
            ->withCustomReceivingStrategy('decide_on_consumption'),
    ];
}

final class Decider
{
    /**
     * @return string channel name to consume from
     */
    #[InternalHandler('decide_on_consumption')]
    public function toReceive(): string
    {
        // this should return specific message channel from which we will consume
        return 'tenant_a'; // placeholder: replace with your own decision logic
    }
}

If we want to stop Consumption completely we can return the "nullChannel" string. This will skip the consumption. This may be useful in order to turn off a given Message Consumer at run time.
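One possible shape for such decision logic is sketched below in plain PHP. It is only an illustration: `WeightedReceivingDecider`, its weights and its `disable()` switch are hypothetical and not part of Ecotone. In a real application the `toReceive()` method would carry the `#[InternalHandler('decide_on_consumption')]` attribute shown above.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical decider: polls channels proportionally to their weight
 * and can be switched off at run time by returning "nullChannel".
 */
final class WeightedReceivingDecider
{
    /** @var list<string> expanded polling schedule, e.g. [a, a, b, c] */
    private array $schedule = [];

    private int $tick = 0;

    private bool $enabled = true;

    /** @param array<string, int> $weights channel name => how many turns per cycle */
    public function __construct(array $weights)
    {
        foreach ($weights as $channelName => $weight) {
            for ($i = 0; $i < $weight; $i++) {
                $this->schedule[] = $channelName;
            }
        }
    }

    public function disable(): void
    {
        $this->enabled = false;
    }

    /** @return string channel name to consume from */
    public function toReceive(): string
    {
        if (! $this->enabled || $this->schedule === []) {
            return 'nullChannel'; // stop consumption
        }

        $channelName = $this->schedule[$this->tick % count($this->schedule)];
        $this->tick++;

        return $channelName;
    }
}

$decider = new WeightedReceivingDecider(['tenant_a' => 2, 'tenant_b' => 1, 'tenant_c' => 1]);

$turns = [];
for ($i = 0; $i < 5; $i++) {
    $turns[] = $decider->toReceive();
}

$decider->disable();
$afterDisable = $decider->toReceive();
```

With these weights tenant_a receives two of every four polling turns, and after `disable()` every call returns "nullChannel".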

Using custom Sending Strategy

To roll out a custom sending strategy we will use "withCustomSendingStrategy":

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DynamicMessageChannelBuilder::createRoundRobin(
            'orders',
            ['tenant_a', 'tenant_b', 'tenant_c']
        )
            // we change round robin sending strategy to our own customized one
            ->withCustomSendingStrategy('decide_on_send'),
    ];
}

final class Decider
{
    /**
     * @return string channel name to send to
     */
    #[InternalHandler('decide_on_send')]
    public function toSend(Message $message): string
    {
        // this should return specific message channel to which we send
        return 'tenant_a'; // placeholder: replace with your own decision logic
    }
}

If we would like to discard a given Message, we can return the "nullChannel" string.
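As a rough illustration of a sending decision, the plain PHP sketch below keeps all Messages of one customer on the same channel by hashing a header value. The function name `chooseSendingChannel()`, the `customerId` header and the choice to discard Messages without that header are assumptions made for the example, not Ecotone behaviour. In a real handler the headers would come from the `Message` passed to `toSend()`.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical sending decision: a stable, hash-based channel choice.
 *
 * @param array<string, mixed> $headers      message metadata
 * @param list<string>         $channelNames candidate channels (must not be empty)
 */
function chooseSendingChannel(array $headers, array $channelNames): string
{
    $customerId = $headers['customerId'] ?? null;

    // Assumption for this sketch: Messages without a customer are discarded.
    if (! is_string($customerId) || $customerId === '') {
        return 'nullChannel';
    }

    // Mask the sign bit so the index is non-negative on every platform.
    $index = (crc32($customerId) & 0x7fffffff) % count($channelNames);

    return $channelNames[$index];
}

$channels = ['tenant_a', 'tenant_b', 'tenant_c'];

$first     = chooseSendingChannel(['customerId' => 'customer-123'], $channels);
$second    = chooseSendingChannel(['customerId' => 'customer-123'], $channels);
$discarded = chooseSendingChannel([], $channels);
```

The same `customerId` always resolves to the same channel, so `$first` and `$second` are equal, while `$discarded` is "nullChannel".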


Last updated 1 month ago


Message Consumption from Multiple Channels: To prepare a Message Consumer that can consume from those two Channels in a Round Robin manner (each Channel is polled in turn, one after another), we can set up a Dynamic Message Channel.

Running Dynamic Channels and sub-channels: When the Consumer of the Dynamic Channel and the Consumer of a single sub-channel read from the same Message Channel at the same time, they work according to the Competitive Consumer pattern. Therefore each will get its own unique Message.

Throttling strategy: "decide_for_client" is the name of the Internal Message Handler that makes the decision.

Using custom Receiving Strategy and Using custom Sending Strategy: To set up a Custom Strategy, we use an Internal Handler.