Ecotone
RabbitMQ Support

Asynchronous PHP RabbitMQ

Last updated 3 months ago

Installation

composer require ecotone/amqp

Module Powered By

Enqueue, a solid and powerful abstraction over asynchronous queues.

Configuration

To use AMQP Support, we need to register a ConnectionFactory in our Dependency Container. The three snippets below cover Symfony, Laravel and Ecotone Lite respectively.

# config/services.yaml (Symfony)
# You need to have a RabbitMQ instance running on localhost, or change the DSN
services:
    Enqueue\AmqpExt\AmqpConnectionFactory:
        class: Enqueue\AmqpExt\AmqpConnectionFactory
        arguments:
            - "amqp://guest:guest@localhost:5672//"

# Laravel: register the AMQP service in a Service Provider
use Enqueue\AmqpExt\AmqpConnectionFactory;

public function register()
{
     $this->app->singleton(AmqpConnectionFactory::class, function () {
         return new AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//");
     });
}

# Ecotone Lite
use Ecotone\Lite\EcotoneLiteApplication;
use Enqueue\AmqpExt\AmqpConnectionFactory;

$application = EcotoneLiteApplication::bootstrap(
    [
        AmqpConnectionFactory::class => new AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//")
    ]
);

We register our AmqpConnectionFactory under the class name Enqueue\AmqpExt\AmqpConnectionFactory. This lets Ecotone resolve it automatically, without any additional configuration.

Message Channel

To create an AMQP-backed Message Channel (RabbitMQ Channel), we need to create a Service Context.

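use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;
use Ecotone\Messaging\Attribute\ServiceContext;

class MessagingConfiguration
{
    #[ServiceContext]
    public function orderChannel()
    {
        // Registers a RabbitMQ-backed channel named "orders"
        return AmqpBackedMessageChannelBuilder::create("orders");
    }
}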

Now the orders channel will be available in our Messaging System.

Message Channels make integration with the Message Broker as simple as possible. From the application's perspective, all we need to do is provide the channel implementation. Ecotone takes care of the whole publishing and consuming part.
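
As an illustration of how the "orders" channel is then used, here is a minimal sketch of a handler that is executed asynchronously through it. The PlaceOrder command, the OrderService class and the endpointId value are hypothetical names chosen for this example; the Asynchronous and CommandHandler attributes come from Ecotone itself.

```php
<?php

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\CommandHandler;

// Hypothetical command, used only for illustration.
final class PlaceOrder
{
    public function __construct(public readonly string $orderId) {}
}

// Hypothetical service holding the handler.
final class OrderService
{
    // "orders" must match the channel name registered with AmqpBackedMessageChannelBuilder.
    #[Asynchronous("orders")]
    #[CommandHandler(endpointId: "place_order_endpoint")]
    public function placeOrder(PlaceOrder $command): void
    {
        // Runs in the consumer process, after the Message has been read from RabbitMQ.
    }
}
```

The consumer for the channel is typically started with `bin/console ecotone:run orders` in Symfony or `php artisan ecotone:run orders` in Laravel.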

Message Channel Configuration

AmqpBackedMessageChannelBuilder::create("orders")
    ->withAutoDeclare(false) // do not auto declare the queue
    ->withDefaultTimeToLive(1000) // limit TTL of messages
    ->withDefaultDeliveryDelay(1000); // delay messages by default

Customize Queue Name

By default the queue name follows the channel name, which in the example above is "orders". However, we can keep "orders" as the reference name in our Application and give the queue a different name:

#[ServiceContext] 
public function orderChannel()
{
    return AmqpBackedMessageChannelBuilder::create(
        channelName: "orders",
        queueName: "crm_orders"
    );
}

AMQP Distributed Bus

Message Publisher

If you want to publish a Message directly to an Exchange, you may use a Publisher.

class AMQPConfiguration
{
    #[ServiceContext] 
    public function registerAmqpConfig()
    {
        return 
            AmqpMessagePublisherConfiguration::create(
                MessagePublisher::class, // 1
                "delivery", // 2
                "application/json" // 3
            );
    }
}
  1. Reference name - Name under which the Publisher will be available in the Dependency Container.

  2. Exchange name - Name of the exchange the Message should be published to.

  3. Default Conversion [Optional] - Default media type the payload will be converted to.
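
Once registered, the Publisher can be injected under its reference name and used to send data. The sketch below assumes the configuration above (reference name MessagePublisher::class, exchange "delivery"); the DeliveryNotifier class and the payload shape are hypothetical.

```php
<?php

use Ecotone\Messaging\MessagePublisher;

// Hypothetical service; only the MessagePublisher interface comes from Ecotone.
final class DeliveryNotifier
{
    public function __construct(private MessagePublisher $publisher) {}

    public function notify(string $orderId): void
    {
        // send() expects already serialized data together with its media type.
        $this->publisher->send(
            json_encode(["orderId" => $orderId], JSON_THROW_ON_ERROR),
            "application/json"
        );
    }
}
```

For objects or arrays, `convertAndSend()` can be used instead, in which case the payload is converted to the default conversion type configured for the Publisher.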

Publisher Configuration

AmqpMessagePublisherConfiguration::create(
    MessagePublisher::class,
    "delivery"
)
    ->withDefaultPersistentDelivery(true) // 1
    ->withDefaultRoutingKey("someKey") // 2
    ->withRoutingKeyFromHeader("routingKey") // 3
    ->withHeaderMapper("application.*") // 4
  1. withDefaultPersistentDelivery - whether AMQP messages should be persistent.

  2. withDefaultRoutingKey - default routing key added to the AMQP message.

  3. withRoutingKeyFromHeader - name of the header from which the routing key should be read.

  4. withHeaderMapper - By default headers are not sent with the AMQP message. You may provide a mapping for the headers that should be passed to the AMQP Message.
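
To show how options 3 and 4 play together, the sketch below sends a Message with metadata. It assumes the Publisher was configured as above with withRoutingKeyFromHeader("routingKey") and withHeaderMapper("application.*"); the header values and the OrderPublisher class are hypothetical.

```php
<?php

use Ecotone\Messaging\MessagePublisher;

// Hypothetical service used only to illustrate sending with metadata.
final class OrderPublisher
{
    public function __construct(private MessagePublisher $publisher) {}

    public function publish(string $orderId): void
    {
        $this->publisher->sendWithMetadata(
            json_encode(["orderId" => $orderId], JSON_THROW_ON_ERROR),
            [
                // Read by withRoutingKeyFromHeader("routingKey")
                "routingKey" => "placeOrder",
                // Matches the "application.*" header mapping, so it is expected
                // to be passed along with the AMQP message
                "application.source" => "shop",
            ],
            "application/json"
        );
    }
}
```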

Message Consumer

We can bind a given method as a Message Consumer:

#[MessageConsumer('orders_consumer')] // name of endpoint id
public function handle(string $payload): void
{
    // do something
}

To connect the consumer directly to an AMQP Queue, we need to tell Ecotone how the Queue is configured.

class AmqpConfiguration
{
    #[ServiceContext] 
    public function registerAmqpConfig(): array
    {
        return [
            AmqpQueue::createWith("orders"), // 1
            AmqpExchange::createDirectExchange("system"), // 2
            AmqpBinding::createFromNames("system", "orders", "placeOrder"), // 3
            AmqpMessageConsumerConfiguration::create("orders_consumer", "orders") // 4
        ];
    }
}
  1. AmqpQueue::createWith(string $name) - Registers a Queue with the given name.

  2. AmqpExchange::create*(string $name) - Registers an Exchange of the given type with the given name.

  3. AmqpBinding::createFromNames(string $exchangeName, string $queueName, string $routingKey) - Registers a binding between the exchange and the queue.

  4. Provides a Consumer that is registered under the name "orders_consumer" and polls the "orders" queue.
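
With this in place, the consumer still has to be started as a separate process. In Symfony this is typically `bin/console ecotone:run orders_consumer`, and in Laravel `php artisan ecotone:run orders_consumer`. For Ecotone Lite, a minimal sketch could look like the following; it assumes the connection settings from the Configuration section and that Ecotone can discover the configuration and consumer classes shown above.

```php
<?php

use Ecotone\Lite\EcotoneLiteApplication;
use Enqueue\AmqpExt\AmqpConnectionFactory;

// Bootstrap the messaging system with the AMQP connection (DSN is an assumption).
$messagingSystem = EcotoneLiteApplication::bootstrap([
    AmqpConnectionFactory::class => new AmqpConnectionFactory(
        "amqp+lib://guest:guest@localhost:5672//"
    ),
]);

// Starts polling the "orders" queue and passes each Message
// to the method marked with #[MessageConsumer('orders_consumer')].
$messagingSystem->run("orders_consumer");
```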

Available Exchange configurations

// Pick one of the exchange types; "system" is the exchange name
$amqpExchange = AmqpExchange::createDirectExchange("system");
$amqpExchange = AmqpExchange::createFanoutExchange("system");
$amqpExchange = AmqpExchange::createTopicExchange("system");
$amqpExchange = AmqpExchange::createHeadersExchange("system");

$amqpExchange = $amqpExchange
    ->withDurability(true) // the exchange survives a broker restart
    ->withAutoDeletion(); // the exchange is deleted when the last queue is unbound from it

Available Queue configurations

$amqpQueue = AmqpQueue::createWith("orders")
                ->withDurability(true) // the queue will survive a broker restart
                ->withExclusivity() // used by only one connection and the queue will be deleted when that connection closes
                ->withAutoDeletion() // queue that has had at least one consumer is deleted when last consumer unsubscribes
                ->withDeadLetterExchangeTarget($amqpExchange);
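
The exchange and queue options can be combined in a single Service Context. The sketch below wires a dead letter exchange to the "orders" queue using only the builders shown on this page; the class name, the "orders_dead_letter" names and the routing key are hypothetical.

```php
<?php

use Ecotone\Amqp\AmqpBinding;
use Ecotone\Amqp\AmqpExchange;
use Ecotone\Amqp\AmqpQueue;
use Ecotone\Messaging\Attribute\ServiceContext;

// Hypothetical configuration class used only for illustration.
final class DeadLetterConfiguration
{
    #[ServiceContext]
    public function deadLetterSetup(): array
    {
        // Fanout exchange that receives messages dead-lettered from "orders".
        $deadLetterExchange = AmqpExchange::createFanoutExchange("orders_dead_letter_exchange")
            ->withDurability(true);

        return [
            $deadLetterExchange,
            // Queue that stores the dead-lettered messages.
            AmqpQueue::createWith("orders_dead_letter")
                ->withDurability(true),
            // A fanout exchange ignores the routing key; it is passed only to satisfy the signature.
            AmqpBinding::createFromNames("orders_dead_letter_exchange", "orders_dead_letter", "orders"),
            // Main queue pointing at the dead letter exchange.
            AmqpQueue::createWith("orders")
                ->withDurability(true)
                ->withDeadLetterExchangeTarget($deadLetterExchange),
        ];
    }
}
```

Note that RabbitMQ rejects redeclaring an existing queue with different arguments, so a setup like this is easiest to apply to a queue that does not exist yet.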

Publisher Acknowledgments

By default Ecotone aims for resiliency, so that Messages are not lost. This protects against the lost heartbeats issue (an AMQP bug that makes messages vanish without any exception) and ensures that Messages are considered delivered only once the Broker has acknowledged storing them (using Publisher confirms). However, Publisher confirms come with a time cost, because publishing has to wait for the acknowledgment from RabbitMQ. If the delivery guarantee is not essential and we can accept the risk of losing messages, we can disable them to speed up publishing:

#[ServiceContext]
public function amqpChannel() : array
{
    return [
        AmqpBackedMessageChannelBuilder::create("orders")
            ->withPublisherAcknowledgments(false)
    ];
}

Publisher acknowledgments can be combined with Outbox to provide a strong delivery guarantee.

Publisher Transactions

Ecotone AMQP comes with support for RabbitMQ Transactions for published messages. This means that if you send more than one message at a time, they will be committed together.

If you want to enable or disable transactions for all Asynchronous Endpoints or for the Command Bus, you may use a ServiceContext.

class ChannelConfiguration
{
    #[ServiceContext]
    public function registerTransactions() : array
    {
        return [
            AmqpConfiguration::createWithDefaults()
                ->withTransactionOnAsynchronousEndpoints(false)
                ->withTransactionOnCommandBus(false)
        ];
    }

}

To enable transactions on a specific endpoint when the default is disabled, mark the consumer with the Ecotone\Amqp\AmqpTransaction\AmqpTransaction attribute.

    #[AmqpTransaction]
    #[MessageConsumer("consumer")]
    public function execute(string $message) : void
    {
        // do something with Message
    }

By default RabbitMQ transactions are disabled, as you may ensure consistency using Resilient Sending.

AMQP Distributed Bus is described in more detail in the Distributed Bus section.