Message Consumer and Publisher
Message Consumer
A Message Consumer is an implementation that lets you fetch messages simply by annotating a method with an attribute.
namespace Test\Ecotone\SqsDemo\Fixture\SqsConsumer;

use Ecotone\Messaging\Attribute\MessageConsumer;
use Ecotone\Modelling\Attribute\QueryHandler;

final class SqsConsumerExample
{
    /** @var string[] */
    private array $messagePayloads = [];

    // 1. Message Consumer
    #[MessageConsumer('sqs_consumer')]
    public function collect(string $payload): void
    {
        $this->messagePayloads[] = $payload;
    }

    /**
     * @return string[]
     */
    #[QueryHandler('consumer.getMessagePayloads')]
    public function getMessagePayloads(): array
    {
        return $this->messagePayloads;
    }
}

Message Consumer - By annotating the method with this attribute, we tell Ecotone to use it as a Message Consumer. sqs_consumer will be the endpoint id, so we will run our consumer under this name.
Because a Message Consumer can be connected to any external broker, Ecotone also needs to know that this one is meant to be used with SQS.
To do this, we introduce an Extension Object that describes how to configure this Message Consumer.
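As a rough illustration of what such an Extension Object might look like, here is a minimal plain-PHP sketch. The class name SqsMessageConsumerConfiguration, its fields, and the default timeout are assumptions made for this example only; they are not taken from Ecotone itself.

```php
<?php

declare(strict_types=1);

// Hypothetical Extension Object: a plain value object describing how one
// Message Consumer should be connected to SQS. Names and defaults are assumed.
final class SqsMessageConsumerConfiguration
{
    private function __construct(
        private string $endpointId,
        private string $queueName,
        private int $receiveTimeoutInMilliseconds,
    ) {
    }

    // Creates a configuration with an assumed default timeout of 10 seconds.
    public static function create(string $endpointId, string $queueName): self
    {
        return new self($endpointId, $queueName, 10000);
    }

    // Returns a modified copy, so a configuration can be shared safely.
    public function withReceiveTimeout(int $milliseconds): self
    {
        $copy = clone $this;
        $copy->receiveTimeoutInMilliseconds = $milliseconds;

        return $copy;
    }

    public function getEndpointId(): string
    {
        return $this->endpointId;
    }

    public function getQueueName(): string
    {
        return $this->queueName;
    }

    public function getReceiveTimeoutInMilliseconds(): int
    {
        return $this->receiveTimeoutInMilliseconds;
    }
}

// The endpoint id matches the one used in the MessageConsumer attribute.
$configuration = SqsMessageConsumerConfiguration::create('sqs_consumer', 'demo_queue')
    ->withReceiveTimeout(2000);
```

The important part is that the object carries the endpoint id, because that is what lets a Module match the configuration to the annotated method.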
Now we can create a Module that uses this configuration to register an SQS Inbound Channel Adapter.
Module Annotation - This tells Ecotone to use this class as a Module.
Creating Module - Here we may look for all classes containing a given attribute, if there is a need.
Preparing Module - Here we can adjust the configuration using our Module. In our scenario we use the Message Consumer Configuration to register an SQS Inbound Channel Adapter in the Messaging Configuration.
We also tell Ecotone what types of Extension Objects this Module is looking for.
Before we test this, we first need a way to send a Message to the SQS Queue. To do so, we need to set up a Message Publisher.
We have registered the Inbound Channel Adapter, but we have not yet connected it to SqsConsumerExample.
This works the same way for all types of Message Consumers, so there is a separate module for it: MessageConsumerModule.
This module connects the annotated method to a channel named after the endpoint id.
In our Inbound Channel Adapter (3rd parameter) we use the endpoint id as the request channel that receives each message after it is fetched from the queue. Thanks to that, everything is connected.
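The connection described above can be sketched in plain PHP without any framework. InMemoryQueue, ChannelRegistry and InboundChannelAdapter are hypothetical stand-ins written for this illustration, not Ecotone classes. They only show why using the endpoint id as the request channel name ties the adapter to the annotated method.

```php
<?php

declare(strict_types=1);

// Stand-in for the SQS queue: a simple in-memory FIFO list.
final class InMemoryQueue
{
    /** @var string[] */
    private array $messages = [];

    public function push(string $payload): void
    {
        $this->messages[] = $payload;
    }

    // Returns the oldest message, or null when the queue is empty.
    public function pop(): ?string
    {
        return array_shift($this->messages);
    }
}

// Stand-in for the messaging system: handlers are looked up by channel name.
final class ChannelRegistry
{
    /** @var array<string, callable> */
    private array $handlers = [];

    public function subscribe(string $channelName, callable $handler): void
    {
        $this->handlers[$channelName] = $handler;
    }

    public function send(string $channelName, string $payload): void
    {
        ($this->handlers[$channelName])($payload);
    }
}

// Stand-in for the Inbound Channel Adapter: fetches one message from the
// queue and forwards it to the request channel.
final class InboundChannelAdapter
{
    public function __construct(
        private InMemoryQueue $queue,
        private ChannelRegistry $channels,
        private string $requestChannelName,
    ) {
    }

    // Returns true when a message was fetched and forwarded.
    public function poll(): bool
    {
        $payload = $this->queue->pop();
        if ($payload === null) {
            return false;
        }
        $this->channels->send($this->requestChannelName, $payload);

        return true;
    }
}

$endpointId = 'sqs_consumer';
$received = [];

$channels = new ChannelRegistry();
// Conceptually the role of MessageConsumerModule: the consuming method
// listens on a channel named after the endpoint id.
$channels->subscribe($endpointId, function (string $payload) use (&$received): void {
    $received[] = $payload;
});

$queue = new InMemoryQueue();
// The adapter uses the same endpoint id as its request channel name.
$adapter = new InboundChannelAdapter($queue, $channels, $endpointId);

$queue->push('Hello SQS');
$adapter->poll();
```

Because both sides agree on the name sqs_consumer, neither the adapter nor the consuming method needs to know about the other.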
Message Publisher
A Message Publisher is a Gateway implementation that lets us create an abstraction for sending messages to an SQS Queue through a simple interface.
First let's create Sqs Publisher Configuration:
Then we can create a Module that uses this configuration to register our SQS Outbound Channel Adapter.
Extension Object Resolver - This is a helper class that allows us to fetch, from the extension objects, a concrete class or all classes of a given type.
Registering Messaging Gateways - Messaging Gateways are entry points to the messaging system. Because the Message Publisher is an interface called by the end user, it is a Messaging Gateway. We register every method of this interface together with its parameter converters.
Registering Outbound Channel Adapter - Using our Message Publisher Configuration we register our SQS Outbound Channel Adapter. The input channel name is the same as the request channel name of the Gateway. This way, when a method of the interface is called, our Outbound Channel Adapter is called under the hood.
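The relationship between the Gateway and the Outbound Channel Adapter can be sketched in a few lines of plain PHP. DemoPublisher, GatewayProxy, OutboundAdapter and the channel name sqs_publisher.request are all hypothetical names invented for this illustration. The sketch only demonstrates the shared channel name idea and does not use Ecotone's API.

```php
<?php

declare(strict_types=1);

// Hypothetical publisher interface that the end user calls.
interface DemoPublisher
{
    public function send(string $payload): void;
}

// Stand-in for the Gateway: forwards every call to its request channel.
final class GatewayProxy implements DemoPublisher
{
    /** @param array<string, callable> $channelHandlers handlers keyed by channel name */
    public function __construct(
        private array $channelHandlers,
        private string $requestChannelName,
    ) {
    }

    public function send(string $payload): void
    {
        ($this->channelHandlers[$this->requestChannelName])($payload);
    }
}

// Stand-in for the SQS Outbound Channel Adapter: records what would be
// sent to the queue instead of calling SQS.
final class OutboundAdapter
{
    /** @var string[] */
    private array $sentToQueue = [];

    public function handle(string $payload): void
    {
        $this->sentToQueue[] = $payload;
    }

    /** @return string[] */
    public function sentMessages(): array
    {
        return $this->sentToQueue;
    }
}

// One channel name is shared: it is the Gateway's request channel and the
// Outbound Channel Adapter's input channel.
$channelName = 'sqs_publisher.request';

$outboundAdapter = new OutboundAdapter();
$channelHandlers = [
    $channelName => function (string $payload) use ($outboundAdapter): void {
        $outboundAdapter->handle($payload);
    },
];

$publisher = new GatewayProxy($channelHandlers, $channelName);
$publisher->send('Hello SQS');
```

The caller depends only on the interface, so the adapter behind the channel could be swapped for another broker without touching the calling code.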
Testing Message Publisher and Consumer
Our test case can look like this:
We use the Message Publisher to send the message and then consume it with our Message Consumer.
Summary
We have introduced the Message Channel, the Message Consumer, and the Message Publisher.
Under the hood, however, we also had a chance to get to know Messaging Gateways, Inbound and Outbound Channel Adapters, and Message Handlers.
Using a few patterns, you can build highly customized Messaging Configurations. This is the power of a decoupled system: once you become familiar with these patterns, they open up many more possibilities.