Symfony Messenger Transport
Symfony Messenger CQRS DDD
Ecotone comes with Symfony Messenger Transport integration.
We may use Transports as our Message Channels.
This way we can work in a familiar environment and reuse the already configured Symfony Serializer.

Suppose we have the async transport:

framework:
    messenger:
        transports:
            async: 'doctrine://default?queue_name=async'

Then we can register it using Service Context in Ecotone:
final class MessagingConfiguration
{
    #[ServiceContext]
    public function asyncChannel()
    {
        return SymfonyMessengerMessageChannelBuilder::create("async");
    }
}

After that, we can use the channel like any other asynchronous Message Channel:

#[Asynchronous('async')]
#[CommandHandler(endpointId: "place_order_endpoint")]
public function placeOrder(PlaceOrder $command): void
{
    // place the order
}

To trigger a Command, Event or Query Handler, we send the Message via the corresponding Ecotone Bus. Sending Messages via the Symfony Bus will have no effect, because it carries no information about what kind of Message it is: Ecotone provides Command, Event and Query Buses, while Symfony contains a general Message Bus.

It's important to distinguish between Message types: Queries are handled synchronously, Commands are point-to-point and target a single Handler, and Events are publish-subscribe, which means multiple Handlers can subscribe to them. To trigger a given Bus, inject CommandBus, EventBus or QueryBus (they are automatically available after installing Ecotone) and use it to send a Message.

final class OrderController
{
    public function __construct(private CommandBus $commandBus) {}

    public function placeOrder(Request $request): Response
    {
        $command = $this->prepareCommand($request);

        $this->commandBus->send($command);

        return new Response();
    }

    (...)

Instead of using Messenger's messenger:consume, we run the Message Consumer using Ecotone's command:

bin/console ecotone:run async -vvv

With Symfony Messenger alone, a Message that can't be sent to the failed transport is lost.
This is not the case when using Ecotone with Messenger Transport: Ecotone will requeue the Message in that case.
When sending commands and events via routing, it's possible to use non-class types; however, Symfony Messenger Transports require a class. To solve that, Ecotone wraps simple types in a class and unwraps them on deserialization. Thanks to that, we can use a Symfony Transport like any other Ecotone Message Channel.
- Command Handler with a command having an array payload

$this->commandBus->sendWithRouting('order.place', ["products" => [1,2,3]]);

#[Asynchronous('async')]
#[CommandHandler('order.place', 'place_order_endpoint')]
public function placeOrder(array $payload): void
{
    // do something with the payload
}

- Command Handler inside an Aggregate, with a command having no payload

$this->commandBus->sendWithRouting('cancel', metadata: ["aggregate.id" => 123]);

#[Aggregate]
class Order
{
    #[Asynchronous('async')]
    #[CommandHandler('cancel', 'cancel_order_endpoint')]
    public function cancel(): void
    {
        $this->isCancelled = true;
    }

    (...)
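
The wrapping of simple types described in this section can be pictured with a small plain PHP sketch. It is not Ecotone's actual implementation: WrappedPayload, wrapForTransport and unwrapFromTransport are hypothetical names made up for illustration, and serialize()/unserialize() merely stand in for the transport's serializer.

```php
<?php

declare(strict_types=1);

// Hypothetical wrapper class (an assumption, not part of Ecotone's API).
final class WrappedPayload
{
    public function __construct(public mixed $payload) {}
}

// Wrap non-object payloads so that a class-only transport can carry them.
// Objects are passed through untouched.
function wrapForTransport(mixed $payload): object
{
    return is_object($payload) ? $payload : new WrappedPayload($payload);
}

// Reverse step on the consuming side: give the handler the original value.
function unwrapFromTransport(object $message): mixed
{
    return $message instanceof WrappedPayload ? $message->payload : $message;
}

$original = ['products' => [1, 2, 3]];

// serialize()/unserialize() stand in for the transport's serializer here.
$onTheWire = serialize(wrapForTransport($original));
$received = unwrapFromTransport(unserialize($onTheWire));
```

In this sketch the handler side ends up with the same array that was sent, which is why a handler can keep declaring array $payload even though the transport only ever sees an object.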

When sending events, we use the Event Bus.
Ecotone delivers a copy of the Event to each of the Event Handlers, which allows for handling in isolation and safe retries.
- Inject the Event Bus into your service; it is available out of the box.
$this->eventBus->publish(new OrderWasPlaced($orderId));
- Subscribe to the Event

#[Asynchronous('async')]
#[EventHandler(endpointId: 'order_was_placed_endpoint')]
public function notifyAbout(OrderWasPlaced $event): void
{
    // send notification
}
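
To see why a copy per Event Handler makes retries safe, here is a minimal in-process sketch in plain PHP. It does not use Ecotone and does not reflect how Ecotone implements delivery; the deliverToEach function, the handler names and the retry limit are all hypothetical, and the local OrderWasPlaced class is a stand-in defined only for this example.

```php
<?php

declare(strict_types=1);

// Stand-in event class for this sketch only.
final class OrderWasPlaced
{
    public function __construct(public int $orderId) {}
}

/**
 * Hypothetical delivery loop: every handler receives its own copy of the
 * event, and a failing handler is retried without re-running the others.
 *
 * @param array<string, callable> $handlers
 * @return array<string, int> number of attempts made per handler
 */
function deliverToEach(OrderWasPlaced $event, array $handlers, int $maxAttempts = 3): array
{
    $attempts = [];
    foreach ($handlers as $name => $handler) {
        $attempts[$name] = 0;
        while ($attempts[$name] < $maxAttempts) {
            $attempts[$name]++;
            try {
                $handler(clone $event); // each handler gets its own copy
                break;
            } catch (RuntimeException) {
                // only this handler is retried; the others are unaffected
            }
        }
    }

    return $attempts;
}

$failuresLeft = 1; // the second handler fails once, then succeeds
$notified = [];

$attempts = deliverToEach(new OrderWasPlaced(123), [
    'notify' => function (OrderWasPlaced $event) use (&$notified): void {
        $notified[] = $event->orderId;
    },
    'invoice' => function (OrderWasPlaced $event) use (&$failuresLeft): void {
        if ($failuresLeft-- > 0) {
            throw new RuntimeException('temporary failure');
        }
    },
]);
```

Here the failing invoice handler is attempted twice while the notify handler runs exactly once, so a retry never sends a duplicate notification.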