Testing Asynchronous Messaging
Testing asynchronous communication in PHP
When your code becomes asynchronous, sending a Command is not enough to verify the full flow.
Your message will land in a pollable message channel (queue), awaiting consumption.
To test the full flow you therefore need to execute your consumer. Ecotone provides full support for testing your asynchronous messaging architecture.
As an example, let's imagine a scenario where, after placing an order, we want to send a notification asynchronously.
class NotificationService
{
    #[Asynchronous('notifications')]
    #[EventHandler(endpointId: 'notifyOrderWasPlaced')]
    public function notify(OrderWasPlaced $event, Notifier $notifier): void
    {
        $notifier->notifyAbout('placedOrder', $event->getOrderId());
    }
}
By default, all asynchronous code will run synchronously. This simplifies the tested code and speeds up your test suite.
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    // 1. We have dropped ChannelConfiguration::class, to replace it with our In Memory
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()]
);

// this will publish OrderWasPlaced as a result
$ecotoneTestSupport->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'));

$this->assertEquals(
    1,
    count($this->notifier->getNotificationsOf('placedOrder'))
);
Ecotone provides In Memory Pollable Channels, which can replace the real implementation for testing purposes.
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    // 1. we need to provide Message Channel to use
    enableAsynchronousProcessing: [
        SimpleMessageChannelBuilder::createQueueChannel('notifications')
    ]
);

// you could run Event Bus with OrderWasPlaced here instead
$ecotoneTestSupport->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'));

// 2. running consumer
$ecotoneTestSupport->run('notifications');

$this->assertEquals(
    1,
    // 3. we can provide some in memory implementation for testing purposes
    count($this->notifier->getNotificationsOf('placedOrder'))
);
- 1. Enable asynchronous processing - We enable asynchronous processing and provide the Message Channel to poll from. The Message Channel can be a real one (SQS, RabbitMQ, Dbal etc.) or an In Memory one
- 2. Run - This runs the consumer with the given PollingMetadata
- 3. Assert - We assert the state after the consumer has exited
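The assertions above rely on a Notifier that can report what it was asked to send. A minimal sketch of such an in-memory test double follows. The `Notifier` interface shape and the `InMemoryNotifier` class are assumptions inferred from the `notifyAbout` and `getNotificationsOf` calls used in the examples; they are not part of Ecotone.

```php
<?php

declare(strict_types=1);

// Hypothetical contract, inferred from the calls used in the examples above.
interface Notifier
{
    public function notifyAbout(string $type, string $subject): void;
}

// Hypothetical test double: records notifications in memory instead of sending them.
final class InMemoryNotifier implements Notifier
{
    /** @var array<string, list<string>> notifications grouped by type */
    private array $notifications = [];

    public function notifyAbout(string $type, string $subject): void
    {
        $this->notifications[$type][] = $subject;
    }

    /** @return list<string> subjects recorded for the given type, empty if none */
    public function getNotificationsOf(string $type): array
    {
        return $this->notifications[$type] ?? [];
    }
}

$notifier = new InMemoryNotifier();
$notifier->notifyAbout('placedOrder', '123');

var_dump(count($notifier->getNotificationsOf('placedOrder'))); // int(1)
```

Because the double keeps its state in memory, it only works when the consumer runs in the same process as the test.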
In the above example we are running the consumer within the same process as the test.
You may also run the consumer from a separate process, like this (example for Symfony):
php bin/console ecotone:run notifications --handledMessageLimit=1 --executionTimeLimit=100 --stopOnFailure
However, running the consumer as a separate process is not advised: booting a separate process slows down the test suite and, due to the lack of shared memory, does not allow for using In Memory implementations.
By default, in test mode, the Message Consumer will run for up to 100ms or stop on error.
You may customize this behaviour by providing ExecutionPollingMetadata.
$ecotoneTestSupport->run(
    'notifications',
    ExecutionPollingMetadata::createWithTestingSetup(
        // consumer will stop after handling single message
        amountOfMessagesToHandle: 1,
        // or consumer will stop after 100 ms
        maxExecutionTimeInMilliseconds: 100,
        // or consumer will stop immediately after error
        failAtError: true
    )
);
To test serialization, we may fetch the Message directly from the Channel and verify its payload.
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    enableAsynchronousProcessing: [
        // 1. Enable conversion on given channel
        SimpleMessageChannelBuilder::createQueueChannel(
            'notifications',
            conversionMediaType: 'application/json'
        )
    ]
);

$ecotoneTestSupport->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'));
$this->assertEquals(
    '{"orderId":"123"}',
    // 2. Verifying serialization - get the Event's payload from the channel
    $ecotoneTestSupport->getMessageChannel('notifications')->receive()->getPayload()
);
- 1. We can enable serialization on this channel for a given Media Type. In this case, we say: serialize to json all messages going through notifications.
- 2. We pull the message sent to the notifications channel and verify that it was sent in json format
By default, the In Memory Queue Channel will serialize messages using PHP native serialization, or your default Serialization if one is defined. This way it works in a similar way to your production Queue Channels.
If you don't want to use serialization, however, you may set the type to
conversionMediaType: MediaType::createApplicationXPHP()
Our Handlers may be delayed in time, and we may want to perform a few actions and then release the message, to verify a production-like flow.
class NotificationService
{
    #[Asynchronous('notifications')]
    #[Delayed(1000 * 60)] // 60 seconds
    #[EventHandler(endpointId: 'notifyOrderWasPlaced')]
    public function notify(OrderWasPlaced $event, Notifier $notifier): void
    {
        $notifier->notifyAbout('placedOrder', $event->getOrderId());
    }
}
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    enableAsynchronousProcessing: [
        // 1. Turn on Delayable In Memory Pollable Channel
        SimpleMessageChannelBuilder::createQueueChannel('notifications', true)
    ]
);

$ecotoneTestSupport
    ->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'))
    ->run('notifications', ExecutionPollingMetadata::createWithTestingSetup());

// 2. Releasing messages awaiting for 60 seconds
$ecotoneTestSupport->releaseAwaitingMessagesAndRunConsumer(
    'notifications',
    1000 * 60,
    ExecutionPollingMetadata::createWithTestingSetup()
);

$this->assertEquals(
    1,
    count($this->notifier->getNotificationsOf('placedOrder'))
);
- 1. The default behaviour for In Memory Channels is to ignore delays. By setting the second parameter to true, we register an In Memory Channel that is aware of delays.
- 2. We release messages that have been awaiting for 60 seconds or less.
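To make the "60 seconds or less" rule concrete, here is a small conceptual model of a delay-aware queue. It is only a sketch of the idea, not Ecotone's implementation: the `DelayAwareQueue` class and its `send` and `release` methods are hypothetical names introduced for illustration.

```php
<?php

declare(strict_types=1);

// Conceptual model only; this is NOT how Ecotone implements its channels.
final class DelayAwareQueue
{
    /** @var list<array{payload: string, delayMs: int}> messages still waiting */
    private array $messages = [];

    public function send(string $payload, int $delayMs): void
    {
        $this->messages[] = ['payload' => $payload, 'delayMs' => $delayMs];
    }

    /**
     * Returns payloads whose delay is less than or equal to $elapsedMs
     * and removes them from the queue; all other messages keep waiting.
     *
     * @return list<string>
     */
    public function release(int $elapsedMs): array
    {
        $released = [];
        $waiting = [];

        foreach ($this->messages as $message) {
            if ($message['delayMs'] <= $elapsedMs) {
                $released[] = $message['payload'];
            } else {
                $waiting[] = $message;
            }
        }

        $this->messages = $waiting;

        return $released;
    }
}

$queue = new DelayAwareQueue();
$queue->send('order-123', 1000 * 60);

$tooEarly = $queue->release(1000 * 30); // nothing is due after 30 seconds
$onTime = $queue->release(1000 * 60);   // the message becomes due at 60 seconds
```

In this model a message delayed by 60 seconds stays in the queue when only 30 seconds are released, and is handed over once the released time reaches its delay.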
In some scenarios, you may just want to turn off a given channel, because you're not interested in the messages that go through it.
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    // We have dropped ChannelConfiguration::class, to replace it with our In Memory
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    enableAsynchronousProcessing: [
        // 1. Create nullable channel
        SimpleMessageChannelBuilder::createNullableChannel('notifications')
    ]
);
- 1. By registering a nullable channel, we make sure that all messages that go to the given channel will be dropped.