Symfony Messenger Transport

Symfony Messenger Transport Integration

Ecotone comes with Symfony Messenger Transport integration, which lets us use Messenger Transports as Ecotone Message Channels. This way we can work in a familiar environment and reuse the Symfony Serializer that is already set up.

Asynchronous Message Handler

Suppose we have a Messenger transport named async.

framework:
  messenger:
    transports:
      async: 'doctrine://default?queue_name=async'

Then we can register it using Service Context in Ecotone:

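use Ecotone\Messaging\Attribute\ServiceContext;
use Ecotone\SymfonyBundle\Messenger\SymfonyMessengerMessageChannelBuilder;

final class MessagingConfiguration
{
    #[ServiceContext]
    public function asyncChannel()
    {
        // "async" must match the transport name from the Messenger configuration
        return SymfonyMessengerMessageChannelBuilder::create("async");
    }
}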

After that we can use it like any other asynchronous channel.

#[Asynchronous('async')]
#[CommandHandler(endpointId:"place_order_endpoint")]
public function placeOrder(PlaceOrder $command): void
{
    // place the order
}

Trigger Command/Event/Query via Ecotone Bus

To trigger a Command, Event or Query Handler, send the Message through the corresponding Ecotone Bus. Sending Messages through the Symfony Message Bus will have no effect, because it carries no information about what kind of Message it is.

Ecotone provides separate Command, Event and Query Buses, whereas Symfony provides a single general Message Bus. It is important to distinguish between Message types: Queries are handled synchronously, Commands are point-to-point and target a single Handler, and Events are publish-subscribe, which means multiple Handlers can subscribe to the same Event.
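The difference between point-to-point and publish-subscribe delivery can be pictured with a small in-memory sketch. This is plain PHP for illustration only: SketchCommandBus and SketchEventBus are hypothetical names, not Ecotone classes, and Ecotone's real buses do considerably more.

```php
<?php

// Hypothetical illustration, NOT Ecotone's API.
// A command is point-to-point: exactly one handler per routing key.
final class SketchCommandBus
{
    /** @var array<string, callable> */
    private array $handlers = [];

    public function register(string $routingKey, callable $handler): void
    {
        if (isset($this->handlers[$routingKey])) {
            throw new LogicException("Command '$routingKey' already has a handler");
        }
        $this->handlers[$routingKey] = $handler;
    }

    public function send(string $routingKey, mixed $payload): void
    {
        if (!isset($this->handlers[$routingKey])) {
            throw new LogicException("No handler for command '$routingKey'");
        }
        ($this->handlers[$routingKey])($payload);
    }
}

// An event is publish-subscribe: zero, one or many handlers may listen.
final class SketchEventBus
{
    /** @var array<string, list<callable>> */
    private array $subscribers = [];

    public function subscribe(string $routingKey, callable $handler): void
    {
        $this->subscribers[$routingKey][] = $handler;
    }

    public function publish(string $routingKey, mixed $payload): void
    {
        // Publishing with no subscribers is not an error.
        foreach ($this->subscribers[$routingKey] ?? [] as $handler) {
            $handler($payload);
        }
    }
}
```

In this sketch a second handler for the same command is rejected, while an event may be published to any number of subscribers, including none.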

To use a given Bus, inject CommandBus, EventBus or QueryBus (they are available automatically after installing Ecotone) and use it to send a Message.

final class OrderController
{
    public function __construct(private CommandBus $commandBus) {}

    public function placeOrder(Request $request): Response
    {
        $command = $this->prepareCommand($request);

        $this->commandBus->send($command);

        return new Response();
    }
    
    (...)

Running Message Consumer (Worker)

Instead of using Messenger's messenger:consume command, we run the Message Consumer using Ecotone's command:

bin/console ecotone:run async -vvv

When Symfony Messenger cannot send a Message to the failure transport, the Message is lost. This is not the case when using Ecotone with a Messenger Transport: Ecotone will requeue the Message instead.

Sending messages via routing

When sending Commands and Events via routing, it's possible to use non-class payloads. Symfony Messenger Transports, however, require the message to be a class instance. To solve that, Ecotone wraps simple types in a class and unwraps them on deserialization. Thanks to that, we can use a Symfony Transport like any other Ecotone Message Channel.

  • Command Handler with command having array payload

$this->commandBus->sendWithRouting('order.place', ["products" => [1,2,3]]);

#[Asynchronous('async')]
#[CommandHandler('order.place', 'place_order_endpoint')]
public function placeOrder(array $payload): void
{
    // do something with $payload
}

  • Command Handler inside Aggregate with command having no payload at all

// the routing key must match the one declared on the Command Handler
$this->commandBus->sendWithRouting('cancel', metadata: ["aggregate.id" => 123]);

#[Aggregate]
class Order
{
    #[Asynchronous('async')]
    #[CommandHandler('cancel', 'cancel_order_endpoint')]
    public function cancel(): void
    {
        $this->isCancelled = true;
    }
(...)    
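The wrapping of non-class payloads described in this section can be pictured with the following minimal sketch. It is an assumption-laden illustration in plain PHP: WrappedPayloadSketch, wrapForTransport and unwrapFromTransport are hypothetical names, and Ecotone's actual wrapper is internal and may work differently.

```php
<?php

// Hypothetical wrapper used only for illustration, NOT Ecotone's internal class.
final class WrappedPayloadSketch
{
    public function __construct(public mixed $payload) {}
}

// Before sending: make sure the transport always receives an object.
function wrapForTransport(mixed $payload): object
{
    // Objects already satisfy the "must be a class" requirement.
    return is_object($payload) ? $payload : new WrappedPayloadSketch($payload);
}

// After receiving: hand the handler the original payload back.
function unwrapFromTransport(object $message): mixed
{
    return $message instanceof WrappedPayloadSketch ? $message->payload : $message;
}
```

With this shape, an array, a scalar, or even a missing payload can travel through a transport that only accepts objects, and the handler still receives the original value.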

Asynchronous Event Handlers

When sending Events, we use the Event Bus. Ecotone delivers a copy of the Event to each Event Handler, which allows each Handler to process it in isolation and be retried safely.

  • Inject the Event Bus into your service; it is available out of the box.

$this->eventBus->publish(new OrderWasPlaced($orderId));

  • Subscribe to the Event

#[Asynchronous('async')]
#[EventHandler(endpointId: 'order_was_placed_endpoint')]
public function notifyAbout(OrderWasPlaced $event): void
{
    // send notification
}
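Why a copy per Event Handler makes retries safe can be sketched in plain PHP. The names below (QueuedDelivery, fanOut, consume) are hypothetical and do not reflect Ecotone's internals; the sketch only assumes that each subscribed endpoint receives its own message.

```php
<?php

// Hypothetical sketch, NOT Ecotone's internals: one queued message per handler.
final class QueuedDelivery
{
    public int $attempts = 0;

    public function __construct(
        public string $endpointId,
        public object $event,
    ) {}
}

/**
 * Creates one delivery, holding its own copy of the event, per endpoint.
 *
 * @param list<string> $endpointIds
 * @return list<QueuedDelivery>
 */
function fanOut(object $event, array $endpointIds): array
{
    $deliveries = [];
    foreach ($endpointIds as $endpointId) {
        $deliveries[] = new QueuedDelivery($endpointId, clone $event);
    }

    return $deliveries;
}

/**
 * Runs every delivery once and returns only the ones that failed.
 *
 * @param list<QueuedDelivery>    $deliveries
 * @param array<string, callable> $handlers keyed by endpoint id
 * @return list<QueuedDelivery>
 */
function consume(array $deliveries, array $handlers): array
{
    $failed = [];
    foreach ($deliveries as $delivery) {
        $delivery->attempts++;
        try {
            ($handlers[$delivery->endpointId])($delivery->event);
        } catch (Throwable $exception) {
            // Only this handler's delivery is kept for retry.
            $failed[] = $delivery;
        }
    }

    return $failed;
}
```

Because the failed delivery belongs to a single endpoint, retrying it does not run the handlers that already succeeded a second time.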
