Ecotone

Stateful Workflows - Saga

Saga / Process Manager - Stateful Workflow PHP

Last updated 1 year ago

Not all Workflows need to maintain state. Step-by-step Workflows can mostly be handled using stateless input / output functionality. However, there may be cases where we would like to maintain state, and the reasons for that may be:

  • Our Workflow branches into multiple separate flows, which need to be combined back before a decision can be made

  • Our Workflow involves manual steps, like approving / declining, before the Workflow can resume or make a decision

  • Our Workflow is a long-running process, which can take hours or days, and we would like to have high visibility of its current state

For the cases above, where a Stateful Workflow is needed, we can make use of Sagas.

It's good to read the Aggregate Introduction and Stateless Workflows sections first, to get wider context about Ecotone's Messaging, before diving into Sagas.

Saga - Stateful Workflow

A Saga is a Stateful Workflow. It runs on the same basics as Stateless Workflows, using Input and Output Channels under the hood. The difference is that it persists its state, so the decisions it makes can be based on previous executions.

We define a Saga using the Saga attribute and by defining an Identifier.

#[Saga]
final class OrderProcess
{
    private function __construct(
        // Saga has to have Identifier, so we can fetch it from the Storage
        #[Identifier] private string $orderId,
        private OrderProcessStatus   $orderStatus,
    ) {}

    // This is our factory method, which provides a new Saga instance when an Order was placed
    #[EventHandler]
    public static function startWhen(OrderWasPlaced $event): self
    {
        return new self($event->orderId, OrderProcessStatus::PLACED);
    }
}

Sagas are initialized using Factory Methods (methods marked as static). We can initialize a Saga by subscribing to an Event, or by triggering a Command Action.

Aggregates and Sagas provide the same functionality: both can subscribe to Events and receive Commands, and both are persisted using Repositories. Therefore, whether we use one or the other is about making the code more explicit. Sagas are meant for business processes, and Aggregates for protecting business invariants.

Subscribing to Events and taking action

We could have a Ship Order Command Handler, which we would like to trigger from the Saga:

class Shipment
{
    #[CommandHandler("shipOrder")]
    public function prepareForShipment(ShipOrder $command): void
    {
        // ship the Order
    }
}

The Saga can subscribe to an Event and react to it by sending this Command:

#[Saga]
final class OrderProcess
{   
    (...)

    // Message will be delivered to Message Channel "shipOrder"
    #[EventHandler(outputChannelName: "shipOrder")]
    public function prepareForShipment(PaymentWasSuccessful $event): ShipOrder
    {
        $this->orderStatus = OrderProcessStatus::READY_TO_BE_SHIPPED;
        
        return new ShipOrder($this->orderId);
    }
}

We could also take action by sending a Message using the Command Bus:

#[EventHandler]
public function prepareForShipment(PaymentWasSuccessful $event, CommandBus $commandBus): void
{
    $this->orderStatus = OrderProcessStatus::READY_TO_BE_SHIPPED;
    
    $commandBus->send(new ShipOrder($this->orderId));
}

The difference between using the Output Channel and the Command Bus is the timing of execution:

  • When we use the Command Bus, the Message is sent before the Saga state is persisted.

  • When we use the Output Channel, the Saga state is persisted first and then the Message is sent.
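The ordering described above can be pictured with a small plain-PHP sketch. It does not use Ecotone; `SagaSketch`, `StepRecorder` and `runSagaHandler` are hypothetical names made up for illustration, and the sketch only assumes what the text states: state is persisted once the handler returns, and a returned Message is forwarded after that.

```php
<?php
declare(strict_types=1);

// Plain-PHP illustration only; none of these names are Ecotone APIs.
final class SagaSketch
{
    public string $status = 'placed';
}

final class StepRecorder
{
    /** @var string[] */
    public array $steps = [];
}

// Simulates a framework wrapping a Saga handler call:
// run the handler, persist the Saga, then forward whatever the handler returned.
function runSagaHandler(SagaSketch $saga, callable $handler, StepRecorder $recorder): void
{
    $returned = $handler($saga);
    $recorder->steps[] = "persist saga ({$saga->status})";
    if ($returned !== null) {
        $recorder->steps[] = "send {$returned} to output channel";
    }
}

// Command Bus style: the handler sends the Command itself while it is still running.
$viaBus = new StepRecorder();
runSagaHandler(new SagaSketch(), function (SagaSketch $saga) use ($viaBus): ?string {
    $saga->status = 'ready_to_be_shipped';
    $viaBus->steps[] = 'send ShipOrder via command bus';
    return null;
}, $viaBus);

// Output Channel style: the handler only returns the Command.
$viaChannel = new StepRecorder();
runSagaHandler(new SagaSketch(), function (SagaSketch $saga): ?string {
    $saga->status = 'ready_to_be_shipped';
    return 'ShipOrder';
}, $viaChannel);
```

In this sketch `$viaBus->steps` records the send before the persist step, while `$viaChannel->steps` records the persist step first.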

Publishing Events from Saga

We can publish Events from a Saga to trigger the next actions. This is useful, for example, if we would like to trigger time-based actions a given amount of time after the Saga was started. For example, we may want to close the Order if it was not paid within 24 hours.

#[Saga]
final class OrderProcess
{
    use WithEvents;

    private function __construct(
        #[Identifier] private string $orderId,
        private OrderProcessStatus   $orderStatus,
        private bool $isPaid,
    ) {
        $this->recordThat(new OrderProcessWasStarted($this->orderId));
    }

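    // Handled asynchronously, one day after OrderProcessWasStarted was recorded
    #[Delayed(new TimeSpan(days: 1))]
    #[Asynchronous('async')]
    #[EventHandler]
    public function cancelWhenNotPaid(OrderProcessWasStarted $event): void
    {
        // $isPaid is a property, so it is read directly
        if (!$this->isPaid) {
            $this->orderStatus = OrderProcessStatus::CANCELLED;
        }
    }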
    
    (...)
}
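To make the time-based flow above more tangible, here is a minimal plain-PHP sketch of delayed delivery. It is not how Ecotone implements `Delayed`; `OrderProcessSketch` and `DelayedQueueSketch` are hypothetical classes, and the sketch keeps object references in memory, whereas a real Saga would be loaded from storage when the delayed Event arrives.

```php
<?php
declare(strict_types=1);

// Plain-PHP illustration only; hypothetical classes, not Ecotone APIs.
final class OrderProcessSketch
{
    public string $status = 'placed';
    public bool $isPaid = false;

    public function cancelIfNotPaid(): void
    {
        if (!$this->isPaid) {
            $this->status = 'cancelled';
        }
    }
}

final class DelayedQueueSketch
{
    /** @var array<int, array{deliverAt: DateTimeImmutable, handler: callable}> */
    private array $messages = [];

    public function publish(callable $handler, DateTimeImmutable $now, DateInterval $delay): void
    {
        $this->messages[] = ['deliverAt' => $now->add($delay), 'handler' => $handler];
    }

    // Delivers every message whose delay has elapsed and keeps the rest queued.
    public function deliverDue(DateTimeImmutable $now): void
    {
        $pending = [];
        foreach ($this->messages as $message) {
            if ($message['deliverAt'] <= $now) {
                ($message['handler'])();
            } else {
                $pending[] = $message;
            }
        }
        $this->messages = $pending;
    }
}

$startedAt = new DateTimeImmutable('2024-01-01 12:00:00');
$oneDay = new DateInterval('P1D');

$unpaid = new OrderProcessSketch();
$paid = new OrderProcessSketch();

$queue = new DelayedQueueSketch();
$queue->publish($unpaid->cancelIfNotPaid(...), $startedAt, $oneDay);
$queue->publish($paid->cancelIfNotPaid(...), $startedAt, $oneDay);

// One hour later nothing is due yet; the second order gets paid in the meantime.
$queue->deliverDue($startedAt->add(new DateInterval('PT1H')));
$statusAfterOneHour = $unpaid->status;
$paid->isPaid = true;

// After 24 hours both delayed messages are delivered.
$queue->deliverDue($startedAt->add($oneDay));
```

Only the unpaid order ends up cancelled; the paid one receives the same delayed message but keeps its status.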

Ignoring Events

There may be situations when we want to handle Events only if the Saga has already started. Suppose we want to send a promotion code to the customer only if they received the Great Customer badge first; otherwise we want to skip the Event.

#[Saga] 
class OrderFulfillment
{
    #[Identifier] 
    private string $customerId;

    private function __construct(string $customerId)
    {
        $this->customerId = $customerId;
    }

    #[EventHandler] 
    public static function start(ReceivedGreatCustomerBadge $event) : self
    {
        return new self($event->getCustomerId());
    }

    #[EventHandler(dropMessageOnNotFound: true)]
    public function whenNewOrderWasPlaced(OrderWasPlaced $event, CommandBus $commandBus) : void 
    {
        $commandBus->send(new PromotionCode($this->customerId));
    }
}

We filter the Event by adding dropMessageOnNotFound:

#[EventHandler(dropMessageOnNotFound: true)]

If the Saga instance is not found, the Event is dropped and our whenNewOrderWasPlaced method is not called.

This option can also be used together with Command Handlers.
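The filtering behaviour can be sketched in plain PHP as a lookup followed by an early return. This is not Ecotone's implementation; `InMemorySagaStore`, `PromotionSaga` and `deliverToExistingSaga` are hypothetical names, and throwing when the flag is off is an assumption made for the sketch, as the text above only describes the case where the flag is on.

```php
<?php
declare(strict_types=1);

// Plain-PHP illustration only; hypothetical classes, not Ecotone's implementation.
final class PromotionSaga
{
    public int $promotionCodesSent = 0;

    public function whenNewOrderWasPlaced(): void
    {
        $this->promotionCodesSent++;
    }
}

final class InMemorySagaStore
{
    /** @var array<string, PromotionSaga> */
    private array $sagas = [];

    public function save(string $id, PromotionSaga $saga): void
    {
        $this->sagas[$id] = $saga;
    }

    public function find(string $id): ?PromotionSaga
    {
        return $this->sagas[$id] ?? null;
    }
}

// Returns true when the action method ran and false when the Event was dropped.
function deliverToExistingSaga(InMemorySagaStore $store, string $customerId, bool $dropMessageOnNotFound): bool
{
    $saga = $store->find($customerId);
    if ($saga === null) {
        if ($dropMessageOnNotFound) {
            return false; // no Saga yet: skip the Event without calling the handler
        }
        // Assumption for this sketch: without the flag, a missing Saga is an error.
        throw new RuntimeException("Saga not found for customer {$customerId}");
    }

    $saga->whenNewOrderWasPlaced();
    $store->save($customerId, $saga);

    return true;
}

$store = new InMemorySagaStore();
$store->save('customer-1', new PromotionSaga()); // this customer already has the badge

$handled = deliverToExistingSaga($store, 'customer-1', true);
$dropped = deliverToExistingSaga($store, 'customer-2', true);
```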

Fetching State from Saga

We may want to fetch the state from a Saga. This may be useful, for example, to show the Customer the current state of the process:

#[Saga]
final class OrderProcess
{   
    (...)

    #[QueryHandler("orderProcess.getStatus")]
    public function getStatus(): OrderProcessStatus
    {
        return $this->orderStatus;
    }
}

Then we can fetch the state using the Query Bus:

final class OrderController
{
    public function __construct(private QueryBus $queryBus) {}
    
    public function getStatus(Request $request): Response
    {
        $orderId = $request->get('orderId');
        
        return new Response([
            'status' => $this->queryBus->sendWithRouting(
                'orderProcess.getStatus',
                metadata: [
                    // aggregate.id is special metadata, that can be used for both Sagas and Aggregates
                    'aggregate.id' => $orderId
                ]
            )
        ]);
    }
}

Unordered Events

We may actually be unsure about the ordering of some Events. The same Event may come to us at different stages of the Saga, so it will either initialize the Saga or call an Action on the Saga. To solve this, Ecotone allows for Method Redirection based on the Saga's existence:

#[Saga] 
class OrderFulfillment
{
    private function __construct(#[Identifier] private string $orderId)
    {
    }

    #[EventHandler]
    public static function startByPlacedOrder(OrderWasPlacedEvent $event) : self
    {
        // initialize saga
    }

    #[EventHandler]
    public function whenOrderWasPlaced(OrderWasPlacedEvent $event) : void
    {
        // handle action
    }
    
    (...)
}

In the above example, we used the same Event for both the Factory and the Action method. If the given Saga does not exist yet, Ecotone will call the factory method; otherwise it will call the action method.

// Factory method will be called if the Saga does not exist yet
#[EventHandler]
public static function startByPlacedOrder(OrderWasPlacedEvent $event) : self

// Action method will be called if the Saga already exists
#[EventHandler]
public function whenOrderWasPlaced(OrderWasPlacedEvent $event) : void

This solution will prevent us from depending on the order of events, without introducing any if statements or routing functionality in our business code.
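The redirection rule above boils down to one existence check made outside of the business code. The following plain-PHP sketch shows that idea only; it does not use Ecotone, and `OrderFulfillmentSketch` and `routeOrderWasPlaced` are hypothetical names with an in-memory array standing in for the Saga storage.

```php
<?php
declare(strict_types=1);

// Plain-PHP illustration only; hypothetical names, not Ecotone's implementation.
final class OrderFulfillmentSketch
{
    public int $handledActions = 0;

    private function __construct(public readonly string $orderId)
    {
    }

    // Factory method: used when no Saga exists for the order yet.
    public static function startByPlacedOrder(string $orderId): self
    {
        return new self($orderId);
    }

    // Action method: used when the Saga already exists.
    public function whenOrderWasPlaced(): void
    {
        $this->handledActions++;
    }
}

/**
 * Routes the same Event to the factory or the action method,
 * depending on whether a Saga with this identifier is already stored.
 *
 * @param array<string, OrderFulfillmentSketch> $storage
 */
function routeOrderWasPlaced(array &$storage, string $orderId): string
{
    if (!isset($storage[$orderId])) {
        $storage[$orderId] = OrderFulfillmentSketch::startByPlacedOrder($orderId);
        return 'factory';
    }

    $storage[$orderId]->whenOrderWasPlaced();
    return 'action';
}

$storage = [];
$first = routeOrderWasPlaced($storage, 'order-1');  // no Saga yet
$second = routeOrderWasPlaced($storage, 'order-1'); // Saga exists now
```

The Saga class itself contains no branching on "does this already exist"; that decision lives entirely in the routing function, which is the role the framework plays in the text above.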

Targeting Identifier from Event/Command

Whenever an Event or Command comes in to a Saga, we need to correlate it with the given Saga instance. For this we can leverage Ecotone's support for Identifier Mapping, which gives us several ways to map a Message to a Saga.

Using Correlation Id as Saga's Identifier

We may want to use the Message Correlation Id as the Saga's Identifier. This allows us to always map to the given Saga, as long as the Message contains that Correlation Id.

As Ecotone automatically propagates the Correlation Id between Messages, this is a good solution for Workflows that branch and join together at a later stage.

To store a Saga we use Repositories. We can provide a custom implementation or use built-in repositories such as Doctrine ORM, Eloquent Model, or Ecotone's Document Store.

In the Output Channel example under "Subscribing to Events and taking action", we change the Saga's state and then return a Message to be delivered to the Output Channel, just like we did for Stateless Workflows.
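To illustrate why a Correlation Id works well as a Saga Identifier for Workflows that branch and join, here is a small plain-PHP sketch. It does not use Ecotone; `OrderProcessByCorrelation`, `handleCorrelated`, the `StockWasReserved` event name and the `correlationId` header key are all assumptions made for illustration.

```php
<?php
declare(strict_types=1);

// Plain-PHP illustration only; hypothetical names, not Ecotone APIs.
final class OrderProcessByCorrelation
{
    public bool $paymentReceived = false;
    public bool $stockReserved = false;

    public function __construct(public readonly string $correlationId)
    {
    }

    // The two branches join here: both must have reported back.
    public function isReadyToShip(): bool
    {
        return $this->paymentReceived && $this->stockReserved;
    }
}

/**
 * Finds (or starts) the Saga by the Correlation Id carried in the message headers.
 * The header key "correlationId" is an assumption for this sketch.
 *
 * @param array<string, OrderProcessByCorrelation> $sagas
 * @param array{correlationId: string} $headers
 */
function handleCorrelated(array &$sagas, string $eventName, array $headers): void
{
    $id = $headers['correlationId'];
    if (!isset($sagas[$id])) {
        $sagas[$id] = new OrderProcessByCorrelation($id);
    }

    if ($eventName === 'PaymentWasSuccessful') {
        $sagas[$id]->paymentReceived = true;
    } elseif ($eventName === 'StockWasReserved') {
        $sagas[$id]->stockReserved = true;
    }
}

$sagas = [];

// Two branches of the same workflow share one Correlation Id.
handleCorrelated($sagas, 'PaymentWasSuccessful', ['correlationId' => 'abc-123']);
$readyAfterPayment = $sagas['abc-123']->isReadyToShip();
handleCorrelated($sagas, 'StockWasReserved', ['correlationId' => 'abc-123']);

// A different workflow has its own Correlation Id and its own Saga.
handleCorrelated($sagas, 'PaymentWasSuccessful', ['correlationId' => 'xyz-789']);
```

Because every Message in one workflow carries the same Correlation Id, neither event needs to know the order identifier to reach the right Saga instance.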