Connecting Handlers with Channels
Learn how to connect message handlers using channels to build workflows
Building business workflows is essential for most applications. Whether you need fully automated processes (like image processing pipelines) or human-interactive flows (like document approval), Ecotone makes it simple by connecting message handlers through channels.
Understanding the Flow: From Handler to Handler
The key concept in Ecotone is that every message handler can be connected to other handlers using channels. Think of channels as pipes that carry messages between different parts of your application.
Core Concept: Each message handler has an input channel (where messages come in) and can have an output channel (where results go out). By connecting these channels, you create workflows.
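To make the pipe analogy concrete, here is a minimal sketch in plain PHP. It is not Ecotone's implementation: the `InMemoryChannels` class and the `image.resize` / `image.store` channel names are hypothetical and only model the idea that a handler's return value is forwarded to its output channel.

```php
<?php
// Hypothetical in-memory model of channels; an illustration, not Ecotone's internals.
final class InMemoryChannels
{
    /** @var array<string, array{0: callable, 1: ?string}> handler and optional output channel, keyed by input channel */
    private array $routes = [];

    public function connect(string $inputChannel, callable $handler, ?string $outputChannel = null): void
    {
        $this->routes[$inputChannel] = [$handler, $outputChannel];
    }

    public function send(string $channel, mixed $payload): void
    {
        if (!isset($this->routes[$channel])) {
            throw new RuntimeException("No handler listens on channel {$channel}");
        }
        [$handler, $outputChannel] = $this->routes[$channel];
        $result = $handler($payload);

        // Forward the return value to the output channel, when both exist.
        if ($result !== null && $outputChannel !== null) {
            $this->send($outputChannel, $result);
        }
    }
}

$log = [];
$channels = new InMemoryChannels();
$channels->connect('image.resize', function (string $file) use (&$log): string {
    $log[] = "resized {$file}";
    return "small-{$file}";
}, 'image.store');
$channels->connect('image.store', function (string $file) use (&$log): void {
    $log[] = "stored {$file}";
});

$channels->send('image.resize', 'photo.jpg');
echo implode(PHP_EOL, $log), PHP_EOL;
```

Sending one message to the first channel runs both handlers in order, which is the behavior the rest of this page builds on with attributes.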
Step 1: Understanding Single Handlers
Let's start with a simple command handler:
#[CommandHandler('place.order')]
public function place(PlaceOrder $command): void
{
    // Place the order logic here
}
What happens behind the scenes:
Ecotone creates a channel named place.order
Your handler listens to this channel
When you use the Command Bus, it sends messages to this channel

Step 2: Connecting Handlers Together
Here's where it gets powerful: any handler can send messages to another handler's channel. This lets you chain handlers together to create workflows.
Let's add order verification before placing the order:

The magic happens with outputChannelName:
class ProcessOrder
{
    #[CommandHandler(
        'verify.order',
        outputChannelName: 'place.order' // 👈 This connects the handlers!
    )]
    public function verify(PlaceOrder $command): PlaceOrder
    {
        // Verify the order
        if ($this->isValidOrder($command)) {
            return $command; // Pass it to the next handler
        }

        throw new InvalidOrderException();
    }

    #[CommandHandler('place.order')]
    public function place(PlaceOrder $command): void
    {
        // Place the order
        $this->orderRepository->save($command);
    }
}
How the flow works:
You send PlaceOrder to the verify.order channel
The verify() method processes it and returns the command
Ecotone automatically sends the returned command to the place.order channel
The place() method receives and processes it
Pro Tip: You can use the same command class for multiple handlers in a workflow. This eliminates the need to convert between different classes at each step, making your code simpler and easier to follow.
Making Handlers Internal (Private to Workflow)
Problem: With CommandHandler, anyone can call place.order directly through the Command Bus, bypassing your verification step!
Solution: Use InternalHandler to make handlers private to your workflow:
class ProcessOrder
{
    #[CommandHandler(
        'verify.order',
        outputChannelName: 'place.order'
    )]
    public function verify(PlaceOrder $command): PlaceOrder
    {
        // verify the order
        return $command;
    }

    #[InternalHandler('place.order')] // 👈 Can't be called directly!
    public function place(PlaceOrder $command): void
    {
        // place the order
        $this->orderRepository->save($command);
    }
}
What this means:
✅ verify.order can be called via Command Bus (entry point)
❌ place.order can only be reached through the workflow
🔒 This ensures orders are always verified before being placed
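The difference between an entry point and an internal step can be modeled in a few lines of plain PHP. This is only a sketch of the idea, not how Ecotone enforces it: `WorkflowBus` and its `command()` / `internal()` methods are hypothetical names invented for the illustration.

```php
<?php
// Hypothetical model: only "command" routes are reachable from the bus;
// "internal" routes are reachable only as another handler's output channel.
final class WorkflowBus
{
    private array $handlers = [];    // channel => callable
    private array $outputs = [];     // channel => next channel or null
    private array $entryPoints = []; // channels callable from outside

    public function command(string $channel, callable $handler, ?string $output = null): void
    {
        $this->internal($channel, $handler, $output);
        $this->entryPoints[$channel] = true;
    }

    public function internal(string $channel, callable $handler, ?string $output = null): void
    {
        $this->handlers[$channel] = $handler;
        $this->outputs[$channel] = $output;
    }

    public function send(string $channel, mixed $payload): void
    {
        if (!isset($this->entryPoints[$channel])) {
            throw new InvalidArgumentException("{$channel} is not an entry point");
        }
        $this->dispatch($channel, $payload);
    }

    private function dispatch(string $channel, mixed $payload): void
    {
        $result = ($this->handlers[$channel])($payload);
        $next = $this->outputs[$channel];
        if ($result !== null && $next !== null) {
            $this->dispatch($next, $result);
        }
    }
}

$placed = [];
$bus = new WorkflowBus();
$bus->command('verify.order', fn (string $orderId): string => $orderId, 'place.order');
$bus->internal('place.order', function (string $orderId) use (&$placed): void {
    $placed[] = $orderId;
});

$bus->send('verify.order', 'order-123'); // goes through verification first

try {
    $bus->send('place.order', 'order-456'); // bypass attempt is rejected
} catch (InvalidArgumentException $e) {
    echo $e->getMessage(), PHP_EOL;
}
```

Only the order that entered through `verify.order` ends up placed; the direct call to `place.order` never reaches the handler.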
Extending Workflows: You can easily add more steps by adding outputChannelName to any handler. To send messages to multiple handlers, use the Router pattern.
Adding Asynchronous Processing
Sometimes you want parts of your workflow to run asynchronously (in the background). This is perfect for:
Heavy processing that shouldn't block the user
Ensuring messages aren't lost if something goes wrong
Scaling parts of your workflow independently
Example: Keep verification synchronous (fast feedback) but make order placement asynchronous (reliable processing):
class ProcessOrder
{
    #[CommandHandler(
        'verify.order',
        outputChannelName: 'place.order'
    )]
    public function verify(PlaceOrder $command): PlaceOrder
    {
        // This runs immediately (synchronous)
        if (!$this->isValidOrder($command)) {
            throw new InvalidOrderException();
        }

        return $command;
    }

    #[Asynchronous('async')] // 👈 This makes it asynchronous!
    #[InternalHandler('place.order')]
    public function place(PlaceOrder $command): void
    {
        // This runs in the background (asynchronous)
        $this->orderRepository->save($command);
        $this->emailService->sendConfirmation($command);
    }
}
What happens now:
verify() runs immediately and returns a response
The message goes to a queue/background processor
place() runs later in the background
If place() fails, the message can be retried
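The sequence above can be sketched without any framework. The snippet below is a simplified model under stated assumptions: an `SplQueue` stands in for the `async` channel, the closures stand in for the two handlers, and the retry policy (put the message back, up to a fixed number of deliveries) is invented for the illustration rather than taken from Ecotone.

```php
<?php
// Hypothetical in-memory stand-in for the 'async' channel and its consumer.
$queue = new SplQueue();
$placed = [];
$attempts = 0;

// Synchronous step: runs in the caller's process and only enqueues the message.
$verify = function (string $orderId) use ($queue): void {
    if ($orderId === '') {
        throw new InvalidArgumentException('Invalid order');
    }
    $queue->enqueue($orderId);
};

// Asynchronous step: fails on the first attempt to demonstrate a retry.
$place = function (string $orderId) use (&$placed, &$attempts): void {
    $attempts++;
    if ($attempts === 1) {
        throw new RuntimeException('Database temporarily unavailable');
    }
    $placed[] = $orderId;
};

// Consumer loop: a failed message is put back on the queue, up to $maxDeliveries deliveries.
$consume = function (int $maxDeliveries) use ($queue, $place): void {
    $deliveries = 0;
    while (!$queue->isEmpty() && $deliveries < $maxDeliveries) {
        $orderId = $queue->dequeue();
        $deliveries++;
        try {
            $place($orderId);
        } catch (RuntimeException) {
            $queue->enqueue($orderId); // keep the message for another attempt
        }
    }
};

$verify('order-123');
// The caller already has its response here; nothing has been placed yet.
$placedBeforeConsumerRan = $placed;

$consume(3);
```

The order is placed only once the consumer runs, and the first failure does not lose the message because it goes back onto the queue.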

The Beauty of Ecotone: You've built a complete workflow with asynchronous processing without extending any framework classes or writing complex configuration. Everything is declared through simple attributes!
Adding Delays and Timeouts
Asynchronous handlers can also be delayed, which is perfect for business scenarios like:
Giving customers time to complete actions
Implementing timeout behaviors
Scheduling follow-up actions
Example: Give customers 24 hours to pay, then automatically cancel unpaid orders:

Strategy: Use events to trigger delayed actions (events allow multiple handlers to react):
class ProcessOrder
{
    // ... previous methods ...

    #[Asynchronous('async')]
    #[InternalHandler('place.order')]
    public function place(PlaceOrder $command, EventBus $eventBus): void
    {
        // Place the order
        $this->orderRepository->save($command);

        // Trigger the timeout mechanism
        $eventBus->publish(new OrderWasPlaced($command->orderId));
    }
}
Now add the delayed cancellation handler:
class OrderTimeoutHandler
{
    #[Delayed(new TimeSpan(days: 1))] // 👈 Wait 24 hours
    #[Asynchronous('async')]
    #[EventHandler]
    public function cancelUnpaidOrder(
        OrderWasPlaced $event,
        OrderRepository $orderRepository
    ): void {
        $order = $orderRepository->get($event->orderId);

        if ($order->isNotPaid()) {
            $order->cancel();
            $orderRepository->save($order);
            // Could trigger more events here (email notifications, etc.)
        }
    }
}
Timeline:
⏰ T+0: Order placed, event published
⏰ T+24h: Cancellation handler runs automatically
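One way to picture the timeline is a queue whose messages carry the time at which they become available. The sketch below is a plain-PHP model, not Ecotone's delay mechanism: `DelayedQueue`, `publish()` and `receiveDue()` are hypothetical names, and the clock is passed in explicitly so the example stays deterministic.

```php
<?php
// Hypothetical delayed queue: each message carries the time at which it becomes available.
final class DelayedQueue
{
    /** @var list<array{payload: string, availableAt: DateTimeImmutable}> */
    private array $messages = [];

    public function publish(string $payload, DateTimeImmutable $now, DateInterval $delay): void
    {
        $this->messages[] = ['payload' => $payload, 'availableAt' => $now->add($delay)];
    }

    /** Returns payloads that are due at $now and removes them from the queue. */
    public function receiveDue(DateTimeImmutable $now): array
    {
        $due = [];
        $pending = [];
        foreach ($this->messages as $message) {
            if ($message['availableAt'] <= $now) {
                $due[] = $message['payload'];
            } else {
                $pending[] = $message;
            }
        }
        $this->messages = $pending;

        return $due;
    }
}

$queue = new DelayedQueue();
$placedAt = new DateTimeImmutable('2024-01-01 12:00:00', new DateTimeZone('UTC'));
$queue->publish('order-123', $placedAt, new DateInterval('P1D'));

$afterOneHour = $queue->receiveDue($placedAt->modify('+1 hour'));  // still waiting
$afterOneDay = $queue->receiveDue($placedAt->modify('+24 hours')); // now due
```

A consumer polling one hour after placement receives nothing; the same poll 24 hours later receives the message, at which point the cancellation check would run.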
Controlling Workflow Flow
Stopping the Workflow
You can stop a workflow from continuing by returning null:
class ProcessOrder
{
    #[CommandHandler(
        'verify.order',
        outputChannelName: 'place.order'
    )]
    public function verify(PlaceOrder $command): ?PlaceOrder
    {
        if (!$this->isValidOrder($command)) {
            // Log the issue, send notification, etc.
            $this->logger->warning('Invalid order rejected', ['orderId' => $command->orderId]);

            // Stop here - don't continue to place.order
            return null;
        }

        // Continue the workflow
        return $command;
    }

    // This won't be called if verify() returns null
    #[InternalHandler('place.order')]
    public function place(PlaceOrder $command): void
    {
        $this->orderRepository->save($command);
    }
}
Enriching Messages in Workflows
Sometimes you need to add information as messages flow through your workflow. There are two approaches:
Option 1: Transform the Payload
Return a new/modified object that contains additional data:
class CreditCardProcessor
{
    #[InternalHandler(
        inputChannelName: 'credit_card.add_details',
        outputChannelName: 'credit_card.verify'
    )]
    public function addDetails(CustomerDetails $customer): CustomerDetailsWithHistory
    {
        // Fetch additional data
        $history = $this->creditHistoryService->getHistory($customer->id);

        // Return enriched object
        return new CustomerDetailsWithHistory($customer, $history);
    }

    #[InternalHandler('credit_card.verify')]
    public function verify(CustomerDetailsWithHistory $enrichedCustomer): void
    {
        // Now we have both customer details and history
        if ($enrichedCustomer->history->isGoodCredit()) {
            $this->approveCard($enrichedCustomer->customer);
        }
    }
}
When to use: When the additional data is core to the next step's logic.
Option 2: Add Data to Message Headers
Keep the original payload unchanged and add extra data as headers:
class CreditCardProcessor
{
    #[InternalHandler(
        inputChannelName: 'credit_card.add_details',
        outputChannelName: 'credit_card.verify',
        changingHeaders: true // 👈 This tells Ecotone we're modifying headers
    )]
    public function addDetails(CustomerDetails $customer): array
    {
        // Fetch additional data
        $history = $this->creditHistoryService->getHistory($customer->id);

        // Return array that becomes headers
        return [
            'creditHistory' => $history,
            'riskScore' => $this->calculateRisk($history)
        ];
    }

    #[InternalHandler('credit_card.verify')]
    public function verify(
        CustomerDetails $customer, // Original payload unchanged
        #[Header('creditHistory')] CreditHistory $history, // From headers
        #[Header('riskScore')] int $riskScore
    ): void {
        // Use both original data and enriched headers
        if ($riskScore < 50 && $history->hasGoodPaymentRecord()) {
            $this->approveCard($customer);
        }
    }
}
When to use: When you want to keep the original payload intact and add supplementary data.
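The two options differ only in which part of the message the handler's return value replaces. The plain-PHP sketch below models that difference. The `Message` class and the `transformPayload()` / `changeHeaders()` functions are hypothetical helpers written for this illustration, and `CustomerDetails` is reduced to a single field.

```php
<?php
// Hypothetical message model: a payload plus a map of headers.
final class Message
{
    public function __construct(
        public readonly object $payload,
        public readonly array $headers = [],
    ) {
    }
}

final class CustomerDetails
{
    public function __construct(public readonly string $id)
    {
    }
}

// Option 1: the handler's return value replaces the payload; headers are kept.
function transformPayload(Message $message, callable $handler): Message
{
    return new Message($handler($message->payload), $message->headers);
}

// Option 2: the returned array is merged into the headers; the payload stays untouched.
function changeHeaders(Message $message, callable $handler): Message
{
    return new Message($message->payload, array_merge($message->headers, $handler($message->payload)));
}

$original = new Message(new CustomerDetails('customer-456'), ['correlationId' => 'abc']);

$transformed = transformPayload(
    $original,
    fn (CustomerDetails $customer): object => (object) ['customer' => $customer, 'history' => 'good'],
);

$enriched = changeHeaders(
    $original,
    fn (CustomerDetails $customer): array => ['riskScore' => 42],
);

var_dump($enriched->payload === $original->payload); // the payload object is the same instance
print_r($enriched->headers);
```

With Option 1 the next handler must accept the new payload type; with Option 2 it keeps receiving `CustomerDetails` and reads the extra values from headers.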
Testing Your Workflows with Ecotone Lite
Testing workflows is crucial for ensuring your business logic works correctly. Ecotone Lite makes testing handler chains simple and straightforward.
Setting Up Tests
use Ecotone\Lite\EcotoneLite;
use PHPUnit\Framework\TestCase;

class WorkflowTest extends TestCase
{
    public function test_order_verification_workflow(): void
    {
        // Arrange - Set up your handlers
        $orderProcessor = new ProcessOrder();
        $ecotoneLite = EcotoneLite::bootstrapFlowTesting(
            [ProcessOrder::class], // Classes to register
            [$orderProcessor], // Service instances
        );

        // Act - Send a command to the workflow's entry point (routing key 'verify.order')
        $command = new PlaceOrder('order-123', 'customer-456');
        $ecotoneLite->sendCommandWithRoutingKey('verify.order', $command);

        // Assert - Verify side effects (database changes, sent emails, etc.)
        $this->assertTrue($orderProcessor->wasOrderPlaced('order-123'));
    }
}
Testing Handler Chains
Test complete workflows from start to finish:
class ProcessOrder
{
    private array $placedOrders = [];
    private array $verifiedOrders = [];

    #[CommandHandler(
        'verify.order',
        outputChannelName: 'place.order'
    )]
    public function verify(PlaceOrder $command): PlaceOrder
    {
        $this->verifiedOrders[] = $command->orderId;

        if ($command->orderId === 'invalid-order') {
            throw new InvalidOrderException('Order validation failed');
        }

        return $command;
    }

    #[InternalHandler('place.order')]
    public function place(PlaceOrder $command): void
    {
        $this->placedOrders[] = $command->orderId;
    }

    // In a production application you would most likely query a repository instead
    public function wasOrderVerified(string $orderId): bool
    {
        return in_array($orderId, $this->verifiedOrders, true);
    }

    public function wasOrderPlaced(string $orderId): bool
    {
        return in_array($orderId, $this->placedOrders, true);
    }
}
class WorkflowChainTest extends TestCase
{
    public function test_successful_order_workflow(): void
    {
        $processor = new ProcessOrder();
        $ecotoneLite = EcotoneLite::bootstrapFlowTesting(
            [ProcessOrder::class],
            [$processor]
        );

        // Send to the first step of the workflow
        $command = new PlaceOrder('order-123', 'customer-456');
        $ecotoneLite->sendDirectToChannel('verify.order', $command);

        // Verify both steps executed
        $this->assertTrue($processor->wasOrderVerified('order-123'));
        $this->assertTrue($processor->wasOrderPlaced('order-123'));
    }

    public function test_workflow_stops_on_validation_error(): void
    {
        $processor = new ProcessOrder();
        $ecotoneLite = EcotoneLite::bootstrapFlowTesting(
            [ProcessOrder::class],
            [$processor]
        );

        // This should fail validation and not reach the place step
        $command = new PlaceOrder('invalid-order', 'customer-456');

        // Catch the exception explicitly: with expectException() the assertions
        // after the throwing call would never run
        try {
            $ecotoneLite->sendDirectToChannel('verify.order', $command);
            $this->fail('Expected InvalidOrderException to be thrown');
        } catch (InvalidOrderException) {
            // Verify verification was attempted but placement was not
            $this->assertTrue($processor->wasOrderVerified('invalid-order'));
            $this->assertFalse($processor->wasOrderPlaced('invalid-order'));
        }
    }
}
Testing Asynchronous Workflows
Test async workflows in synchronous mode for easier testing:
class AsyncProcessor
{
    private array $processedItems = [];

    #[CommandHandler(
        'start.async',
        outputChannelName: 'async.process'
    )]
    public function start(ProcessItem $item): ProcessItem
    {
        return $item;
    }

    #[Asynchronous('async')]
    #[InternalHandler('async.process')]
    public function processAsync(ProcessItem $item): void
    {
        $this->processedItems[] = $item->id;
    }

    public function getProcessedItems(): array
    {
        return $this->processedItems;
    }
}
class AsyncWorkflowTest extends TestCase
{
    public function test_async_workflow_in_sync_mode(): void
    {
        $processor = new AsyncProcessor();
        $ecotoneLite = EcotoneLite::bootstrapFlowTesting(
            [AsyncProcessor::class],
            [$processor],
            // Asynchronous processing is not enabled here (only the core package is loaded),
            // so the #[Asynchronous] handler runs synchronously during the test
            ServiceConfiguration::createWithDefaults()
                ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::CORE_PACKAGE]))
                ->withDefaultSerializationMediaType(MediaType::APPLICATION_X_PHP)
        );

        // Send to the workflow's entry point (routing key 'start.async')
        $item = new ProcessItem(id: 'item-123');
        $ecotoneLite->sendCommandWithRoutingKey('start.async', $item);

        // Verify async processing completed
        $this->assertContains('item-123', $processor->getProcessedItems());
    }
}
Summary: What You've Learned
You now understand the fundamentals of connecting handlers with channels in Ecotone:
Key Concepts
Channels: Every handler has an input channel, and can send to output channels
Connection: Use outputChannelName to chain handlers together
Privacy: Use InternalHandler to make handlers private to workflows
Async Processing: Add #[Asynchronous] for background processing
Delays: Use #[Delayed] for time-based workflows
Flow Control: Return null to stop workflows
Data Enrichment: Transform payloads or add headers