Message Channel
To understand the use case behind Message Channels read Asynchronous Processing section for Application level processing and Distributed Bus section for cross application communication.
Message Channel
To create AMQP Backed Message Channel (RabbitMQ Channel), we need to create Service Context.
class MessagingConfiguration
{
#[ServiceContext]
public function orderChannel()
{
return AmqpBackedMessageChannelBuilder::create("orders");
}
}Now orders channel will be available in our Messaging System.
Message Channels simplify to the maximum integration with Message Broker. From application perspective all we need to do, is to provide channel implementation. Ecotone will take care of whole publishing and consuming part.
Message Channel Configuration
AmqpBackedMessageChannelBuilder::create("orders")
->withAutoDeclare(false) // do not auto declare queue
->withDefaultTimeToLive(1000) // limit TTL of messages
->withDefaultDeliveryDelay(1000) // delay messages by default
->withFinalFailureStrategy(FinalFailureStrategy:RESEND) // final failure strategyCustomize Queue Name
By default the queue name will follow channel name, which in above example will be "orders". However we can use "orders" as reference name in our Application, yet name queue differently:
Usage
Then Message Channels can be used as follows to make Message Handler asynchronous:
RabbitMQ Streaming Channel
RabbitMQ Streams provide persistent event streaming capabilities. Unlike traditional queues where each message is consumed by a single consumer, streaming channels allow multiple independent consumers to read from the same stream, each tracking their own position.
Streaming channels are ideal for event-driven architectures where multiple services need to consume the same events independently, with support for independent consumption rates.
This functionality is available as part of Ecotone Enterprise.
Requirements
To use RabbitMQ Streaming Channels, you need to install:
Ecotone DBAL Module - Required for storing consumer position tracking
AmqpLib Connection Factory - Required for RabbitMQ streaming support (see Connection Factory Setup below)
Key Features
Multiple Independent Consumers: Each consumer maintains its own position in the stream
Event Replay: Consumers can start from any position (first, last, next, or specific offset)
Durability: Events are persisted and can be consumed multiple times
Position Tracking: Automatic tracking of consumer position with configurable commit intervals
Basic Configuration
To create a RabbitMQ Streaming Channel, you need to:
Create a stream queue using
AmqpQueue::createStreamQueue()Configure the streaming channel with
AmqpStreamChannelBuilder
Important: RabbitMQ Streaming Channels require the AmqpLib connection factory (Enqueue\AmqpLib\AmqpConnectionFactory). The AmqpExt connection factory does not support streaming features.
Connection Factory Setup
Make sure you're using the AmqpLib connection factory in your dependency container:
Start Position Options
The startPosition parameter controls where the consumer begins reading from the stream:
"first"- Start from the beginning of the stream (replay all events)"last"- Start from the end of the stream (skip existing events)"next"- Start from the next new message (default behavior)Specific offset - Start from a specific offset number (e.g.,
"12345")
Message Group ID (Consumer Groups)
The messageGroupId is a unique identifier for each consumer group. Multiple consumers with the same messageGroupId will share the same position in the stream, while consumers with different IDs track their positions independently.
Both services consume from the same stream (events_stream) but track their positions independently.
Advanced Configuration
Commit Interval
Controls how often the consumer position is committed. Lower values are safer but slower, higher values improve performance but may cause reprocessing on failure.
How it works:
commitInterval=1: Commit after every message (safest, slowest)commitInterval=100: Commit after every 100 messages (better performance)The last message in a batch is always committed, even if the interval hasn't been reached
Prefetch Count
Controls how many unacknowledged messages RabbitMQ will deliver to the consumer.
Guidelines:
Lower values (e.g., 1-10): Better flow control, lower throughput
Higher values (e.g., 50-100): Higher throughput, more memory usage
Complete Example
Here's a complete example showing how to set up a streaming channel for distributed event sharing:
Usage with Distributed Bus
Streaming channels work seamlessly with the Distributed Bus:
For more examples of using streaming channels with Distributed Bus, see the Distributed Bus with Service Map documentation.
Last updated
Was this helpful?