Distributed Bus

Distribution

Ecotone supports communication in a distributed architecture. You can send a command directly to a specific service (application) or publish events that other services can subscribe to.

Configuration

For Ecotone to know how to route messages, you need to register a Service Name (Application Name).
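As a sketch of what registration can look like with an Ecotone Lite style bootstrap, the Service Name can be set on the `ServiceConfiguration` object. The name `billing` matches the examples on this page. How the configuration is passed depends on your framework integration, so treat this fragment as an assumption to check against your own setup.

```php
<?php

use Ecotone\Messaging\Config\ServiceConfiguration;

// Assumption: Ecotone Lite style configuration object.
// "billing" is the Service Name used by the examples on this page.
$serviceConfiguration = ServiceConfiguration::createWithDefaults()
    ->withServiceName('billing');
```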

Modules Providing Support

Distributed Bus

The Distributed Bus is a Message Gateway, just like the CommandBus or EventBus. It is responsible for distributing your commands and events.

Depending on your module configuration, you can inject it by class name or by a specific reference name.

interface DistributedBus
{
    // Command distribution

    public function sendCommand(string $destination, string $routingKey, string $command, string $sourceMediaType = MediaType::TEXT_PLAIN, array $metadata = []) : void;

    public function convertAndSendCommand(string $destination, string $routingKey, object|array $command, array $metadata = []) : void;

    // Event distribution

    public function publishEvent(string $routingKey, string $event, string $sourceMediaType = MediaType::TEXT_PLAIN, array $metadata = []) : void;

    public function convertAndPublishEvent(string $routingKey, object|array $event, array $metadata) : void;

    // General message distribution

    public function sendMessage(string $destination, string $targetChannelName, string $payload, string $sourceMediaType = MediaType::TEXT_PLAIN, array $metadata = []) : void;
}
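The difference between the string-based methods and the `convertAnd...` methods can be illustrated without a message broker. The sketch below uses a hypothetical `RecordingBus` class that copies two signatures from the interface above and only records what it receives. It is a stand-in for illustration, not part of Ecotone. With `sendCommand` the caller supplies an already serialized payload and names its media type, while `convertAndSendCommand` accepts an array or object and leaves serialization to the bus.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in: mirrors two DistributedBus signatures and records calls.
// 'text/plain' stands in for MediaType::TEXT_PLAIN.
final class RecordingBus
{
    /** @var list<array<string, mixed>> */
    public array $calls = [];

    public function sendCommand(
        string $destination,
        string $routingKey,
        string $command,
        string $sourceMediaType = 'text/plain',
        array $metadata = []
    ): void {
        $this->calls[] = compact('destination', 'routingKey', 'command', 'sourceMediaType', 'metadata');
    }

    public function convertAndSendCommand(
        string $destination,
        string $routingKey,
        object|array $command,
        array $metadata = []
    ): void {
        $this->calls[] = compact('destination', 'routingKey', 'command', 'metadata');
    }
}

$bus = new RecordingBus();

// The caller passes a serialized payload and declares its media type.
$bus->sendCommand(
    'billing',
    'billing.changeDetails',
    '{"personId":"123","billingDetails":"01111"}',
    'application/json'
);

// The caller passes structured data; conversion is left to the bus.
$bus->convertAndSendCommand(
    'billing',
    'billing.changeDetails',
    ['personId' => '123', 'billingDetails' => '01111'],
    ['requestedBy' => 'support'] // hypothetical metadata entry
);
```

A recording stand-in of this kind can also serve as a test double when you want to check what a service sends without running a consumer.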

Commands

Sending Distributed Commands

  • destination - the Service Name of the targeted service

  • routingKey - the routing key under which the CommandHandler is registered on the targeted service

public function changeBillingDetails(DistributedBus $distributedCommandBus)
{
    $distributedCommandBus->sendCommand(
        "billing", // destination
        "billing.changeDetails", // routingKey
        '{"personId":"123","billingDetails":"01111"}',
        "application/json"
    );
}
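Hand-written JSON strings are easy to get wrong, so one option is to build the payload with PHP's `json_encode` before passing it to `sendCommand`. This is a minimal sketch; the helper name `billingDetailsPayload` is hypothetical. Keeping `billingDetails` a string preserves its leading zero.

```php
<?php

declare(strict_types=1);

// Hypothetical helper: builds the JSON payload for "billing.changeDetails".
function billingDetailsPayload(string $personId, string $billingDetails): string
{
    // JSON_THROW_ON_ERROR raises a JsonException instead of returning false.
    return json_encode(
        ['personId' => $personId, 'billingDetails' => $billingDetails],
        JSON_THROW_ON_ERROR
    );
}

$payload = billingDetailsPayload('123', '01111');
// $payload can be passed as the third argument of sendCommand,
// together with "application/json" as the media type.
```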

Consuming Distributed Commands

On the consumer side, register a distributed command handler:

#[Distributed]
#[CommandHandler("billing.changeDetails")]
public function changeBillingDetails(ChangeBillingDetails $command) : void
{
    // do something with billing details
}

To expose a command handler to other services, mark it with the Distributed attribute.
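The handler type-hints `ChangeBillingDetails`, which this page does not define. A minimal sketch of such a command class, assuming its fields match the payload in the sending example, could look like the following. The class shape and the `fromArray` factory are assumptions for illustration. In a running application the conversion from JSON to the command object is performed by whichever converter is configured in Ecotone, not by this factory. The `readonly` properties require PHP 8.1 or newer.

```php
<?php

declare(strict_types=1);

// Hypothetical command class; field names follow the payload in the sending example.
final class ChangeBillingDetails
{
    public function __construct(
        public readonly string $personId,
        public readonly string $billingDetails
    ) {
    }

    // Illustration only: shows how the decoded payload maps onto the command.
    public static function fromArray(array $data): self
    {
        return new self((string) $data['personId'], (string) $data['billingDetails']);
    }
}

$command = ChangeBillingDetails::fromArray(
    json_decode('{"personId":"123","billingDetails":"01111"}', true, 512, JSON_THROW_ON_ERROR)
);
```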

Run the consumer

Run the consumer for your distributed endpoint. It is available under your Service Name.

List:

bin/console ecotone:list
+--------------------+
| Endpoint Names     |
+--------------------+
| billing            |
+--------------------+

Run it:

bin/console ecotone:run billing -vvv

Events

Publishing Distributed Events

  • routingKey - the routing key under which the event will be published

public function changeBillingDetails(DistributedBus $eventBus)
{
    $eventBus->publishEvent(
        "billing.detailsWereChanged", // routingKey
        '{"personId":123,"billingDetails":"0001"}',
        "application/json"
    );
}

Consuming Distributed Events

#[Distributed]
#[EventHandler("billing.detailsWereChanged")]
public function registerTicket(BillingDetailsWereChanged $event) : void
{
    // do something with event
}

To start listening for distributed events under a specific routing key, add the Distributed attribute to the event handler.

Run the consumer

It is the same consumer, so run it just as you would for distributed commands.

Message

You can distribute a general message if you don't want to expose the given endpoint as a Command or Event Handler.

Publishing Distributed Messages

  • destination - the Service Name of the targeted service

  • targetChannelName - the target channel on the destination side

public function changeBillingDetails(DistributedBus $distributedBus)
{
    $distributedBus->sendMessage(
        "billing", // destination
        "billing.changeDetails", // targetChannelName
        '{"personId":"123","billingDetails":"01111"}',
        "application/json"
    );
}

Consuming Distributed Messages

Register a general Message Handler using a Service Activator.

#[Distributed]
#[ServiceActivator("billing.changeDetails")]
public function changeBillingDetails(ChangeBillingDetails $command) : void
{
    // do something with billing details
}

Run the consumer

It is the same consumer, so run it just as you would for distributed commands.
