Orchestrators: Declarative Workflow Automation
Learn how to build predefined and dynamic workflows using Orchestrator
While connecting handlers with channels works great for linear workflows, and Sagas excel at stateful processes, Orchestrator is perfect when you need predefined workflows where the workflow definition is separate from the individual steps.
When Do You Need Orchestrator?
Use Orchestrator when you want to:
🎯 Separate workflow from steps: Define the flow independently of step implementation
🔄 Reuse steps: Use the same steps in different workflows
⚡ Build dynamic workflows: Construct workflows programmatically based on business rules
🧪 Easy testing: Test workflows and steps independently
📋 Predefined processes: Execute well-defined business processes consistently
Examples:
Image processing pipeline (resize → watermark → optimize → upload)
Document approval workflow (validate → review → approve → notify)
Order fulfillment process (verify → payment → shipping → tracking)
Customer onboarding (registration → verification → welcome → setup)
Think of Orchestrator as: A conductor that knows the entire symphony (workflow) and tells each musician (step) when to play, while the musicians focus only on their part.
Enterprise Feature: Orchestrator is part of Ecotone's Enterprise features.
Creating Your First Orchestrator
An Orchestrator defines a workflow as a sequence of steps (channel names) and implements those steps as internal handlers.
Step 1: Define the Workflow
class ImageProcessingOrchestrator
{
#[Orchestrator(inputChannelName: "process.image")]
public function processImage(): array
{
return [
"resize.image",
"add.watermark",
"optimize.image",
"upload.image"
];
}
}
Key parts:
#[Orchestrator] - Tells Ecotone this method defines a workflow
inputChannelName - Channel that triggers this workflow
Return array - List of steps (channel names) to execute in order
Step 2: Implement the Steps
class ImageProcessingOrchestrator
{
// ... workflow definition ...
#[InternalHandler(inputChannelName: "resize.image")]
public function resizeImage(ImageData $image): ImageData
{
// Resize logic here
return $image->resize(800, 600);
}
#[InternalHandler(inputChannelName: "add.watermark")]
public function addWatermark(ImageData $image): ImageData
{
// Watermark logic here
return $image->addWatermark('© Company');
}
#[InternalHandler(inputChannelName: "optimize.image")]
public function optimizeImage(ImageData $image): ImageData
{
// Optimization logic here
return $image->optimize();
}
#[InternalHandler(inputChannelName: "upload.image")]
public function uploadImage(ImageData $image): string
{
// Upload logic here
$url = $this->storageService->upload($image);
return $url;
}
}
What happens when you trigger the workflow:
Message sent to process.image channel
Orchestrator returns ["resize.image", "add.watermark", "optimize.image", "upload.image"]
Each step executes in sequence, passing data to the next step
Final result is returned
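The sequence above can be pictured with a small plain-PHP model. This is only a sketch of the idea (each step's return value feeds the next step), not Ecotone's implementation: runWorkflow(), the $handlers map, the array-based image data, and the storage.example URL are all hypothetical stand-ins.

```php
<?php
declare(strict_types=1);

/**
 * Simplified model of sequential step execution (NOT Ecotone's implementation).
 *
 * @param list<string>            $steps    ordered channel names, as an orchestrator method would return them
 * @param array<string, callable> $handlers hypothetical map of channel name => step handler
 */
function runWorkflow(array $steps, array $handlers, mixed $payload): mixed
{
    foreach ($steps as $channel) {
        if (!isset($handlers[$channel])) {
            throw new InvalidArgumentException("No handler registered for channel '{$channel}'");
        }
        // The return value of one step becomes the input of the next.
        $payload = $handlers[$channel]($payload);
    }

    return $payload;
}

// Stand-in handlers working on a plain array instead of a real ImageData object.
$handlers = [
    'resize.image'   => fn (array $image): array => array_merge($image, ['width' => 800, 'height' => 600]),
    'add.watermark'  => fn (array $image): array => array_merge($image, ['watermark' => '© Company']),
    'optimize.image' => fn (array $image): array => array_merge($image, ['optimized' => true]),
    'upload.image'   => fn (array $image): string => 'https://storage.example/' . $image['name'],
];

$result = runWorkflow(
    ['resize.image', 'add.watermark', 'optimize.image', 'upload.image'],
    $handlers,
    ['name' => 'photo.jpg', 'width' => 1920, 'height' => 1080]
);

echo $result, PHP_EOL; // prints https://storage.example/photo.jpg
```

Note that the final result is whatever the last step returns, here the URL string from upload.image.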
Data Enrichment with Headers
Sometimes you need to add metadata or context without changing the main payload. Use changingHeaders: true for this:
Enriching with Additional Data
class OrderProcessingOrchestrator
{
#[Orchestrator(inputChannelName: "process.order")]
public function processOrder(): array
{
return [
"validate.order",
"enrich.customer.data",
"calculate.pricing",
"finalize.order"
];
}
#[InternalHandler(inputChannelName: "validate.order")]
public function validateOrder(Order $order): Order
{
// Validation logic
if (!$order->isValid()) {
throw new InvalidOrderException();
}
return $order;
}
#[InternalHandler(
inputChannelName: "enrich.customer.data",
changingHeaders: true
)]
public function enrichCustomerData(Order $order): array
{
// Fetch additional customer data
$customer = $this->customerService->getCustomer($order->customerId);
$loyaltyLevel = $this->loyaltyService->getLevel($order->customerId);
// Return data that becomes message headers
return [
'customerType' => $customer->type,
'loyaltyLevel' => $loyaltyLevel,
'creditScore' => $customer->creditScore
];
}
#[InternalHandler(inputChannelName: "calculate.pricing")]
public function calculatePricing(
Order $order,
#[Header('customerType')] string $customerType,
#[Header('loyaltyLevel')] string $loyaltyLevel
): Order {
// Use enriched data for pricing
$discount = $this->getDiscount($customerType, $loyaltyLevel);
return $order->applyDiscount($discount);
}
#[InternalHandler(inputChannelName: "finalize.order")]
public function finalizeOrder(Order $order): OrderConfirmation
{
// Final processing
$this->orderService->finalize($order);
// We don't really need to return anything, we could make the method void
return new OrderConfirmation($order->id);
}
}
Benefits of header enrichment:
Keep original payload unchanged
Add context data for downstream steps
Maintain clean separation of concerns
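To make the payload/header split concrete, here is a minimal plain-PHP sketch, assuming a message is just a payload plus an array of headers. SketchMessage and applyStep() are hypothetical names invented for illustration and do not exist in Ecotone; the sketch only models the behaviour described above, where a step flagged as changing headers contributes headers and leaves the payload untouched.

```php
<?php
declare(strict_types=1);

/** Hypothetical stand-in for a message: a payload plus string-keyed headers. */
final class SketchMessage
{
    public function __construct(
        public readonly mixed $payload,
        public readonly array $headers = [],
    ) {}
}

/**
 * Applies one step to a message.
 * With $changingHeaders = true the step's array result is merged into the headers
 * and the payload is passed on unchanged; otherwise the result replaces the payload.
 */
function applyStep(SketchMessage $message, callable $step, bool $changingHeaders = false): SketchMessage
{
    $result = $step($message->payload, $message->headers);

    return $changingHeaders
        ? new SketchMessage($message->payload, array_merge($message->headers, $result))
        : new SketchMessage($result, $message->headers);
}

$original = ['id' => '123', 'totalCents' => 10000];
$message = new SketchMessage($original);

// Enrichment step: returns only the data that should become headers.
$message = applyStep(
    $message,
    fn (array $order, array $headers): array => ['customerType' => 'premium', 'loyaltyLevel' => 'gold'],
    changingHeaders: true
);
$payloadAfterEnrichment = $message->payload;

// Pricing step: reads the enriched headers and returns a new payload.
$message = applyStep(
    $message,
    function (array $order, array $headers): array {
        $discountPercent = $headers['loyaltyLevel'] === 'gold' ? 10 : 0;
        $order['totalCents'] = intdiv($order['totalCents'] * (100 - $discountPercent), 100);

        return $order;
    }
);
```

After the enrichment step the payload is still the original order; only the pricing step, which does not change headers, replaces it.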
Executing Orchestrators
There are several ways to trigger orchestrator workflows:
Method 1: Command Handler with Output Channel
class ImageController
{
#[CommandHandler('image.upload', outputChannelName: 'process.image')]
public function uploadImage(UploadImageCommand $command): ImageData
{
// Handle the upload and prepare data for processing
$imageData = $this->imageService->upload($command->file);
// Return data that will be sent to the orchestrator
return $imageData;
}
}
Flow:
UploadImageCommand sent to command handler
Handler processes upload and returns ImageData
Result automatically sent to process.image channel
Orchestrator workflow begins
Method 2: From Event Handlers (Business Workflows)
class OrderEventHandler
{
#[EventHandler(outputChannelName: "process.order")]
public function whenOrderPlaced(OrderPlaced $event): Order
{
// Convert event to order data for processing
return Order::fromEvent($event);
}
}
Flow:
OrderPlaced
event occursEvent handler processes it and sends result to
process.order
Orchestrator workflow begins automatically
Method 3: Business Interface Triggering Business Workflow
A Business Interface is a plain interface for which Ecotone delivers the implementation. This gives us an entry point that is part of our application-level code and executes the workflow:
interface OrderProcessingService
{
#[BusinessMethod(outputChannelName: 'process.order')]
public function processOrder(Order $order): OrderConfirmation;
}
Usage in your application:
class OrderController
{
public function __construct(private OrderProcessingService $orderService) {}
public function processOrder(Request $request): JsonResponse
{
$order = Order::fromRequest($request);
// This will trigger the orchestrator workflow
$confirmation = $this->orderService->processOrder($order);
return new JsonResponse($confirmation);
}
}
Method 4: Custom Orchestrator Gateway
For dynamic workflows where you want to pass the steps programmatically:
interface OrderProcessingGateway
{
#[OrchestratorGateway]
public function processWithSteps(array $steps, Order $order, array $metadata): OrderConfirmation;
}
class OrderController
{
public function __construct(private OrderProcessingGateway $gateway) {}
public function processOrder(Request $request): Response
{
$order = Order::fromRequest($request);
// Dynamically determine steps based on order type
$steps = $this->determineStepsForOrder($order);
$result = $this->gateway->processWithSteps($steps, $order, []);
return new JsonResponse($result);
}
private function determineStepsForOrder(Order $order): array
{
$steps = ["validate.order"];
if ($order->requiresApproval()) {
$steps[] = "manual.approval";
}
$steps[] = "process.payment";
if ($order->isInternational()) {
$steps[] = "customs.declaration";
}
$steps[] = "ship.order";
return $steps;
}
}
Gateway benefits:
Dynamic workflow construction
Runtime step determination
Easy integration with web controllers
Flexible business rule application
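Because the step list is built at runtime, it may help to check it before handing it to the gateway. The sketch below is one possible approach, not an Ecotone feature: stepsFor() mirrors determineStepsForOrder() using two boolean flags, and unknownSteps() is a hypothetical helper that reports step names with no known handler channel.

```php
<?php
declare(strict_types=1);

/**
 * Builds the step list from two business flags, mirroring determineStepsForOrder().
 *
 * @return list<string>
 */
function stepsFor(bool $requiresApproval, bool $isInternational): array
{
    $steps = ['validate.order'];
    if ($requiresApproval) {
        $steps[] = 'manual.approval';
    }
    $steps[] = 'process.payment';
    if ($isInternational) {
        $steps[] = 'customs.declaration';
    }
    $steps[] = 'ship.order';

    return $steps;
}

/**
 * Hypothetical guard: returns the step names that have no known handler channel.
 *
 * @param list<string> $steps
 * @param list<string> $knownChannels
 * @return list<string>
 */
function unknownSteps(array $steps, array $knownChannels): array
{
    return array_values(array_diff($steps, $knownChannels));
}

// Channels we assume have handlers; 'customs.declaration' is deliberately missing.
$knownChannels = ['validate.order', 'manual.approval', 'process.payment', 'ship.order'];

$domestic      = stepsFor(requiresApproval: false, isInternational: false);
$international = stepsFor(requiresApproval: true, isInternational: true);

$missing = unknownSteps($international, $knownChannels);
if ($missing !== []) {
    echo 'Missing handlers for: ', implode(', ', $missing), PHP_EOL;
}
```

Keeping the step selection in a pure function like this also makes the business rules testable without bootstrapping the framework.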
Asynchronous Orchestration
Make your workflows asynchronous for better performance and scalability:
Asynchronous Orchestrator
class AsyncImageProcessingOrchestrator
{
#[Asynchronous('async')]
#[Orchestrator(inputChannelName: "async.process.image")]
public function processImageAsync(): array
{
return [
"resize.image",
"add.watermark",
"optimize.image",
"upload.image"
];
}
// Steps can also be asynchronous individually
#[Asynchronous('async')]
#[InternalHandler(inputChannelName: "resize.image")]
public function resizeImage(ImageData $image): ImageData
{
// Heavy processing that benefits from async execution
return $this->heavyResizeOperation($image);
}
#[InternalHandler(inputChannelName: "add.watermark")]
public function addWatermark(ImageData $image): ImageData
{
// This step runs synchronously
return $image->addWatermark('© Company');
}
}
Mixed Synchronous/Asynchronous Steps
class MixedProcessingOrchestrator
{
#[Orchestrator(inputChannelName: "mixed.workflow")]
public function mixedWorkflow(): array
{
return [
"quick.validation", // Sync - needs immediate feedback
"heavy.processing", // Async - can take time
"send.notification" // Async - fire and forget
];
}
#[InternalHandler(inputChannelName: "quick.validation")]
public function quickValidation(Data $data): Data
{
// Fast validation that should block if it fails
if (!$data->isValid()) {
throw new ValidationException();
}
return $data;
}
#[Asynchronous('async')]
#[InternalHandler(inputChannelName: "heavy.processing")]
public function heavyProcessing(Data $data): Data
{
// CPU-intensive work that can be done in background
return $this->performComplexCalculations($data);
}
#[Asynchronous('notifications')]
#[InternalHandler(inputChannelName: "send.notification")]
public function sendNotification(Data $data): void
{
// Fire-and-forget notification
$this->notificationService->send($data);
}
}
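One way to reason about mixed workflows is to imagine that reaching an asynchronous step parks the message, together with its remaining steps, on that step's queue until a consumer picks it up. The following in-memory model is a sketch under that assumption only; InMemoryWorkflowRunner and its methods are hypothetical and say nothing about how Ecotone implements this internally.

```php
<?php
declare(strict_types=1);

/** Hypothetical in-memory model of a workflow with synchronous and queued steps. */
final class InMemoryWorkflowRunner
{
    /** @var array<string, list<array{steps: list<string>, payload: mixed}>> */
    private array $queues = [];

    /**
     * @param array<string, callable> $handlers      channel name => step handler
     * @param array<string, string>   $asyncChannels channel name => queue name
     */
    public function __construct(private array $handlers, private array $asyncChannels) {}

    /** Runs steps inline until one is marked asynchronous, then parks the rest on its queue. */
    public function start(array $steps, mixed $payload): void
    {
        $this->advance($steps, $payload, false);
    }

    /** Processes everything currently waiting on one queue, as a consumer would. */
    public function runQueue(string $queue): void
    {
        $pending = $this->queues[$queue] ?? [];
        $this->queues[$queue] = [];
        foreach ($pending as $job) {
            $this->advance($job['steps'], $job['payload'], true);
        }
    }

    private function advance(array $steps, mixed $payload, bool $resumed): void
    {
        while ($steps !== []) {
            $channel = $steps[0];
            $queue = $this->asyncChannels[$channel] ?? null;
            if ($queue !== null && !$resumed) {
                // Asynchronous step reached inline: defer it and everything after it.
                $this->queues[$queue][] = ['steps' => $steps, 'payload' => $payload];
                return;
            }
            $resumed = false; // only the first step after a resume skips the queue check
            array_shift($steps);
            $payload = $this->handlers[$channel]($payload);
        }
    }
}

$log = [];
$runner = new InMemoryWorkflowRunner(
    [
        'quick.validation'  => function (string $data) use (&$log) { $log[] = 'validated'; return $data; },
        'heavy.processing'  => function (string $data) use (&$log) { $log[] = 'processed'; return $data; },
        'send.notification' => function (string $data) use (&$log) { $log[] = 'notified'; return null; },
    ],
    ['heavy.processing' => 'async', 'send.notification' => 'notifications']
);

$runner->start(['quick.validation', 'heavy.processing', 'send.notification'], 'data');
$afterStart = $log;            // only the synchronous step has run
$runner->runQueue('async');
$afterAsync = $log;            // heavy.processing ran, notification is parked
$runner->runQueue('notifications');
```

In this model the caller gets immediate feedback from quick.validation, while the later steps run only when their queues are consumed.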
Advanced Features
Dynamic Workflow Building
The power of Orchestrator shines when you build workflows dynamically based on business rules:
class DynamicCustomerOnboardingOrchestrator
{
public function __construct(
private CustomerService $customerService,
private ComplianceService $complianceService
) {}
#[Orchestrator(inputChannelName: "onboard.customer")]
public function onboardCustomer(Customer $customer): array
{
$steps = ["validate.customer"];
// Add steps based on customer type
if ($customer->isEnterprise()) {
$steps[] = "enterprise.verification";
$steps[] = "compliance.check";
}
// Add steps based on location
if ($customer->isInternational()) {
$steps[] = "international.verification";
}
// Add steps based on business rules
if ($this->complianceService->requiresKYC($customer)) {
$steps[] = "kyc.verification";
}
// Common final steps
$steps[] = "create.account";
$steps[] = "send.welcome.email";
return $steps;
}
}
Conditional Step Execution
Steps can return null to end the workflow early:
class ConditionalProcessingOrchestrator
{
#[Orchestrator(inputChannelName: "conditional.process")]
public function conditionalProcess(): array
{
return [
"check.eligibility",
"process.if.eligible",
"finalize.process"
];
}
#[InternalHandler(inputChannelName: "check.eligibility")]
public function checkEligibility(Application $application): ?Application
{
if (!$application->isEligible()) {
// Returning null stops the workflow
return null;
}
return $application;
}
#[InternalHandler(inputChannelName: "process.if.eligible")]
public function processIfEligible(Application $application): Application
{
// This only runs if previous step didn't return null
return $this->processApplication($application);
}
#[InternalHandler(inputChannelName: "finalize.process")]
public function finalizeProcess(Application $application): ApplicationResult
{
return new ApplicationResult($application);
}
}
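The early-exit rule can be modelled in a few lines of plain PHP. This is a sketch of the behaviour described above (a null result stops the workflow and the remaining steps are skipped), with runUntilNull() and the array-based application data as hypothetical stand-ins rather than Ecotone code.

```php
<?php
declare(strict_types=1);

/**
 * Runs steps in order and stops as soon as one of them returns null.
 *
 * @param list<string>            $steps
 * @param array<string, callable> $handlers hypothetical map of channel name => step handler
 */
function runUntilNull(array $steps, array $handlers, mixed $payload): mixed
{
    foreach ($steps as $channel) {
        $payload = $handlers[$channel]($payload);
        if ($payload === null) {
            return null; // remaining steps are skipped
        }
    }

    return $payload;
}

$executed = [];
$handlers = [
    'check.eligibility' => function (array $application) use (&$executed): ?array {
        $executed[] = 'check.eligibility';
        return $application['eligible'] ? $application : null;
    },
    'process.if.eligible' => function (array $application) use (&$executed): array {
        $executed[] = 'process.if.eligible';
        $application['processed'] = true;
        return $application;
    },
    'finalize.process' => function (array $application) use (&$executed): string {
        $executed[] = 'finalize.process';
        return 'result for ' . $application['id'];
    },
];
$steps = ['check.eligibility', 'process.if.eligible', 'finalize.process'];

// Ineligible application: the workflow ends after the first step.
$rejected = runUntilNull($steps, $handlers, ['id' => 'A-1', 'eligible' => false]);
$executedForRejected = $executed;

// Eligible application: all three steps run.
$executed = [];
$accepted = runUntilNull($steps, $handlers, ['id' => 'A-2', 'eligible' => true]);
```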
Nested Orchestrators
Orchestrators can call other orchestrators as steps:
class MasterOrchestrator
{
#[Orchestrator(inputChannelName: "master.workflow")]
public function masterWorkflow(): array
{
return [
"prepare.data",
"sub.workflow.a", // This calls another orchestrator
"sub.workflow.b", // This calls another orchestrator
"combine.results"
];
}
#[InternalHandler(inputChannelName: "prepare.data")]
public function prepareData(RawData $data): ProcessedData
{
return new ProcessedData($data);
}
#[Orchestrator(inputChannelName: "sub.workflow.a")]
public function subWorkflowA(): array
{
return ["step.a1", "step.a2"];
}
#[Orchestrator(inputChannelName: "sub.workflow.b")]
public function subWorkflowB(): array
{
return ["step.b1", "step.b2"];
}
#[InternalHandler(inputChannelName: "combine.results")]
public function combineResults(ProcessedData $data): FinalResult
{
return new FinalResult($data);
}
}
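A rough way to think about nesting is that a step naming another workflow is replaced by that workflow's own steps. The sketch below shows the resulting order under that assumption; expandSteps() and the $workflows map are hypothetical, and Ecotone resolves nested orchestrators at runtime rather than by flattening a list up front.

```php
<?php
declare(strict_types=1);

/**
 * Expands nested workflows into one flat, ordered list of step names.
 * Assumes the workflow definitions contain no cycles.
 *
 * @param list<string>                $steps
 * @param array<string, list<string>> $workflows hypothetical map of workflow channel => its steps
 * @return list<string>
 */
function expandSteps(array $steps, array $workflows): array
{
    $expanded = [];
    foreach ($steps as $step) {
        if (isset($workflows[$step])) {
            // A step that names another workflow is replaced by that workflow's steps.
            array_push($expanded, ...expandSteps($workflows[$step], $workflows));
        } else {
            $expanded[] = $step;
        }
    }

    return $expanded;
}

$workflows = [
    'sub.workflow.a' => ['step.a1', 'step.a2'],
    'sub.workflow.b' => ['step.b1', 'step.b2'],
];

$order = expandSteps(
    ['prepare.data', 'sub.workflow.a', 'sub.workflow.b', 'combine.results'],
    $workflows
);

echo implode(' -> ', $order), PHP_EOL;
// prints prepare.data -> step.a1 -> step.a2 -> step.b1 -> step.b2 -> combine.results
```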
Testing Orchestrators
Testing orchestrators is straightforward with Ecotone Lite. You can test the entire workflow, individual steps, or specific scenarios.
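use Ecotone\Lite\EcotoneLite;
use Ecotone\Lite\Test\FlowTestSupport;
use Ecotone\Messaging\Config\ServiceConfiguration;
use PHPUnit\Framework\TestCase;
class ImageProcessingOrchestratorTest extends TestCase
{
// bootstrapFlowTesting() returns a FlowTestSupport instance, not EcotoneLite itself
private FlowTestSupport $ecotoneLite;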
protected function setUp(): void
{
$this->ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[ImageProcessingOrchestrator::class],
[
'storageService' => new InMemoryStorageService(),
'imageProcessor' => new TestImageProcessor()
],
ServiceConfiguration::createWithDefaults()
->withLicenceKey(VALID_LICENCE)
);
}
public function test_complete_image_processing_workflow(): void
{
// Arrange
$imageData = new ImageData('test-image.jpg', 1920, 1080);
// Act
$result = $this->ecotoneLite->sendDirectToChannel('process.image', $imageData);
// Assert: upload.image is the last step, so the workflow result is the URL string it returns
$this->assertIsString($result);
$this->assertNotEmpty($result);
}
}
Testing Individual Steps
public function test_individual_steps(): void
{
$imageData = new ImageData('test.jpg', 1920, 1080);
// Test resize step
$resized = $this->ecotoneLite->sendDirectToChannel('resize.image', $imageData);
$this->assertEquals(800, $resized->width);
$this->assertEquals(600, $resized->height);
// Test watermark step
$watermarked = $this->ecotoneLite->sendDirectToChannel('add.watermark', $resized);
$this->assertTrue($watermarked->hasWatermark());
// Test optimization step
$optimized = $this->ecotoneLite->sendDirectToChannel('optimize.image', $watermarked);
$this->assertTrue($optimized->isOptimized());
}
Testing Data Enrichment
public function test_order_processing_with_header_enrichment(): void
{
$order = new Order('123', 'customer-456', [new Item('product', 100)]);
$result = $this->ecotoneLite->sendDirectToChannel('process.order', $order);
$this->assertInstanceOf(OrderConfirmation::class, $result);
$this->assertTrue($result->hasDiscount()); // Discount applied based on enriched data
}
public function test_customer_data_enrichment_step(): void
{
$order = new Order('123', 'premium-customer', []);
// Test the enrichment step directly
$enrichedHeaders = $this->ecotoneLite->sendDirectToChannel('enrich.customer.data', $order);
$this->assertEquals('premium', $enrichedHeaders['customerType']);
$this->assertEquals('gold', $enrichedHeaders['loyaltyLevel']);
$this->assertGreaterThan(700, $enrichedHeaders['creditScore']);
}
Testing Asynchronous Orchestrators
public function test_asynchronous_orchestrator(): void
{
$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[AsyncImageProcessingOrchestrator::class],
['imageProcessor' => new TestImageProcessor()],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::CORE_PACKAGE, ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withLicenceKey(VALID_LICENCE),
enableAsynchronousProcessing: [
SimpleMessageChannelBuilder::createQueueChannel('async'),
]
);
$imageData = new ImageData('test.jpg', 1920, 1080);
// Send to async workflow
$ecotoneLite->sendDirectToChannel('async.process.image', $imageData);
// Run async processing
$ecotoneLite->run('async');
// Verify processing completed
$this->assertTrue(true); // Add specific assertions based on your implementation
}
Testing Orchestrator Gateways
public function test_orchestrator_gateway_with_dynamic_steps(): void
{
$ecotoneLite = EcotoneLite::bootstrapFlowTesting(
[OrderProcessingOrchestrator::class, OrderProcessingGateway::class],
[new OrderProcessingOrchestrator()],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::CORE_PACKAGE]))
->withLicenceKey(VALID_LICENCE)
);
/** @var OrderProcessingGateway $gateway */
$gateway = $ecotoneLite->getGateway(OrderProcessingGateway::class);
$order = new Order('123', 'customer-456', []);
$steps = ['validate.order', 'process.payment', 'ship.order'];
$result = $gateway->processWithSteps($steps, $order, []);
$this->assertInstanceOf(OrderConfirmation::class, $result);
}
Key Benefits of Orchestrator
🎯 Separation of Concerns
Workflow definition is separate from step implementation
Easy to understand the entire process at a glance
Steps can be reused across different workflows
🔄 Reusability
Same steps can be used in multiple workflows
Build libraries of reusable business operations
Mix and match steps for different scenarios
⚡ Dynamic Workflows
Build workflows programmatically based on business rules
Adapt to different customer types, regions, or conditions
Runtime workflow construction
🧪 Testability
Test entire workflows end-to-end
Test individual steps in isolation
Easy mocking and stubbing of dependencies
📈 Scalability
Asynchronous execution support
Individual steps can be scaled independently
Easy to add new steps without changing existing code
🔍 Observability
Clear workflow execution path
Easy to monitor and debug
Step-by-step execution tracking
Summary
Orchestrator is perfect for building predefined workflows where you want to:
🎯 Separate workflow definition from step implementation
🔄 Reuse steps across different workflows
⚡ Build dynamic workflows based on business rules
🧪 Test workflows and steps independently
📋 Execute consistent, repeatable business processes
Key insight: Orchestrator shines when you know the types of workflows you need but want flexibility in how they're constructed and executed. It's the perfect balance between structure and flexibility.
The power of Orchestrator lies in its ability to make complex business workflows simple to define, easy to test, and flexible to modify. Whether you're processing orders, onboarding customers, or handling document workflows, Orchestrator provides the structure and flexibility you need to build robust, maintainable business processes.