RabbitMQ Support
Asynchronous PHP RabbitMQ
Installation
composer require ecotone/amqp
Module Powered By
Enqueue solid and powerful abstraction over asynchronous queues.
Configuration
In order to use AMQP Support
we need to add ConnectionFactory
to our Dependency Container.
# config/services.yaml
# You need to have RabbitMQ instance running on your localhost, or change DSN
Enqueue\AmqpExt\AmqpConnectionFactory:
class: Enqueue\AmqpExt\AmqpConnectionFactory
arguments:
- "amqp://guest:guest@localhost:5672//"
Message Channel
To create AMQP Backed Message Channel (RabbitMQ Channel), we need to create Service Context.
class MessagingConfiguration
{
#[ServiceContext]
public function orderChannel()
{
return AmqpBackedMessageChannelBuilder::create("orders");
}
}
Now orders
channel will be available in our Messaging System.
Message Channels simplify to the maximum integration with Message Broker. From application perspective all we need to do, is to provide channel implementation. Ecotone will take care of whole publishing and consuming part.
Message Channel Configuration
AmqpBackedMessageChannelBuilder::create("orders")
->withAutoDeclare(false) // do not auto declare queue
->withDefaultTimeToLive(1000) // limit TTL of messages
->withDefaultDeliveryDelay(1000) // delay messages by default
Customize Queue Name
By default the queue name will follow channel name, which in above example will be "orders". However we can use "orders" as reference name in our Application, yet name queue differently:
#[ServiceContext]
public function orderChannel()
{
return AmqpBackedMessageChannelBuilder::create(
channelName: "orders",
queueName: "crm_orders"
);
}
AMQP Distributed Bus
AMQP Distributed Bus is described in more details under Distributed Bus section.
Message Publisher
If you want to publish Message directly to Exchange, you may use of Publisher.
class AMQPConfiguration
{
#[ServiceContext]
public function registerAmqpConfig()
{
return
AmqpMessagePublisherConfiguration::create(
MessagePublisher::class, // 1
"delivery", // 2
"application/json" // 3
);
}
}
Reference name
- Name under which it will be available in Dependency Container.Exchange name
- Name of exchange where Message should be publishedDefault Conversion [Optional]
- Default type, payload will be converted to.
Publisher Configuration
AmqpMessagePublisherConfiguration::create(
MessagePublisher::class,
"delivery"
)
->withDefaultPersistentDelivery(true) // 1
->withDefaultRoutingKey("someKey") // 2
->withRoutingKeyFromHeader("routingKey") // 3
->withHeaderMapper("application.*") // 4
withDefaultPersistentDelivery
- should AMQP messages bepersistent
.withDefaultRoutingKey
- default routing key added to AMQP messagewithRoutingKeyFromHeader
- should routing key be retrieved from header with namewithHeaderMapper
- On default headers are not send with AMQP message. You map provide mapping for headers that should be mapped toAMQP Message
Message Consumer
We can bind given method as Message Consumer
#[MessageConsumer('orders_consumer')] // name of endpoint id
public function handle(string $payload): void
{
// do something
}
To connect consumer directly to a AMQP Queue, we need to provide Ecotone
with information, how the Queue is configured.
class AmqpConfiguration
{
#[ServiceContext]
public function registerAmqpConfig(): array
{
return [
AmqpQueue::createWith("orders"), // 1
AmqpExchange::createDirectExchange("system"), // 2
AmqpBinding::createFromNames("system", "orders", "placeOrder"), // 3
AmqpMessageConsumerConfiguration::create("orders_consumer", "orders") // 4
];
}
}
AmqpQueue::createWith(string $name)
- Registers Queue with specific nameAmqpExchange::create*(string $name)
- Registers of given type with specific nameAmqpBinding::createFromName(string $exchangeName, string $queueName, string $routingKey)
- Registering binding between exchange and queueProvides Consumer that will be registered at given name
"orders_consumer"
and will be polling"orders"
queue
Available Exchange configurations
$amqpExchange = AmqpExchange::createDirectExchange
$amqpExchange = AmqpExchange::createFanoutExchange
$amqpExchange = AmqpExchange::createTopicExchange
$amqpExchange = AmqpExchange::createHeadersExchange
$amqpExchange = $amqpExchange
->withDurability(true) // exchanges survive broker restart
->withAutoDeletion() // exchange is deleted when last queue is unbound from it
Available Queue configurations
$amqpQueue = AmqpQueue::createDirectExchange
->withDurability(true) // the queue will survive a broker restart
->withExclusivity() // used by only one connection and the queue will be deleted when that connection closes
->withAutoDeletion() // queue that has had at least one consumer is deleted when last consumer unsubscribes
->withDeadLetterExchangeTarget($amqpExchange);
Publisher Acknowledgments
By default Ecotone will aim for resiliency to avoid Message being lost. This protects from lost heartbeats issue (AMQP bug which make message vanish without exceptions) and ensures that Message are considered delivered only when Broker has acknowledged storing them on the Broker side (Using Publisher confirms). However Publisher confirms comes with time cost, as it makes publishing process awaits for acknowledge from RabbitMQ. Therefore if delivery guarantee is not an issue, and we can accept risk of losing messages we can consider disable it to speed up publishing time:
#[ServiceContext]
public function amqpChannel() : array
{
return [
AmqpBackedMessageChannelBuilder::create("orders")
->withPublisherAcknowledgments(false)
];
}
Publisher acknowledgments can be combined with Outbox to ensure high message guarantee.
Publisher Transactions
Ecotone AMQP
comes with support for RabbitMQ Transaction for published messages.
This means that, if you send more than one message at time, it will be commited together.
If you want to enable/disable for all Asynchronous Endpoints or specific for Command Bus. You may use of ServiceContext.
class ChannelConfiguration
{
#[ServiceContext]
public function registerTransactions() : array
{
return [
AmqpConfiguration::createWithDefaults()
->withTransactionOnAsynchronousEndpoints(false)
->withTransactionOnCommandBus(false)
];
}
}
To enable transactions on specific endpoint if default is disabled, mark consumer with Ecotone\Amqp\AmqpTransaction\AmqpTransaction
annotation.
#[AmqpTransaction)]
#[MessageConsumer("consumer")]
public function execute(string $message) : void
{
// do something with Message
}
Last updated
Was this helpful?