composer require ecotone/amqp
In order to use AMQP Support, we need to add a ConnectionFactory to our Dependency Container.
# config/services.yaml
    # You need to have RabbitMQ instance running on your localhost, or change DSN
    Enqueue\AmqpLib\AmqpConnectionFactory:
        class: Enqueue\AmqpLib\AmqpConnectionFactory
        arguments:
            - "amqp://guest:guest@localhost:5672//"
# Register AMQP Service in Provider

use Enqueue\AmqpLib\AmqpConnectionFactory;

public function register()
{
    $this->app->singleton(AmqpConnectionFactory::class, function () {
        return new AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//");
    });
}
We register our AmqpConnection under the class name Enqueue\AmqpLib\AmqpConnectionFactory.
This will help Ecotone resolve it automatically, without any additional configuration.
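If you need to change the DSN, it helps to know what each segment stands for. The sketch below splits a DSN with PHP's built-in parse_url(). The function name describeAmqpDsn is hypothetical, and this is not how Ecotone or Enqueue read the DSN internally; it only illustrates the layout scheme://user:pass@host:port/path.

```php
<?php
// Hypothetical helper: split an AMQP DSN into its parts with PHP's built-in parse_url().
// It only illustrates what each segment means; the libraries use their own parsing.
function describeAmqpDsn(string $dsn): array
{
    $parts = parse_url($dsn);
    if ($parts === false || !isset($parts['scheme'], $parts['host'])) {
        throw new InvalidArgumentException("Not a valid DSN: {$dsn}");
    }

    return [
        'scheme' => $parts['scheme'],
        'user'   => $parts['user'] ?? null,
        'pass'   => $parts['pass'] ?? null,
        'host'   => $parts['host'],
        'port'   => $parts['port'] ?? 5672, // 5672 is the default AMQP port
        'path'   => $parts['path'] ?? '',   // AMQP URIs carry the virtual host here
    ];
}

$dsn = describeAmqpDsn('amqp://guest:guest@localhost:5672//');
echo "{$dsn['user']}@{$dsn['host']}:{$dsn['port']}", PHP_EOL;
```

How the trailing path is turned into a virtual host name is up to the connection library, so check its documentation before relying on a non-default virtual host.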
To create an AMQP Backed Channel (RabbitMQ Channel), we need to create an Application Context.
use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;
use Ecotone\Messaging\Annotation\ApplicationContext;
use Ecotone\Messaging\Annotation\Extension;

/**
 * @ApplicationContext()
 */
class MessagingConfiguration
{
    /**
     * @Extension()
     */
    public function orderChannel()
    {
        return AmqpBackedMessageChannelBuilder::create("orders");
    }
}
Now the orders channel will be available in our Messaging System.
namespace Ecotone\Messaging;

use Ecotone\Messaging\Conversion\MediaType;

interface Publisher
{
    // 1
    public function send(string $data, string $sourceMediaType = MediaType::TEXT_PLAIN) : void;

    // 2
    public function sendWithMetadata(string $data, array $metadata, string $sourceMediaType = MediaType::TEXT_PLAIN) : void;

    // 3
    /**
     * @param object|array $data
     */
    public function convertAndSend($data) : void;

    // 4
    /**
     * @param object|array $data
     * @param array $metadata
     */
    public function convertAndSendWithMetadata($data, array $metadata) : void;
}
1. send - Sends a string via the Publisher. It does not need any conversion. You may optionally pass the Media Type of $data.
2. sendWithMetadata - Does the same as send, and also allows sending additional metadata.
3. convertAndSend - Sends types that need conversion, such as objects and arrays. Ecotone uses its Conversion system to convert $data.
4. convertAndSendWithMetadata - Does the same as convertAndSend, and also allows sending additional metadata.
If you want to publish a Message directly to an Exchange, you can use the Publisher.
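use Ecotone\Messaging\Annotation\ApplicationContext;
use Ecotone\Messaging\Annotation\Extension;
use Ecotone\Messaging\Publisher;
// plus a use statement for AmqpMessagePublisherConfiguration (ecotone/amqp package)

/**
 * @ApplicationContext()
 */
class AMQPConfiguration
{
    /**
     * @Extension()
     */
    public function registerAmqpConfig()
    {
        return AmqpMessagePublisherConfiguration::create(
            Publisher::class, // 1
            "delivery", // 2
            "application/json" // 3
        );
    }
}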
1. Reference name - Name under which the Publisher will be available in the Dependency Container.
2. Exchange name - Name of the exchange to which the Message should be published.
3. Default Conversion [Optional] - Default media type the payload will be converted to.
Publisher is a special type of Gateway, which implements the Publisher interface.
It will be available in your Dependency Container under the passed Reference name.
If the interface name Publisher::class is used as the Reference name, it will be available through auto-wiring.
/**
 * @EventHandler()
 */
public function whenOrderWasPlaced(OrderWasPlaced $event, Publisher $publisher) : void
{
    $publisher->convertAndSendWithMetadata(
        $event,
        [
            "system.executor" => "Johny"
        ]
    );
}
RegisterAmqpPublisher::create(Publisher::class, "delivery")
    ->withDefaultPersistentDelivery(true) // 1
    ->withDefaultRoutingKey("someKey") // 2
    ->withRoutingKeyFromHeader("routingKey") // 3
    ->withHeaderMapper("application.*") // 4
1. withDefaultPersistentDelivery - Whether AMQP messages should be persistent.
2. withDefaultRoutingKey - Default routing key added to the AMQP message.
3. withRoutingKeyFromHeader - Name of the header from which the routing key should be retrieved.
4. withHeaderMapper - By default, headers are not sent with the AMQP message. You may provide a mapping for the headers that should be mapped to the AMQP Message.
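To make the effect of a pattern such as "application.*" concrete, here is a minimal sketch of wildcard header selection. The function mapHeaders is hypothetical and is not Ecotone's implementation; it uses PHP's built-in fnmatch() as a stand-in for the pattern matching, and the header names are invented for illustration.

```php
<?php
// Hypothetical helper, not part of Ecotone: keep only the headers whose
// name matches the given wildcard pattern.
function mapHeaders(array $headers, string $pattern): array
{
    return array_filter(
        $headers,
        function (string $name) use ($pattern): bool {
            return fnmatch($pattern, $name);
        },
        ARRAY_FILTER_USE_KEY
    );
}

$headers = [
    'application.userId' => '123',
    'application.tenant' => 'acme',
    'system.executor'    => 'Johny',
];

// Only the two "application." headers pass the mapper.
print_r(mapHeaders($headers, 'application.*'));
```

Under this model, a header such as system.executor would not be sent with the AMQP message unless the mapping also covered it.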
To connect a consumer directly to an AMQP Queue, we need to tell Ecotone how the Queue is configured.
use Ecotone\Amqp\AmqpBinding;
use Ecotone\Amqp\AmqpExchange;
use Ecotone\Amqp\AmqpQueue;

/**
 * @ApplicationContext()
 */
class AmqpConfiguration
{
    /**
     * @Extension()
     */
    public function registerAmqpConfig(): array
    {
        return [
            AmqpQueue::createWith("orders"), // 1
            AmqpExchange::createDirectExchange("system"), // 2
            AmqpBinding::createFromNames("system", "orders", "placeOrder"), // 3
        ];
    }
}
1. AmqpQueue::createWith(string $name) - Registers a Queue with the given name.
2. AmqpExchange::create*(string $name) - Registers an Exchange of the given type with the given name.
3. AmqpBinding::createFromNames(string $exchangeName, string $queueName, string $routingKey) - Registers a binding between the exchange and the queue.
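The configuration above binds the orders queue to the direct exchange system with the routing key placeOrder. In RabbitMQ, a direct exchange delivers a message to the queues whose binding key equals the message's routing key. The following in-memory model is a teaching sketch of that rule only; the class DirectExchangeModel is hypothetical and involves neither Ecotone nor a broker.

```php
<?php
// In-memory model of a direct exchange; a teaching sketch, not Ecotone or RabbitMQ code.
final class DirectExchangeModel
{
    /** @var array<string, string[]> routing key => bound queue names */
    private array $bindings = [];

    public function bind(string $queueName, string $routingKey): void
    {
        $this->bindings[$routingKey][] = $queueName;
    }

    /** @return string[] queues that would receive a message with this routing key */
    public function route(string $routingKey): array
    {
        return $this->bindings[$routingKey] ?? [];
    }
}

$system = new DirectExchangeModel();
$system->bind('orders', 'placeOrder');

print_r($system->route('placeOrder'));  // the "orders" queue matches
print_r($system->route('cancelOrder')); // no binding, so no queue matches
```

A message published to system with any routing key other than placeOrder would therefore not reach the orders queue.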
Once the configuration is registered, we can register a Consumer for a specific queue.
/**
 * @MessageEndpoint()
 */
class Consumer
{
    /**
     * @AmqpChannelAdapter(
     *     endpointId="amqp_consumer", // 1
     *     queueName="queue_name", // 2
     *     headerMapper="application.*" // 3
     * )
     */
    public function execute(string $message) : void
    {
        // do something with Message
        // if you have converter registered you can type hint exact type you expect
    }
}
1. endpointId - Defines the identifier for this consumer. The consumer can be run under this name, which is listed by:
ecotone:list-all-asynchronous-endpoints
+--------------------+
| Endpoint Names     |
+--------------------+
| amqp_consumer      |
+--------------------+
2. queueName - The name of the queue that will be polled by this consumer.
3. headerMapper - Headers that should be mapped to the Message.
Ecotone AMQP comes with support for RabbitMQ transactions for published messages.
To enable transactions on a specific endpoint, mark it with the Ecotone\Amqp\AmqpTransaction\AmqpTransaction annotation.
/**
 * @AmqpChannelAdapter(
 *     endpointId="amqp_consumer",
 *     queueName="queue_name"
 * )
 * @AmqpTransaction()
 */
public function execute(string $message) : void
{
    // do something with Message
}
If you want to enable transactions for all Asynchronous Endpoints, or specifically for the Command Bus, you can use the ApplicationContext.
use Ecotone\Amqp\Configuration\AmqpConfiguration;
use Ecotone\Messaging\Annotation\ApplicationContext;
use Ecotone\Messaging\Annotation\Extension;

/**
 * @ApplicationContext()
 */
class ChannelConfiguration
{
    /**
     * @Extension()
     */
    public function registerTransactions() : array
    {
        return [
            AmqpConfiguration::createWithDefaults()
                ->withTransactionOnAsynchronousEndpoints(true)
                ->withTransactionOnCommandBus(true)
        ];
    }
}
Examples can be found here.