Links

Testing Asynchronous Messaging

Testing asynchronous communication in PHP
When your code becomes asynchronous, sending Command is not enough to verify full flow. Your message will land in pollable message channel (queue) awaiting for consumption. This requires executing your consumer in order to test full flow.
Ecotone provides full support for testing your asynchronous messaging architecture.

Executing asynchronous consumer

As an example, let's imagine scenario, where after placing order we want to send notification asynchronously.
class NotificationService
{
#[Asynchronous('notifications')]
#[EventHandler(endpointId: 'notifyOrderWasPlaced')]
public function notify(OrderWasPlaced $event, Notifier $notifier): void
{
$notifier->notifyAbout('placedOrder', $event->getOrderId());
}
}

Ecotone Lite Setup

Let's start by setting up Ecotone Lite for asynchronous scenario. We will use Extension Objects to do this.
$ecotoneTestSupport = EcotoneLite::bootstrapForTesting(
[OrderService::class, NotificationService::class, ChannelConfiguration::class],
[new OrderService(), new NotificationService()],
ServiceConfiguration::createWithDefaults()
->withExtensionObjects([
// 1. This will set up Polling instructions for notifications consumer
PollingMetadata::create('notifications')
->withTestingSetup(
// consumer will stop after handling single message
amountOfMessagesToHandle: 1,
// or consumer will stop after 100 ms
maxExecutionTimeInMilliseconds: 100,
// or consumer will stop immediately after error
failAtError: true
)
]),
);
// you could run Event Bus with OrderWasPlaced here instead
$ecotoneTestSupport->getCommandBus()->sendWithRouting('order.register', new PlaceOrder('123'));
// 2. running consumer
$ecotoneTestSupport->run('notifications');
$this->assertEquals(
1,
// 3. we can provide some in memory implementation for testing purposes
count($this->notifier->getNotificationsOf('placedOrder'))
);
  1. 1.
    PollingMetadata - Is a set up for consumer how it should be running. For testing purposes, we want to limit the execution, so it can end as fast as it can, in order to provide meaningful details. You can read more here.
  2. 2.
    Run - This runs the the consumer with given PollingMetadata
  3. 3.
    Assert - We assert the state after consumer has exited
In above example we are running consumer within same process as test. You may run consumer from separate process like this: (example for symfony): php bin/console ecotone:run notifications --handledMessageLimit=1 --executionTimeLimit=100 --stopOnFailure However running consumer as separate process is not advised, as it requires booting separate processwhich slows test suite, and due to lack ofshared memory does not allow for using In Memory implementations.

Running your asynchronous channels in Memory

In above example we were running running pollable channel, that could be used with RabbitMQ for example. Having tests for real channel implementations can increase our confidence, however requires having Message Broker Instance available and they may slow your test suite.
Ecotone provides In Memory Pollable Channels which can replace real implementation for testing purposes.
$ecotoneTestSupport = EcotoneLite::bootstrapForTesting(
// 1. We have dropped ChannelConfiguration::class, to replace it with our In Memory
[OrderService::class, NotificationService::class],
[new OrderService(), new NotificationService()],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withExtensionObjects([
PollingMetadata::create('notifications')
->withTestingSetup(),
// 2. We register In Memory Pollable Channel
SimpleMessageChannelBuilder::createQueueChannel('notifications')
]),
);
$ecotoneTestSupport->getCommandBus()->sendWithRouting('order.register', new PlaceOrder('123'));
$ecotoneTestSupport->run('notifications');
$this->assertEquals(
1,
count($this->notifier->getNotificationsOf('placedOrder'))
);
  1. 1.
    Previously we had ChannelConfiguration::class defined, that could lead to register RabbitMQ channel. We drop from test case now.
  2. 2.
    We register under notifications new In Memory Pollable Channel.

Spying on channels and testing serialization

We may want to intercept the flow and spy given channel to know if message was sent or if it contains given payload or headers.
In our previous example, instead of calling notifier, we could simply check, if given message have landed in notifications channel.
Let's make this example a bit more spicy and besides verifying, if the message is in this channel, we will also check, if our serialization to json have worked.
$ecotoneTestSupport = EcotoneLite::bootstrapForTesting(
// 1. We have dropped ChannelConfiguration::class, to replace it with our In Memory
[OrderService::class, NotificationService::class],
[new OrderService(), new NotificationService()],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withExtensionObjects([
PollingMetadata::create('notifications')
->withTestingSetup(),
SimpleMessageChannelBuilder::createQueueChannel('notifications'),
// 1. Testing Configuration support
TestConfiguration::createWithDefaults()
// 2. Enable conversion on given channel
->withMediaTypeConversion('notifications', MediaType::createApplicationJson())
// 3. Choose channel to spy.
->withSpyOnChannel('notifications'),
]),
);
$ecotoneTestSupport->getCommandBus()->sendWithRouting('order.register', new PlaceOrder('123'));
$this->assertEquals(
['{"orderId":"123"}'],
// 4. Verifing serialization - Get Events from spied channel
$ecotoneTestSupport->getMessagingTestSupport()->getSpiedChannelRecordedMessagePayloads('notifications')
);
$ecotoneTestSupport->run('notifications');
// 5. Verifying deserializaton
$this->assertEquals(
1,
count($this->notifier->getNotificationsOf('placedOrder'))
);
  1. 1.
    We are making use of TestConfiguration support class
  2. 2.
    Even so we are still using In Memory Channel, we can enable serialization on this channel to given Media Type. In this case, we say serialize to json all message going through notifications.
  3. 3.
    We enable spying on notifications channel, which will allow us to fetch message that went through this channel.
  4. 4.
    We get and verify messages sent to notifications channel, if their were sent in json format.
  5. 5.
    We know that message sent to the channel was in json format. So by calling consumer and verifying if notification was sent, we have also verified deserialization of the event.

Running your code synchronously

Pollable channels creates Pollable consumers, which means the code will be executed asynchronously after consuming message. The second option is so called Event-Driven consumer, which is default one and means code is trigger synchronously (imagine synchronous Event/Command Handler).
Thanks Ecotone's flexibility, we can switch from synchronous to asynchronous code simply by adding Asynchronous attribute. In tests then, we can ignore asynchronous module, to make the code work synchronous again.
$ecotoneTestSupport = EcotoneLite::bootstrapForTesting(
// 1. We have dropped ChannelConfiguration::class, to replace it with our In Memory
[OrderService::class, NotificationService::class],
[new OrderService(), new NotificationService()],
);
$ecotoneTestSupport->getCommandBus()->sendWithRouting('order.register', new PlaceOrder('123'));
$this->assertEquals(
1,
count($this->notifier->getNotificationsOf('placedOrder'))
);
  1. 1.
    We are using Ecotone's option to turn off given modules. To turn of asynchronous channels, we disable ModulePackageList::ASYNCHRONOUS_PACKAGE. This way our code starts to work like it would be fully synchronous.

Dealing with delayed messages

Our Handlers may be delayed in time and we may want to test, if they released after given period of time correctly or perform the release after set of making set of actions.
class NotificationService
{
#[Asynchronous('notifications')]
#[Delayed(1000 * 60)] // 60 seconds
#[EventHandler(endpointId: 'notifyOrderWasPlaced')]
public function notify(OrderWasPlaced $event, Notifier $notifier): void
{
$notifier->notifyAbout('placedOrder', $event->getOrderId());
}
}
$ecotoneTestSupport = EcotoneLite::bootstrapForTesting(
[OrderService::class, NotificationService::class],
[new OrderService(), new NotificationService()],
ServiceConfiguration::createWithDefaults()
->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE]))
->withExtensionObjects([
PollingMetadata::create('notifications')
->withTestingSetup(),
// 1. Turn on Delayable In Memory Pollable Channel
SimpleMessageChannelBuilder::createQueueChannel('notifications', true)
]),
);
$ecotoneTestSupport->getCommandBus()->sendWithRouting('order.register', new PlaceOrder('123'));
$ecotoneTestSupport->run('notifications');
$this->assertEquals(
0,
count($this->notifier->getNotificationsOf('placedOrder'))
);
// 2. Releasing messages awaiting for 60 seconds
$ecotoneTestSupport->getMessagingTestSupport()->releaseMessagesAwaitingFor(
'orders',
1000 * 60
);
$this->assertEquals(
1,
count($this->notifier->getNotificationsOf('placedOrder'))
);
  1. 1.
    The default behaviour for In Memory Channels is to ignore delays. By setting second parameter to true we are registering In Memory Channel that will be aware of delays.
  2. 2.
    We are releasing messages that awaits for 60 seconds or less.

Dropping all messages coming to given channel

In some scenarios, you may just want to turn off given channel, because you're not interested in messages that goes through it.
$ecotoneTestSupport = EcotoneLite::bootstrapForTesting(
// 1. We have dropped ChannelConfiguration::class, to replace it with our In Memory
[OrderService::class, NotificationService::class],
[new OrderService(), new NotificationService()],
ServiceConfiguration::createWithDefaults()
->withExtensionObjects([
// 1. Create nullable channel
SimpleMessageChannelBuilder::createNullableChannel('notifications')
]),
);
  1. 1.
    By registering nullable channel, we make use that all messages that will go to given channel will be dropped.