RabbitMQ Support
Asynchronous PHP RabbitMQ

Installation

composer require ecotone/amqp

Module Powered By

Enqueue: a solid and powerful abstraction over asynchronous queues.

Configuration

To use AMQP support, we need to register a ConnectionFactory in our Dependency Container.

Symfony

# config/services.yaml
# You need to have a RabbitMQ instance running on your localhost, or change the DSN
Enqueue\AmqpExt\AmqpConnectionFactory:
    class: Enqueue\AmqpExt\AmqpConnectionFactory
    arguments:
        - "amqp://guest:guest@localhost:5672//"

Laravel

// Register the AMQP service in a Service Provider
use Enqueue\AmqpExt\AmqpConnectionFactory;

public function register()
{
    $this->app->singleton(AmqpConnectionFactory::class, function () {
        return new AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//");
    });
}

Lite

use Enqueue\AmqpExt\AmqpConnectionFactory;

$application = EcotoneLiteApplication::boostrap(
    [
        AmqpConnectionFactory::class => new AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//")
    ]
);
We register the connection factory under the class name Enqueue\AmqpExt\AmqpConnectionFactory. This lets Ecotone resolve it automatically, without any additional configuration.

Message Channel

To create an AMQP-backed Message Channel (RabbitMQ Channel), we need to register it in a Service Context.
class MessagingConfiguration
{
    #[ServiceContext]
    public function orderChannel()
    {
        return AmqpBackedMessageChannelBuilder::create("orders");
    }
}
The orders channel is now available in our Messaging System.
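As a minimal sketch of how such a channel is typically used, a handler can be marked as asynchronous and pointed at the channel name. The PlaceOrder command, the OrderService class and the endpoint id below are hypothetical names chosen for illustration, and the attribute namespaces are given as they exist in recent Ecotone versions; check them against the version you have installed.

```php
<?php

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\CommandHandler;

// Hypothetical command, used only for this illustration.
final class PlaceOrder
{
    public function __construct(public string $orderId)
    {
    }
}

// Hypothetical service; the class and endpoint names are assumptions.
final class OrderService
{
    // The command is first sent to the "orders" channel registered above
    // and handled later by a consumer process reading from that channel.
    #[Asynchronous("orders")]
    #[CommandHandler(endpointId: "place_order_endpoint")]
    public function placeOrder(PlaceOrder $command): void
    {
        // handle the command
    }
}
```

A consumer for the channel is then usually started from the console with the ecotone:run command followed by the channel name, for example bin/console ecotone:run orders in a Symfony application.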

Distributed Publisher and Consumer

Distributed Publisher

class MessagingConfiguration
{
    #[ServiceContext]
    public function distributedPublisher()
    {
        return AmqpDistributedBusConfiguration::createPublisher();
    }
}

Distributed Consumer

class MessagingConfiguration
{
    #[ServiceContext]
    public function distributedConsumer()
    {
        return AmqpDistributedBusConfiguration::createConsumer();
    }
}

Custom Message Publisher

Available actions

interface MessagePublisher
{
    // 1
    public function send(string $data, string $sourceMediaType = MediaType::TEXT_PLAIN) : void;

    // 2
    public function sendWithMetadata(string $data, array $metadata, string $sourceMediaType = MediaType::TEXT_PLAIN) : void;

    // 3
    public function convertAndSend(object|array $data) : void;

    // 4
    public function convertAndSendWithMetadata(object|array $data, array $metadata) : void;
}
  1. send - Sends a string payload via the Publisher. No conversion is performed; you may specify the Media Type of $data.
  2. sendWithMetadata - Same as send, but also allows sending additional metadata.
  3. convertAndSend - Sends payloads that need conversion, such as objects and arrays. Ecotone uses its Conversion system to convert $data.
  4. convertAndSendWithMetadata - Same as convertAndSend, but also allows sending additional metadata.
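The four actions can be compared side by side in a small sketch. To keep it runnable without Ecotone installed, it declares a local stand-in that mirrors the interface listed above (with the media type default written as a plain string) and a hypothetical InMemoryPublisher that only records what was sent. Neither is part of Ecotone; in an application the real MessagePublisher would be injected instead.

```php
<?php

declare(strict_types=1);

// Local stand-in mirroring the MessagePublisher interface listed above,
// declared here only so that the sketch runs without Ecotone.
interface MessagePublisher
{
    public function send(string $data, string $sourceMediaType = 'text/plain'): void;

    public function sendWithMetadata(string $data, array $metadata, string $sourceMediaType = 'text/plain'): void;

    public function convertAndSend(object|array $data): void;

    public function convertAndSendWithMetadata(object|array $data, array $metadata): void;
}

// Hypothetical implementation that records messages instead of publishing them.
final class InMemoryPublisher implements MessagePublisher
{
    /** @var array<int, array{payload: mixed, metadata: array, mediaType: ?string}> */
    public array $sent = [];

    public function send(string $data, string $sourceMediaType = 'text/plain'): void
    {
        $this->sendWithMetadata($data, [], $sourceMediaType);
    }

    public function sendWithMetadata(string $data, array $metadata, string $sourceMediaType = 'text/plain'): void
    {
        $this->sent[] = ['payload' => $data, 'metadata' => $metadata, 'mediaType' => $sourceMediaType];
    }

    public function convertAndSend(object|array $data): void
    {
        $this->convertAndSendWithMetadata($data, []);
    }

    public function convertAndSendWithMetadata(object|array $data, array $metadata): void
    {
        // A real publisher would pass $data through Ecotone's Conversion system here.
        $this->sent[] = ['payload' => $data, 'metadata' => $metadata, 'mediaType' => null];
    }
}

$publisher = new InMemoryPublisher();

// 1. Already serialized string; we only declare its media type.
$publisher->send('{"orderId": 123}', 'application/json');

// 2. Same as send, with extra metadata (headers).
$publisher->sendWithMetadata('order 123 placed', ['system.executor' => 'Johny']);

// 3. Array (or object) payload that still needs conversion.
$publisher->convertAndSend(['orderId' => 123]);

// 4. Same as convertAndSend, with extra metadata.
$publisher->convertAndSendWithMetadata(['orderId' => 123], ['system.executor' => 'Johny']);
```

The split between send* and convertAndSend* mirrors who is responsible for serialization: the caller in the first pair, the Conversion system in the second.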

Configuration

If you want to publish a Message directly to an Exchange, you may use a Publisher.
class AMQPConfiguration
{
    #[ServiceContext]
    public function registerAmqpConfig()
    {
        return
            AmqpMessagePublisherConfiguration::create(
                MessagePublisher::class, // 1
                "delivery", // 2
                "application/json" // 3
            );
    }
}
  1. Reference name - Name under which the Publisher will be available in the Dependency Container.
  2. Exchange name - Name of the exchange to which the Message should be published.
  3. Default Conversion [Optional] - Default media type the payload will be converted to.
A Publisher is a special type of Gateway that implements the Publisher interface. It will be available in your Dependency Container under the passed Reference name. If the interface name MessagePublisher::class is used as the Reference name, it will be available through auto-wiring.
#[EventHandler]
public function whenOrderWasPlaced(OrderWasPlaced $event, MessagePublisher $publisher) : void
{
    $publisher->convertAndSendWithMetadata(
        $event,
        [
            "system.executor" => "Johny"
        ]
    );
}

Additional Publisher Configuration

AmqpMessagePublisherConfiguration::create(
    MessagePublisher::class,
    "delivery"
)
    ->withDefaultPersistentDelivery(true) // 1
    ->withDefaultRoutingKey("someKey") // 2
    ->withRoutingKeyFromHeader("routingKey") // 3
    ->withHeaderMapper("application.*") // 4
  1. withDefaultPersistentDelivery - Whether AMQP messages should be persistent.
  2. withDefaultRoutingKey - Default routing key added to the AMQP message.
  3. withRoutingKeyFromHeader - Name of the header from which the routing key should be retrieved.
  4. withHeaderMapper - By default, headers are not sent with the AMQP message. You may provide a mapping for the headers that should be mapped to the AMQP Message.
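A sketch of how a sender might work with options 3 and 4, assuming the Publisher was configured with withRoutingKeyFromHeader("routingKey") and withHeaderMapper("application.*") as above. The OrderNotifier class, the "order.placed" routing key and the "application.source" header are hypothetical names chosen for illustration; OrderWasPlaced is the event from the earlier example.

```php
<?php

use Ecotone\Messaging\MessagePublisher;
use Ecotone\Modelling\Attribute\EventHandler;

// Hypothetical class name, used only for this illustration.
final class OrderNotifier
{
    #[EventHandler]
    public function whenOrderWasPlaced(OrderWasPlaced $event, MessagePublisher $publisher): void
    {
        $publisher->convertAndSendWithMetadata(
            $event,
            [
                // Header named in withRoutingKeyFromHeader("routingKey");
                // its value is a made-up routing key.
                "routingKey" => "order.placed",
                // Matches the "application.*" header mapper, so it is expected
                // to be mapped to the AMQP message.
                "application.source" => "shop",
            ]
        );
    }
}
```

Headers that do not match the mapper pattern, such as "system.executor" in the earlier example, would not be expected to travel with the AMQP message under this configuration.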

Consumer

To connect a consumer directly to an AMQP Queue, we need to tell Ecotone how the Queue is configured.
class AmqpConfiguration
{
    #[ServiceContext]
    public function registerAmqpConfig(): array
    {
        return [
            AmqpQueue::createWith("orders"), // 1
            AmqpExchange::createDirectExchange("system"), // 2
            AmqpBinding::createFromNames("system", "orders", "placeOrder"), // 3
            AmqpMessageConsumerConfiguration::create("consumer", "orders") // 4
        ];
    }
}
  1. AmqpQueue::createWith(string $name) - Registers a Queue with the given name.
  2. AmqpExchange::create*(string $name) - Registers an Exchange of the given type with the given name.
  3. AmqpBinding::createFromNames(string $exchangeName, string $queueName, string $routingKey) - Registers a binding between the exchange and the queue.
  4. Provides a Consumer that will be registered under the name "consumer" and will poll the "orders" queue.
Once the configuration is registered, we can register a Consumer for the specific queue.
class Consumer
{
    #[MessageConsumer("consumer")]
    public function execute(string $message) : void
    {
        // do something with Message
        // if you have a converter registered, you can type hint the exact type you expect
    }
}

Publisher Transactions

Ecotone AMQP comes with support for RabbitMQ transactions for published messages. To enable transactions on a specific endpoint, mark it with the Ecotone\Amqp\AmqpTransaction\AmqpTransaction attribute.
#[AmqpChannelAdapter(endpointId: "amqp_consumer", queueName: "queue_name")]
#[AmqpTransaction]
public function execute(string $message) : void
{
    // do something with Message
}
If you want to enable or disable transactions for all Asynchronous Endpoints, or specifically for the Command Bus, you may use a ServiceContext.
By default, all transactions are enabled.
class ChannelConfiguration
{
    #[ServiceContext]
    public function registerTransactions() : array
    {
        return [
            AmqpConfiguration::createWithDefaults()
                ->withTransactionOnAsynchronousEndpoints(false)
                ->withTransactionOnCommandBus(false)
        ];
    }
}