Message Channel

To understand the use case behind Message Channels, read the Asynchronous Processing section for application-level processing and the Distributed Bus section for cross-application communication.

Message Channel

To create an AMQP-backed Message Channel (RabbitMQ Channel), we need to register it in a Service Context.

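use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;

class MessagingConfiguration
{
    // Registers the "orders" channel, backed by a RabbitMQ queue
    #[ServiceContext]
    public function orderChannel(): AmqpBackedMessageChannelBuilder
    {
        return AmqpBackedMessageChannelBuilder::create("orders");
    }
}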

The orders channel is now available in our Messaging System.

Message Channel Configuration

AmqpBackedMessageChannelBuilder::create("orders")
    ->withAutoDeclare(false) // do not auto-declare the queue
    ->withDefaultTimeToLive(1000) // default time to live (TTL) for messages
    ->withDefaultDeliveryDelay(1000) // default delivery delay for messages
    ->withFinalFailureStrategy(FinalFailureStrategy::RESEND); // final failure strategy

Customize Queue Name

By default, the queue name follows the channel name, which in the example above is "orders". However, we can keep "orders" as the reference name in our application and give the queue a different name.
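A minimal sketch of such a configuration follows. It assumes that `AmqpBackedMessageChannelBuilder::create()` accepts a `queueName` named argument; verify this against the installed Ecotone version. The queue name `orders_queue` is a hypothetical value chosen for illustration.

```php
<?php

use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;

// "orders" remains the channel reference name used inside the application.
// "orders_queue" is a hypothetical RabbitMQ queue name, and the `queueName`
// named argument is an assumption to check against your Ecotone version.
AmqpBackedMessageChannelBuilder::create(
    channelName: "orders",
    queueName: "orders_queue"
);
```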

Usage

A Message Channel can then be used to make a Message Handler asynchronous.
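As an illustration, a handler might be routed through the orders channel as sketched below. The `PlaceOrder` command, the `OrderService` class, the routing key, and the endpoint id are all hypothetical names introduced for this example.

```php
<?php

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\CommandHandler;

// Hypothetical command, defined here only to keep the sketch self-contained.
final class PlaceOrder
{
    public function __construct(public readonly string $orderId)
    {
    }
}

// Hypothetical service whose handler is executed via the "orders" channel.
final class OrderService
{
    // The channel name passed to Asynchronous must match the configured channel.
    #[Asynchronous("orders")]
    #[CommandHandler("order.place", endpointId: "place_order_endpoint")]
    public function placeOrder(PlaceOrder $command): void
    {
        // Business logic runs when the message is consumed from the channel.
    }
}
```

The message is sent to the orders channel instead of being handled in the calling process, and the handler runs when a consumer for that channel picks it up.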

RabbitMQ Streaming Channel

RabbitMQ Streams provide persistent event streaming capabilities. Unlike traditional queues where each message is consumed by a single consumer, streaming channels allow multiple independent consumers to read from the same stream, each tracking their own position.

Requirements

To use RabbitMQ Streaming Channels, you need to install:

  1. Ecotone DBAL Module - Required for storing consumer position tracking

  2. AmqpLib Connection Factory - Required for RabbitMQ streaming support (see Connection Factory Setup below)

The DBAL module is used to persist the consumer position (offset) in the database. This ensures that if your application restarts, consumers can resume from where they left off.

Key Features

  • Multiple Independent Consumers: Each consumer maintains its own position in the stream

  • Event Replay: Consumers can start from any position (first, last, next, or specific offset)

  • Durability: Events are persisted and can be consumed multiple times

  • Position Tracking: Automatic tracking of consumer position with configurable commit intervals

Basic Configuration

To create a RabbitMQ Streaming Channel, you need to:

  1. Create a stream queue using AmqpQueue::createStreamQueue()

  2. Configure the streaming channel with AmqpStreamChannelBuilder
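The two steps above might look roughly like the sketch below. Only `AmqpQueue::createStreamQueue()`, `AmqpStreamChannelBuilder`, `startPosition`, and `messageGroupId` are named on this page. The `create()` factory method, the remaining named arguments, the class namespaces, and the values `events` and `order_projection` are assumptions that should be verified against the installed Ecotone version.

```php
<?php

use Ecotone\Amqp\AmqpQueue;
use Ecotone\Amqp\AmqpStreamChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;
use Enqueue\AmqpLib\AmqpConnectionFactory as AmqpLibConnection;

final class StreamingConfiguration
{
    #[ServiceContext]
    public function eventsStream(): array
    {
        return [
            // Step 1: declare the stream queue.
            AmqpQueue::createStreamQueue("events_stream"),

            // Step 2: configure the streaming channel on top of that queue.
            // Assumed signature: argument names other than startPosition and
            // messageGroupId are not confirmed by this page.
            AmqpStreamChannelBuilder::create(
                channelName: "events",                 // hypothetical channel name
                startPosition: "first",
                amqpConnectionReferenceName: AmqpLibConnection::class,
                queueName: "events_stream",
                messageGroupId: "order_projection"     // hypothetical group id
            ),
        ];
    }
}
```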

Connection Factory Setup

Make sure the AmqpLib connection factory is registered in your dependency container.
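How the factory is registered depends on the container in use. The sketch below is container-agnostic: it returns a map from service id to a factory closure, which is an assumption about the container's configuration format. The DSN is a placeholder for a local RabbitMQ instance with default credentials.

```php
<?php

use Enqueue\AmqpLib\AmqpConnectionFactory;

// Service id => factory closure. Adapt this shape to your container
// (Symfony, Laravel, PHP-DI, ...); the array format itself is illustrative.
return [
    AmqpConnectionFactory::class => static fn (): AmqpConnectionFactory =>
        // Placeholder DSN: guest/guest on localhost, default vhost "/".
        new AmqpConnectionFactory("amqp://guest:guest@localhost:5672/%2f"),
];
```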

Start Position Options

The startPosition parameter controls where the consumer begins reading from the stream:

  • "first" - Start from the beginning of the stream (replay all events)

  • "last" - Start from the end of the stream (skip existing events)

  • "next" - Start from the next new message (default behavior)

  • Specific offset - Start from a specific offset number (e.g., "12345")

Message Group ID (Consumer Groups)

The messageGroupId is a unique identifier for each consumer group. Multiple consumers with the same messageGroupId will share the same position in the stream, while consumers with different IDs track their positions independently.

For example, two services can consume from the same stream (events_stream) under different messageGroupId values and track their positions independently.
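The position-sharing rule can be illustrated with a small in-memory simulation. This is not Ecotone code: `InMemoryPositionStore`, `consume()`, and the group ids `billing` and `shipping` are hypothetical stand-ins for the database-backed position tracking described above.

```php
<?php

declare(strict_types=1);

/** In-memory stand-in for the position store (hypothetical, not an Ecotone class). */
final class InMemoryPositionStore
{
    /** @var array<string, int> next offset to read, keyed by message group id */
    private array $positions = [];

    public function nextOffset(string $messageGroupId): int
    {
        return $this->positions[$messageGroupId] ?? 0;
    }

    public function commit(string $messageGroupId, int $nextOffset): void
    {
        $this->positions[$messageGroupId] = $nextOffset;
    }
}

/**
 * Reads up to $limit events for a group and commits the new position.
 *
 * @param list<string> $stream
 * @return list<string>
 */
function consume(array $stream, InMemoryPositionStore $store, string $messageGroupId, int $limit): array
{
    $start = $store->nextOffset($messageGroupId);
    $events = array_slice($stream, $start, $limit);
    $store->commit($messageGroupId, $start + count($events));

    return $events;
}

$stream = ["order.placed", "order.paid", "order.shipped"];
$store = new InMemoryPositionStore();

// Two reads in the same group share one position, so the second continues.
$billingFirst = consume($stream, $store, "billing", 2);
$billingSecond = consume($stream, $store, "billing", 2);

// A different group has its own position and starts from the beginning.
$shippingFirst = consume($stream, $store, "shipping", 2);
```

Here the second `billing` read returns only `order.shipped`, while the `shipping` read returns the first two events again, because each group id owns a separate position.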

Advanced Configuration

Commit Interval

Controls how often the consumer position is committed. Lower values are safer but slower; higher values improve performance, but messages processed since the last commit may be reprocessed after a failure.

How it works:

  • commitInterval=1: Commit after every message (safest, slowest)

  • commitInterval=100: Commit after every 100 messages (better performance)

  • The last message in a batch is always committed, even if the interval hasn't been reached
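The commit rules listed above can be expressed as a small simulation. This is a sketch of the described behavior, not Ecotone's implementation; the function `commitPoints()` is a hypothetical helper that returns the offsets at which a commit would occur within one batch.

```php
<?php

declare(strict_types=1);

/**
 * Returns the offsets at which the position would be committed for one batch.
 *
 * @param list<int> $batchOffsets offsets of the messages processed in this batch
 * @return list<int>
 */
function commitPoints(array $batchOffsets, int $commitInterval): array
{
    $commits = [];
    $sinceLastCommit = 0;
    $lastIndex = count($batchOffsets) - 1;

    foreach ($batchOffsets as $index => $offset) {
        $sinceLastCommit++;
        $intervalReached = $sinceLastCommit >= $commitInterval;
        $isLastInBatch = $index === $lastIndex;

        // Commit when the interval is reached, and always for the final message.
        if ($intervalReached || $isLastInBatch) {
            $commits[] = $offset;
            $sinceLastCommit = 0;
        }
    }

    return $commits;
}

$everyMessage = commitPoints([0, 1, 2], 1);             // commits at 0, 1, 2
$everyThree = commitPoints([0, 1, 2, 3, 4, 5, 6], 3);   // commits at 2, 5, 6
```

With an interval of 3, a failure right after processing offset 4 would leave offset 2 as the last committed position, so offsets 3 and 4 would be processed again on restart. That is the reprocessing trade-off of larger intervals.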

Prefetch Count

Controls how many unacknowledged messages RabbitMQ will deliver to the consumer.

Guidelines:

  • Lower values (e.g., 1-10): Better flow control, lower throughput

  • Higher values (e.g., 50-100): Higher throughput, more memory usage

Complete Example

Here's a complete example showing how to set up a streaming channel for distributed event sharing:

Usage with Distributed Bus

Streaming channels work seamlessly with the Distributed Bus:

For more examples of using streaming channels with Distributed Bus, see the Distributed Bus with Service Map documentation.
