Dynamic Message Channels

This chapter provides more details about advanced Message Channel functionalities using Dynamic Message Channels. Dynamic Channels can be used to:

  • Simplify deployment strategy

  • Optimize system resources

  • Adjust message consumption or sending process, which is especially useful in SaaS and Multi-Tenant Environments

Message Consumption from Multiple Channels

The default strategy is to have single Message Consumer (Worker process) per Message Channel (Queue). When the volume of Messages is low however, some Consumers may actually be continuously in idle state. In order to not be wasteful about system resources we may want then to join the consumption, so single Message Consumer will poll from multiple Message Channels.

Suppose we do have Message Channels:

final readonly class EcotoneConfiguration
{
    #[ServiceContext]
    public function asyncChannels()
    {
        return [
            DbalBackedMessageChannelBuilder::create('orders'),
            DbalBackedMessageChannelBuilder::create('notifications'),
        ];
    }
}

To prepare an Message Consumer that will be able to consume from those two Channels in Round Robin manner (meaning each consumption is called after another one) we can set up Dynamic Message Channel.

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DynamicMessageChannelBuilder::createRoundRobin(
            'orders_and_notifications', // channel name to consume
            ['orders', 'notifications']
        ),
    ];
}

After that we can consume using "orders_and_notifications" name. We then can run the endpoint:

bin/console ecotone:run orders_and_notifications -vvv

Distribute based on Context

There may be situations when we would like to introduce Message Channel per Client. This if often a case in Multi-Tenant environments when premium Customer does get higher consumption rates. In Ecotone we can keep our code agnostic of Multiple Channels, and yet provide this ability to end users in a simple way. For this we will be using Header Based Strategy.

Taking as an example Order Process:

#[Asynchronous("orders")]
#[CommandHandler]
public function placeOrder(PlaceOrderCommand $command) : void
{
   // do something with $command
}

This code is fully agnostic to the details of Multi-Tenant environment. It does use Message Channel "orders" to process the Command. We can however make the "orders" an Dynamic Channel, which will actually distribute to multiple Channels. To do this we will introduce distribution based on the Metadata from the Command.

#[ServiceContext]
public function dynamicChannel()
{
    return [
        // normal Message Channels
        DbalBackedMessageChannelBuilder::create('tenant_a_channel'),
        DbalBackedMessageChannelBuilder::create('tenant_b_channel'),
    
        // our Dynamic Channel used in Command Handler
        DynamicMessageChannelBuilder::createWithHeaderBasedStrategy(
            thisMessageChannelName: 'orders',
            headerName: 'tenant',
            headerMapping: [
                'tenant_a' => 'tenant_a_channel',
                'tenant_b' => 'tenant_b_channel',
            ]
        ),
    ];
}

Now whenever this Command is sent with tenant metadata, Ecotone will decide to which Message Channel it should proxy the Message.

$this->commandBus->send(
   new PlaceOrderCommand(),
   metadata: [
      'tenant' => 'tenant_a'
   ]
);

Then we would simply run those as separate Message Consumption processes (Workers):

# Symfony
bin/console ecotone:run tenant_a_channel -vvv
bin/console ecotone:run tenant_b_channel -vvv

# Laravel
artisan ecotone:run tenant_a_channel -vvv
artisan ecotone:run tenant_b_channel -vvv

# Ecotone Lite
$messagingSystem->run("tenant_a_channel");
$messagingSystem->run("tenant_b_channel");

Distribute with Default Channel

We may want to introduce separate Message Channels for premium Tenants and just have a shared Message Channel for the rest. For this we would use default channel:

#[ServiceContext]
public function dynamicChannel()
{
    return [
        // normal Message Channels
        DbalBackedMessageChannelBuilder::create('tenant_a_channel'),
        DbalBackedMessageChannelBuilder::create('tenant_b_channel'),
        DbalBackedMessageChannelBuilder::create('shared_channel'),
    
        DynamicMessageChannelBuilder::createWithHeaderBasedStrategy(
            'orders',
            'tenant',
            [
                'tenant_a' => 'tenant_a_channel',
                'tenant_b' => 'tenant_b_channel',
            ],
            'shared_channel' // the default channel, when above mapping does not apply
        ),
    ];
}

The default channel will be used, when no mapping will be found:

$this->commandBus->send(
   new PlaceOrderCommand(),
   metadata: [
      'tenant' => 'tenant_c' //no mapping for this tenant, go to default channel
   ]
);

Running Dynamic Channels and sub-channels

Running Dynamic Channels does not differ from normal channels.

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DbalBackedMessageChannelBuilder::create('tenant_a_orders'),
        DbalBackedMessageChannelBuilder::create('tenant_b_orders'),
        DbalBackedMessageChannelBuilder::create('tenant_c_orders'),
    
        DynamicMessageChannelBuilder::createRoundRobin(
            'orders',
            ['tenant_a_orders', 'tenant_b_orders', 'tenant_c_orders']
        ),
    ];
}

If we will run "orders", which is Dynamic Channel combined of three other Channels, Ecotone will run Message Consumption process which will use round-robin strategy to consume from each of them:

bin/console ecotone:run orders -vvv

Typically we would also run consumption process for this specific channels, which require extra processing power. This Message Consumer will focus only on Messages within that Channel.

bin/console ecotone:run tenant_a_orders -vvv

When shared_consumer and tenant_abc will read from same Message Channel at the same time, it will work as Competitive Consumer pattern. Therefore each will get his own unique Message.

Using Internal Channels

By default whatever Message Channels we will define, we will be able to start Message Consumer for it. However if given set of Channels is only meant to be used under Dynamic Channel, we can make it explicit and avoid allowing them to be run separately.

To do so we use Internal Channels which will be available only for the Dynamic Channel visibility.

#[ServiceContext]
public function dynamicChannel()
{
    return [    
            DynamicMessageChannelBuilder::createRoundRobin(
                'orders_and_notifications', // channel name to consume
                ['orders', 'notifications']
            ),
            ->withInternalChannels([
                DbalBackedMessageChannelBuilder::create('orders'),
                DbalBackedMessageChannelBuilder::create('notifications'),
            ]),
    ];
}

Throttling strategy

Let's take as an example of Multi-Tenant environment where each of our Clients has set limit of 5 orders to be processed within 24 hours. This limit is known to the Client and he may buy extra processing unit to increase his daily capacity.

To skip the consumption we will use Skipping Strategy. We will start by defining Message Channel per Client, so we can skip consumption from given Channel when Client have reached the limit.

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DbalBackedMessageChannelBuilder::create('client_a'),
        DbalBackedMessageChannelBuilder::create('client_b'),
    
        DynamicMessageChannelBuilder::createThrottlingStrategy(
            thisMessageChannelName: 'orders',
            requestChannelName: 'decide_for_client',
            channelNames: [
                'client_a',
                'client_b',
            ],
        ),
    ];
}

The "decide_for_client" is the name of our Internal Message Handler that will do the decision.

#[InternalHandler('decide_for_client')]
public function decide(
    string $channelNameToCheck, //this will be called in round robin with client_a / client_b
): bool
{
    // by returning true we start the consumption process, by returning false we skip
    return $this->checkIfReachedConsumptionLimit($channelNameToCheck);
}

This function will run in round-robin manner for each defined Message Channel (client_a, client-b).

Using custom Strategies

In some scenarios we may actually want to take full control over sending and receiving. In this situations we can make use of custom Strategies that completely replaces the inbuilt ones. This way we can tell which Message Channel we would like to send Message too, and from which Channel we would like to receive Message from.

Using custom Receiving Strategy

To roll out custom receiving strategy we will use "withCustomReceivingStrategy":

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DynamicMessageChannelBuilder::createRoundRobin(
            'orders',
            ['tenant_a', 'tenant_b', 'tenant_c']
        )
            // we change round robin receving strategy to our own customized one
            ->withCustomReceivingStrategy('decide_on_consumption'),
    ];
}

To set up our Custom Strategy, we will use Internal Handler.

final class Decider
{
     /**
     * @return string channel name to consume from
     */
    #[InternalHandler('decide_on_consumption')]
    public function toReceive(): string
    {
        // this should return specific message channel from which we will consume
    }
}

Using custom Sending Strategy

To roll out custom receiving strategy we will use "withCustomReceivingStrategy":

#[ServiceContext]
public function dynamicChannel()
{
    return [
        DynamicMessageChannelBuilder::createRoundRobin(
            'orders',
            ['tenant_a', 'tenant_b', 'tenant_c']
        )
            // we change round robin sending strategy to our own customized one
            ->withCustomSendingStrategy('decide_on_send'),
    ];
}

To set up our Custom Strategy, we will use Internal Handler.

final class Decider
{
     /**
     * @return string channel name to send too
     */
    #[InternalHandler('decide_on_send')]
    public function toSend(Message $message): string
    {
        // this should return specific message channel to which we send
    }
}

Last updated

Was this helpful?