
Testing Aggregates and Sagas with Message Flows

Testing Aggregate, Saga in PHP
On top of Messaging Test Support, Ecotone provides an abstraction for handling Message Flows. It is a clean way to define, step by step, what happens in your system and to verify the results.
Testing with Message Flows brings your tests closer to production-like execution. Message Flows are a great candidate for writing acceptance tests or for testing the flow of your Aggregates and Sagas.

Setting up Flow Test

To enable Flow Test Support, pass as the first parameter the list of classes to resolve for the testing scenario. These can be aggregates, sagas, or any classes containing Ecotone's attributes.
$testSupport = EcotoneLite::bootstrapFlowTesting([User::class]);
For more details on how to set up EcotoneLite for different test scenarios, read the introduction chapter Testing Messaging.

Testing Aggregate

Our Aggregate looks like this:
#[Aggregate]
final class User
{
    use WithAggregateEvents;
    public function __construct(
        #[AggregateIdentifier] private UuidInterface $userId,
        private string $name,
        private Email $email,
        private PhoneNumber $phoneNumber,
        private bool $isBlocked = false,
        private bool $isVerified = false
    ) {
        $this->recordThat(new UserWasRegistered($this->userId, $this->email, $this->phoneNumber));
    }
    #[CommandHandler]
    public static function register(RegisterUser $command): self
    {
        return new self(
            $command->getUserId(),
            $command->getName(),
            $command->getEmail(),
            $command->getPhoneNumber()
        );
    }
    #[CommandHandler("user.block")]
    public function block(): void
    {
        $this->isBlocked = true;
    }
    #[CommandHandler("user.verify")]
    public function verify(VerifyUser $command): void
    {
        $this->isVerified = true;
    }
    public function isBlocked(): bool
    {
        return $this->isBlocked;
    }
}
Once Ecotone Lite is set up, we can start using flow testing:
public function test_registering_user(): void
{
    // Flow test support bootstrapped as shown in "Setting up Flow Test"
    $testSupport = EcotoneLite::bootstrapFlowTesting([User::class]);
    $userId = Uuid::uuid4();
    $email = Email::create("[email protected]");
    $phoneNumber = PhoneNumber::create("148518518518");
    // 1. Comparing published events after registration
    $this->assertEquals(
        [new UserWasRegistered($userId, $email, $phoneNumber)],
        // 2. Running Message Flow Test support
        $testSupport
            ->sendCommand(new RegisterUser($userId, "johny", $email, $phoneNumber))
            ->getRecordedEvents()
    );
}
  1. We set up the expected event as the outcome of running the RegisterUser Command.
  2. Then we use Flow Support to send the Command and get the events that were recorded as the outcome.
You may send multiple commands and chain them together to build the flow you would like to test.

Verifying Aggregate's State

If you are not using an Event Sourced Aggregate, you may want to test the state of the aggregate instead. We work only with the high-level API, which is commands, so we have no direct access to the Aggregate. Ecotone, however, makes it possible to fetch the aggregate inside the flow.
$this->assertTrue(
    $testSupport
        ->sendCommand(new RegisterUser($userId, "johny", $email, $phoneNumber))
        // 1. Command with routing key
        ->sendCommandWithRoutingKey("user.block", metadata: ["aggregate.id" => $userId])
        // 2. Fetching aggregate
        ->getAggregate(User::class, $userId)
        // 3. Calling aggregate method
        ->isBlocked()
);
  1. This Command Handler was registered by routing key, without a Command class. With Flow Support we can call Buses using a routing key, and we use the aggregate.id metadata to tell which aggregate we want to call.
  2. Then we fetch the given aggregate by its id.
  3. After that we can call a method on the aggregate we have fetched.
If you mark your query method with the QueryHandler attribute, it becomes part of the flow. We then no longer need to fetch the aggregate and can write the following test instead:
#[QueryHandler("user.isBlocked")]
public function isBlocked(): bool
{
    return $this->isBlocked;
}
$this->assertTrue(
    $testSupport
        ->sendCommand(new RegisterUser($userId, "johny", $email, $phoneNumber))
        ->sendCommandWithRoutingKey("user.block", metadata: ["aggregate.id" => $userId])
        ->sendQueryWithRouting('user.isBlocked', metadata: ["aggregate.id" => $userId])
);

Testing Sagas

Testing Sagas looks the same as testing Aggregates. In most cases, however, we will want to publish events and then verify which commands were sent.
In this scenario we will also test delayed messages.
When a given Handler is asynchronous, you can delay its execution. This way you can easily build flows that depend on time.
#[Saga]
final class VerificationProcess
{
    use WithAggregateEvents;
    private function __construct(
        #[AggregateIdentifier]
        private UuidInterface $userId,
        private bool $isEmailVerified = false,
        private bool $isPhoneNumberVerified = false
    ) {
        $this->recordThat(new VerificationProcessStarted($this->userId));
    }
    // 1. Saga starts after user was registered
    #[EventHandler]
    public static function start(UserWasRegistered $event): self
    {
        return new self($event->getUserId());
    }
    // 2. Whenever email or phone number are verified we mark it in the process
    #[EventHandler]
    public function whenEmailWasVerified(EmailedVerified $event): void
    {
        $this->isEmailVerified = true;
    }
    // 2. Whenever email or phone number are verified we mark it in the process
    #[EventHandler]
    public function whenPhoneNumberWasVerified(PhoneNumberVerified $event): void
    {
        $this->isPhoneNumberVerified = true;
    }
    // 3. We are setting this process to time out after 1 hour
    // and, in case the user was not fully verified, to block the user.
    #[Asynchronous("asynchronous_messages")]
    #[Delayed(1000 * 60 * 60)] // execute 1 hour after registration
    #[EventHandler(endpointId: "verificationProcess.timeout")]
    public function timeout(VerificationProcessStarted $event, CommandBus $commandBus): void
    {
        if ($this->isEmailVerified && $this->isPhoneNumberVerified) {
            $commandBus->send(new VerifyUser($this->userId));
            // fully verified: stop here so the user is not blocked as well
            return;
        }
        $commandBus->sendWithRouting("user.block", metadata: ["aggregate.id" => $this->userId]);
    }
}
1. and 2. This is standard Saga usage: something happens, we start the process and collect information.
3. We delay the execution of this event handler by 1 hour (1000 * 60 * 60 milliseconds).
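The delay in the attribute above is written as 1000 * 60 * 60, which reads as a number of milliseconds. As a minimal sketch, assuming you prefer to spell the unit out, a small helper can make the intended duration explicit. The function name hoursToMilliseconds is hypothetical and is not part of Ecotone.

```php
<?php

// Hypothetical helper (not part of Ecotone): converts whole hours to
// milliseconds, the unit the delay expression above is written in.
function hoursToMilliseconds(int $hours): int
{
    return $hours * 60 * 60 * 1000;
}

$oneHour = hoursToMilliseconds(1);  // same value as 1000 * 60 * 60
$oneDay = hoursToMilliseconds(24);  // the value a 24-hour timeout would need

echo $oneHour, PHP_EOL; // prints 3600000
echo $oneDay, PHP_EOL;  // prints 86400000
```

Spelling the duration out this way makes it easier to keep the attribute, its comment, and the time used when releasing messages in the tests in agreement.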
In order to test this flow, we need to first add extension objects:
$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    // The Saga is the class under test in this scenario
    [VerificationProcess::class],
    configuration: ServiceConfiguration::createWithDefaults()
        // 1. Setting up extension objects
        ->withExtensionObjects([
            // 2. Register In Memory Queue Channel with delay enabled
            SimpleMessageChannelBuilder::createQueueChannel("asynchronous_messages", true),
            // 3. Testing Polling Metadata for running consumer
            PollingMetadata::create("asynchronous_messages")->withTestingSetup(),
            // 4. When command handler is not found, we will not throw exception
            TestConfiguration::createWithDefaults()->withFailOnCommandHandlerNotFound(false)
            (...)
        ])
        // 5. We want to enable asynchronous module and disable the rest of modules
        ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([
            ModulePackageList::ASYNCHRONOUS_PACKAGE
        ]))
);
  1. Extension Objects are objects that you use with Service Context; they provide a way to customize the behaviour of your loaded modules.
  2. SimpleMessageChannelBuilder allows us to register In Memory Channels. This maps to the Asynchronous attribute on our EventHandler.
  3. Polling Metadata allows us to configure the consumer so that it runs for a given amount of time or stops after handling the first message. ->withTestingSetup() ensures the consumer is set up to exit quickly.
  4. By default, if a Command Handler is not found during the flow, an exception is thrown. This option turns that behaviour off, so we can assert whether the Saga has sent a given Command.
  5. In Message Flows all modules are disabled by default. The Asynchronous Package is responsible for asynchronous communication; when it is disabled, all communication becomes synchronous. In this scenario we want to test asynchronous delay behaviour, so we need to enable this module.
Let's start with testing the happy flow, where all verifications are completed and the user is verified:
$this->assertEquals(
    [new VerifyUser($userId)],
    $ecotoneTestSupport
        ->publishEvent(new UserWasRegistered($userId, $email, $phoneNumber))
        ->publishEvent(new EmailedVerified($userId))
        ->publishEvent(new PhoneNumberVerified($userId))
        // release messages awaiting for 1 hour or less
        ->releaseAwaitingMessagesAndRunConsumer("asynchronous_messages", 1000 * 60 * 60)
        ->getRecordedCommands()
);
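The comment "release messages awaiting for 1 hour or less" describes the idea behind releasing delayed messages in a test: nothing actually waits, the test simply states how much time should be treated as elapsed. The sketch below is a conceptual model of that idea in plain PHP. It is not Ecotone's implementation, and the class InMemoryDelayedQueue with its send and release methods is hypothetical.

```php
<?php

// Conceptual model only, NOT Ecotone's implementation. The class and
// method names are hypothetical and exist just to illustrate the idea.
final class InMemoryDelayedQueue
{
    /** @var array<int, array{message: string, delayMs: int}> */
    private array $awaiting = [];

    public function send(string $message, int $delayMs): void
    {
        $this->awaiting[] = ['message' => $message, 'delayMs' => $delayMs];
    }

    /**
     * Returns and removes every message whose delay is less than or equal
     * to the given time; messages with a longer delay keep waiting.
     *
     * @return string[]
     */
    public function release(int $timeMs): array
    {
        $released = [];
        $stillAwaiting = [];

        foreach ($this->awaiting as $entry) {
            if ($entry['delayMs'] <= $timeMs) {
                $released[] = $entry['message'];
            } else {
                $stillAwaiting[] = $entry;
            }
        }

        $this->awaiting = $stillAwaiting;

        return $released;
    }
}

$queue = new InMemoryDelayedQueue();
$queue->send('verificationProcess.timeout', 1000 * 60 * 60);

$afterHalfAnHour = $queue->release(1000 * 60 * 30); // nothing is due yet
$afterOneHour = $queue->release(1000 * 60 * 60);    // the timeout is released

var_dump($afterHalfAnHour === []);                           // bool(true)
var_dump($afterOneHour === ['verificationProcess.timeout']); // bool(true)
```

In the model, releasing with a time shorter than the configured delay leaves the message waiting. This is why the tests here pass the same 1000 * 60 * 60 value that the Saga uses for its delay.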
When not all verifications are completed, the user is blocked:
$this->assertEquals(
    // 1. Command routing
    [['user.block', $userId->toString()]],
    $ecotoneTestSupport
        ->publishEvent(new UserWasRegistered($userId, $email, $phoneNumber))
        ->publishEvent(new EmailedVerified($userId))
        // release messages awaiting for 1 hour or less
        ->releaseAwaitingMessagesAndRunConsumer("asynchronous_messages", 1000 * 60 * 60)
        // 1. Recorded Command Routing
        ->getRecordedCommandsWithRouting()
);
  1. If you take a look at the Saga above, it does not use a Command class but simply targets the aggregate by aggregate.id. In this case we can use getRecordedCommandsWithRouting to get more details about the targeted aggregate and the routing key.
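The two scenarios above each expect exactly one command from the timeout: VerifyUser when both verifications are done, user.block otherwise. As a framework-free sketch of that decision, assuming the handler stops after the first matching branch, the logic can be written as a pure function. The function name commandSentOnTimeout and its string return values are hypothetical and only label the two outcomes.

```php
<?php

// Framework-free sketch; the function name and return values are
// hypothetical labels for the two outcomes asserted in the tests.
function commandSentOnTimeout(bool $isEmailVerified, bool $isPhoneNumberVerified): string
{
    if ($isEmailVerified && $isPhoneNumberVerified) {
        // Happy flow: return early so no block command follows.
        return 'VerifyUser';
    }

    // At least one verification is missing when the timeout fires.
    return 'user.block';
}

var_dump(commandSentOnTimeout(true, true));  // string(10) "VerifyUser"
var_dump(commandSentOnTimeout(true, false)); // string(10) "user.block"
```

The early return matters: without it the fully verified case would produce both commands, and the happy-flow assertion on a single recorded command would not hold.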
To keep the code base simple, Ecotone leaves it up to you whether to create Command and Query classes. In cases like blocking a user, you may simply pass aggregate.id in the metadata. If you still want to create a Command class for it, you can do so. You can compare both solutions by looking at the verify and block methods of User.

Testing Communication between Aggregates and Sagas

With Ecotone Lite you can create full acceptance tests: just add the classes that should be used in your test scenario and let Ecotone glue everything together.
public function test_success_verification_after_registration(): void
{
    $testSupport = EcotoneLite::bootstrapFlowTesting(
        // 1. We are resolving User aggregate and Verification Process Saga
        [User::class, VerificationProcess::class],
        configuration: ServiceConfiguration::createWithDefaults()
            ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([ModulePackageList::ASYNCHRONOUS_PACKAGE]))
            ->withExtensionObjects([
                SimpleMessageChannelBuilder::createQueueChannel("asynchronous_messages", true),
                PollingMetadata::create("asynchronous_messages")->withTestingSetup()
            ]),
    );
    $userId = Uuid::uuid4();
    $this->assertEquals(
        [['user.block', $userId->toString()]],
        $testSupport
            ->sendCommand(new RegisterUser($userId, "John Snow", Email::create('[email protected]'), PhoneNumber::create('148518518518')))
            ->discardRecordedMessages()
            ->releaseAwaitingMessagesAndRunConsumer("asynchronous_messages", 1000 * 60 * 60)
            ->getRecordedCommandsWithRouting()
    );
}
For a given scenario you can add the set of classes that should be used in the test. You may include Interceptors and Converters, too. In general, all of your Ecotone production code will work in your test scenarios, which keeps the tests as close to production as possible.