Saga Introduction

Read the Aggregate Introduction sections first to get wider context before diving into Sagas.

Handling Sagas

A Saga is responsible for coordinating long-running processes. It can store information about what has happened and, based on that, decide what to do next.
use Ecotone\Modelling\Attribute\EventHandler;
use Ecotone\Modelling\Attribute\Identifier;
use Ecotone\Modelling\Attribute\Saga;
use Ecotone\Modelling\CommandBus;

#[Saga]
class OrderFulfillment
{
    #[Identifier]
    private string $orderId;
    // Initialized up front, so action methods can read it safely.
    private bool $isFinished = false;

    private function __construct(string $orderId)
    {
        $this->orderId = $orderId;
    }

    // Static factory method: creates a new instance of the Saga.
    #[EventHandler]
    public static function start(OrderWasPlacedEvent $event) : self
    {
        return new self($event->getOrderId());
    }

    // Action method: called on an existing Saga instance.
    #[EventHandler]
    public function whenPaymentWasDone(PaymentWasFinishedEvent $event, CommandBus $commandBus) : void
    {
        if ($this->isFinished) {
            return;
        }

        $this->isFinished = true;
        $commandBus->send(new ShipOrderCommand($this->orderId));
    }
}
  • Saga - a Saga can be a state-stored Aggregate or an Event Sourced Aggregate.
  • EventHandler - marks a method to be called when a specific event happens.
  • start - a factory method that should construct a new instance of OrderFulfillment. Depending on your needs, you may construct it differently, as an Event Sourced Aggregate.
  • whenPaymentWasDone - called when PaymentWasFinishedEvent is published. We have injected CommandBus into the method in order to finish the process by sending ShipOrderCommand. We could also publish an event instead.
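
The Saga above refers to OrderWasPlacedEvent, PaymentWasFinishedEvent and ShipOrderCommand without showing them. Below is a minimal sketch of how such message classes might look, assuming plain immutable PHP objects. Only the class names and getOrderId() come from the example; the constructors and the getters on the other classes are assumptions made for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical message classes: plain PHP objects with no framework dependency.
final class OrderWasPlacedEvent
{
    public function __construct(private string $orderId) {}

    public function getOrderId(): string
    {
        return $this->orderId;
    }
}

final class PaymentWasFinishedEvent
{
    public function __construct(private string $orderId) {}

    public function getOrderId(): string
    {
        return $this->orderId;
    }
}

final class ShipOrderCommand
{
    public function __construct(private string $orderId) {}

    public function getOrderId(): string
    {
        return $this->orderId;
    }
}
```

Keeping the property named orderId in each message matches the identifier name used by the Saga.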

Storing Saga's State

Sagas, just like Aggregates, are stored using a Repository implementation. You may store a Saga as an event stream or store its whole current state.

Targeting Identifier from Event/Command

As a Saga is identified by an identifier, just like an Aggregate, subscribed events need to be correlated with a specific Saga instance. When we have an event like PaymentWasFinishedEvent, we need to tell Ecotone which instance of OrderFulfillment it should retrieve from the Repository to call the method on.
This is done automatically when the property name in the Event is the same as the property marked as #[Identifier] in the Saga.
class PaymentWasFinishedEvent
{
    private string $orderId;
}
If the property name is different, we need to give Ecotone a hint on how to correlate the identifiers.
class SomeEvent
{
    #[TargetIdentifier("orderId")]
    private string $purchaseId;
}
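
To make the correlation rule concrete, here is a small sketch of how an identifier could be resolved from an event: first by matching property name, then through a mapping attribute. This is not Ecotone's implementation. The TargetIdentifier class below is a local stand-in and resolveSagaIdentifier() is a hypothetical helper, both defined only so the sketch runs on its own (PHP 8.1+ assumed).

```php
<?php
declare(strict_types=1);

// Local stand-in for the mapping attribute, so this sketch has no dependencies.
#[Attribute(Attribute::TARGET_PROPERTY)]
final class TargetIdentifier
{
    public function __construct(public string $identifierName = '') {}
}

final class PaymentWasFinishedEvent
{
    private string $orderId;

    public function __construct(string $orderId)
    {
        $this->orderId = $orderId;
    }
}

final class SomeEvent
{
    #[TargetIdentifier('orderId')]
    private string $purchaseId;

    public function __construct(string $purchaseId)
    {
        $this->purchaseId = $purchaseId;
    }
}

/**
 * Hypothetical helper: returns the value that identifies the Saga,
 * or null when the event has no property mapped to $identifierName.
 * Relies on PHP 8.1+, where reflection reads private properties directly.
 */
function resolveSagaIdentifier(object $event, string $identifierName): ?string
{
    foreach ((new ReflectionObject($event))->getProperties() as $property) {
        // By default a property maps to the identifier with the same name.
        $mappedName = $property->getName();

        // An explicit attribute overrides the default mapping.
        foreach ($property->getAttributes(TargetIdentifier::class) as $attribute) {
            $mappedName = $attribute->newInstance()->identifierName ?: $mappedName;
        }

        if ($mappedName === $identifierName) {
            return (string) $property->getValue($event);
        }
    }

    return null;
}
```

Both events resolve to an orderId here: the first through its property name, the second through the attribute on purchaseId.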

Targeting Identifier from Metadata

When there is no property to correlate inside the Command or Event, we can use Before or Presend Interceptors to enrich the event's metadata with the required identifier. Once the identifier is available in the metadata, we can use identifierMetadataMapping. Suppose the orderId identifier is available in the metadata under the key orderNumber; we can then tell the Message Handler to use this mapping:
#[EventHandler(identifierMetadataMapping: ["orderId" => "orderNumber"])]
public function failPayment(PaymentWasFailedEvent $event, CommandBus $commandBus) : void
{
    // do something with $event
}
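
The mapping reads as "identifier name => metadata key". A minimal sketch of that lookup follows, assuming metadata is a plain associative array. identifiersFromMetadata() is a hypothetical helper written for this illustration, not an Ecotone function, and throwing on a missing key is an assumption of the sketch.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: resolves Saga identifiers from message metadata.
 *
 * @param array<string, string> $mapping  identifier name => metadata key
 * @param array<string, mixed>  $metadata message metadata
 * @return array<string, mixed> identifier name => resolved value
 */
function identifiersFromMetadata(array $mapping, array $metadata): array
{
    $identifiers = [];

    foreach ($mapping as $identifierName => $metadataKey) {
        if (!array_key_exists($metadataKey, $metadata)) {
            // Assumption of this sketch: a missing key is treated as an error.
            throw new InvalidArgumentException("Missing metadata key: {$metadataKey}");
        }

        $identifiers[$identifierName] = $metadata[$metadataKey];
    }

    return $identifiers;
}

$identifiers = identifiersFromMetadata(
    ['orderId' => 'orderNumber'],
    ['orderNumber' => 'order-1', 'userId' => 'user-7']
);
```

Unrelated metadata entries, such as userId here, are ignored by the lookup.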

Unordered Events

In the starting example we assumed that the first event we receive is OrderWasPlacedEvent, which creates a new Saga instance, and that the second (PaymentWasFinishedEvent) calls an action on the existing Saga. Such assumptions are always risky: if the events arrive out of order, our Saga may break. With Ecotone we can handle this scenario by expecting the events to come in a different order and handling that gracefully.
#[Saga]
class OrderFulfillment
{
    #[Identifier]
    private string $orderId;
    private bool $isFinished = false;

    private function __construct(string $orderId)
    {
        $this->orderId = $orderId;
    }

    #[EventHandler]
    public static function startByPlacedOrder(OrderWasPlacedEvent $event) : self
    {
        return new self($event->getOrderId());
    }

    // Factory method for PaymentWasFinishedEvent, used when it arrives first
    #[EventHandler]
    public static function startByPaymentFinished(PaymentWasFinishedEvent $event) : self
    {
        return new self($event->getOrderId());
    }

    // Action method for OrderWasPlacedEvent, used when the Saga already exists
    #[EventHandler]
    public function whenOrderWasPlaced(OrderWasPlacedEvent $event, CommandBus $commandBus) : void
    {
        if ($this->isFinished) {
            return;
        }

        $this->isFinished = true;
        $commandBus->send(new ShipOrderCommand($this->orderId));
    }

    #[EventHandler]
    public function whenPaymentWasDone(PaymentWasFinishedEvent $event, CommandBus $commandBus) : void
    {
        if ($this->isFinished) {
            return;
        }

        $this->isFinished = true;
        $commandBus->send(new ShipOrderCommand($this->orderId));
    }
}
#[EventHandler]
public static function startByPlacedOrder(OrderWasPlacedEvent $event) : self

#[EventHandler]
public function whenOrderWasPlaced(OrderWasPlacedEvent $event, CommandBus $commandBus) : void
If you look closely, you will see that the factory method startByPlacedOrder and the action method whenOrderWasPlaced handle the same event. In that case Ecotone verifies, before calling the factory method, whether the Saga already exists and, if so, reroutes the event to the action method. This frees us from depending on the order of events, without introducing routing logic into our business code.
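
The effect of this rerouting can be simulated in plain PHP. The sketch below is a simplified model and not Ecotone code: dispatch() and replay() are hypothetical helpers, and the Saga state is reduced to an array. It shows that both arrival orders end with exactly one ShipOrderCommand.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical model of the routing rule: no stored Saga means the factory
 * path is taken, an existing Saga means the action path is taken.
 *
 * @param array<string, array{isFinished: bool}> $sagas stored Sagas by orderId
 * @param string[] $log human-readable trace of what happened
 */
function dispatch(array &$sagas, array &$log, string $event, string $orderId): void
{
    if (!isset($sagas[$orderId])) {
        $sagas[$orderId] = ['isFinished' => false];
        $log[] = "factory:{$event}";
        return;
    }

    if ($sagas[$orderId]['isFinished']) {
        $log[] = "ignored:{$event}";
        return;
    }

    $sagas[$orderId]['isFinished'] = true;
    $log[] = "action:{$event} -> ShipOrderCommand";
}

/**
 * Replays events for a single order and returns the trace.
 *
 * @param string[] $events
 * @return string[]
 */
function replay(array $events): array
{
    $sagas = [];
    $log = [];

    foreach ($events as $event) {
        dispatch($sagas, $log, $event, 'order-1');
    }

    return $log;
}

$inOrder = replay(['OrderWasPlacedEvent', 'PaymentWasFinishedEvent']);
$reversed = replay(['PaymentWasFinishedEvent', 'OrderWasPlacedEvent']);
```

In both traces the first event goes to a factory and the second to an action, which is why the Saga needs a factory method and an action method for each event.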

Ignoring Events

There may be situations where we want to handle events only if the Saga has already started. Suppose we want to send a promotion code to the customer only if they received the great customer badge first; otherwise we want to skip it.
#[Saga]
class OrderFulfillment
{
    #[Identifier]
    private string $customerId;

    private function __construct(string $customerId)
    {
        $this->customerId = $customerId;
    }

    // Factory method: returns the new Saga instance
    #[EventHandler]
    public static function start(ReceivedGreatCustomerBadge $event) : self
    {
        return new self($event->getCustomerId());
    }

    #[EventHandler(dropMessageOnNotFound: true)]
    public function whenNewOrderWasPlaced(OrderWasPlaced $event, CommandBus $commandBus) : void
    {
        $commandBus->send(new PromotionCode($this->customerId));
    }
}
We filter the Event by adding dropMessageOnNotFound.
#[EventHandler(dropMessageOnNotFound: true)]
If the Saga instance is not found, the event is dropped and our whenNewOrderWasPlaced method is not called.
The options we used here can also be applied to Command Handlers.
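
The decision described above can be sketched as a single function. This is an illustration under stated assumptions, not Ecotone's code: deliverToAction() is a hypothetical helper, and raising an exception when the Saga is missing and the flag is off is an assumption of this sketch.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: decides what happens to an event aimed at an action method.
 *
 * @param array<string, object> $sagas stored Sagas indexed by identifier
 */
function deliverToAction(array $sagas, string $sagaId, bool $dropMessageOnNotFound): string
{
    if (isset($sagas[$sagaId])) {
        return 'called';
    }

    if ($dropMessageOnNotFound) {
        // The event is silently discarded; the action method never runs.
        return 'dropped';
    }

    // Assumption: without the flag, a missing Saga is treated as an error.
    throw new RuntimeException("Saga {$sagaId} was not found");
}

$withBadge = deliverToAction(['customer-1' => new stdClass()], 'customer-1', true);
$withoutBadge = deliverToAction([], 'customer-1', true);
```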

Handling Commands

More often than not, you will find a Saga defined as something that subscribes to Events and sends Commands. With Ecotone this is not a requirement: you may also send a Command to a Saga when needed.
This is especially useful when we want to trigger a given process manually. Suppose an Administrator has the option to give a Promotion Code to a Customer when we have not delivered their goods on time. Yet this promotion code will only be given once the Customer has verified their Email.
#[Saga]
class PromotionCodeSaga
{
    #[Identifier]
    private string $customerId;
    private bool $isFinished;

    private function __construct(string $customerId)
    {
        $this->customerId = $customerId;
        $this->isFinished = false;
    }

    // Factory method: the Saga is started by a Command
    #[CommandHandler]
    public static function start(AddPromotionCode $command) : self
    {
        return new self($command->getCustomerId());
    }

    #[EventHandler(dropMessageOnNotFound: true)]
    public function whenEmailWasVerified(
        EmailWasVerified $event,
        CommandBus $commandBus
    ) : void
    {
        if ($this->isFinished) {
            return;
        }

        $this->isFinished = true;
        $commandBus->send(new SendPromotionCode($this->customerId));
    }
}
In this situation, using a Command to start the Saga directly simplifies the flow. We then subscribe to EmailWasVerified to send the promotion code when that happens.
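
From the caller's side, starting the Saga manually is just sending a Command. The sketch below shows the idea with local stand-ins so it runs without the framework. The CommandBus interface here is a simplified substitute defined for this example, and RecordingCommandBus, grantPromotionCode() and the AddPromotionCode constructor are hypothetical.

```php
<?php
declare(strict_types=1);

// Simplified local stand-in for a command bus, so this sketch has no dependencies.
interface CommandBus
{
    public function send(object $command): void;
}

// Hypothetical command: only its name and getCustomerId() appear in the Saga.
final class AddPromotionCode
{
    public function __construct(private string $customerId) {}

    public function getCustomerId(): string
    {
        return $this->customerId;
    }
}

// Test double that records commands instead of routing them to handlers.
final class RecordingCommandBus implements CommandBus
{
    /** @var object[] */
    public array $sent = [];

    public function send(object $command): void
    {
        $this->sent[] = $command;
    }
}

// What an administrator-facing action might do: start the Saga with a Command.
function grantPromotionCode(CommandBus $commandBus, string $customerId): void
{
    $commandBus->send(new AddPromotionCode($customerId));
}

$commandBus = new RecordingCommandBus();
grantPromotionCode($commandBus, 'customer-1');
```

Depending on the bus abstraction rather than on the Saga keeps the administrator action unaware of how the process is coordinated.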

Time based Actions

There may be situations where we want to perform time-based actions. Take the previous PromotionCodeSaga scenario as an example. We may want to offer the promotion code for only 24 hours: if the Customer has not verified their email during that time, we discard it.
To make this happen we will use Asynchronous Processing, which allows Messages to be delayed.
#[Saga]
class PromotionCodeSaga
{
    // Adds the ability to record events for this Saga.
    use WithEvents;

    #[Identifier]
    private string $customerId;
    private bool $isFinished;

    private function __construct(string $customerId)
    {
        $this->customerId = $customerId;
        $this->isFinished = false;

        $this->recordThat(new PromotionCodeWasStarted($this->customerId));
    }

    #[CommandHandler]
    public static function start(AddPromotionCode $command) : self
    {
        return new self($command->getCustomerId());
    }

    // Time in ms: this method executes 24 hours after the promotion has started
    #[Delayed(1000 * 60 * 60 * 24)]
    #[Asynchronous("async_channel")]
    #[EventHandler(endpointId: "promotion_code_saga.timeout")]
    public function timeout(PromotionCodeWasStarted $event): void
    {
        $this->isFinished = true;
    }

    #[EventHandler(dropMessageOnNotFound: true)]
    public function whenEmailWasVerified(
        EmailWasVerified $event,
        CommandBus $commandBus
    ) : void
    {
        if ($this->isFinished) {
            return;
        }

        $this->isFinished = true;
        $commandBus->send(new SendPromotionCode($this->customerId));
    }
}
A Saga, just like an Aggregate, can record new Events when needed. Here we publish a new event indicating that the Saga was started, and within the same Saga we subscribe to it with a delay.
Subscribing to creation events with a delay makes this complex flow effortless. We did not need to introduce any new concept in order to delay the Message; we used features of the underlying Messaging Architecture. We may use delays inside Aggregates too, and even for Commands when needed.
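
The timeout and the email verification race against each other, and whichever arrives first decides the outcome. Below is a plain-PHP model of those state transitions with no messaging involved. PromotionCodeFlow and PROMOTION_TIMEOUT_MS are hypothetical names introduced for this sketch.

```php
<?php
declare(strict_types=1);

// 24 hours expressed in milliseconds, the unit used by the delay above.
const PROMOTION_TIMEOUT_MS = 1000 * 60 * 60 * 24;

// Hypothetical model of the Saga's state: only the isFinished guard is kept.
final class PromotionCodeFlow
{
    private bool $isFinished = false;

    /** @var string[] */
    public array $sentCommands = [];

    // Corresponds to the delayed handler: closes the offer.
    public function timeout(): void
    {
        $this->isFinished = true;
    }

    // Corresponds to the EmailWasVerified handler: sends the code once.
    public function emailWasVerified(): void
    {
        if ($this->isFinished) {
            return;
        }

        $this->isFinished = true;
        $this->sentCommands[] = 'SendPromotionCode';
    }
}

// Email verified within 24 hours: the later timeout changes nothing.
$verifiedInTime = new PromotionCodeFlow();
$verifiedInTime->emailWasVerified();
$verifiedInTime->timeout();

// Timeout fired first: the later verification is ignored.
$verifiedTooLate = new PromotionCodeFlow();
$verifiedTooLate->timeout();
$verifiedTooLate->emailWasVerified();
```

The single isFinished flag is enough to make both handlers safe to run in either order.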