Testing Asynchronous Messaging

Testing asynchronous communication in PHP


Last updated 3 months ago


When your code becomes asynchronous, sending a Command is not enough to verify the full flow. The resulting Message lands in a queue and waits there to be consumed, so the test also has to run the consumer.

Ecotone provides full support for testing your asynchronous messaging architecture.

Example Asynchronous Handler

As an example, imagine a scenario where we want to send a notification asynchronously after an order is placed.

class NotificationService
{
    #[Asynchronous('notifications')]
    #[EventHandler(endpointId: 'notifyOrderWasPlaced')]
    public function notify(OrderWasPlaced $event, Notifier $notifier): void
    {
        $notifier->notifyAbout('placedOrder', $event->getOrderId());
    }
}

Running code Synchronously

By default, all asynchronous code runs synchronously in tests. This simplifies the tested code and speeds up your test suite.

$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    // 1. We have dropped ChannelConfiguration::class, to replace it with our In Memory
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()]
);

// this will publish OrderWasPlaced as a result
$ecotoneTestSupport->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'));

$this->assertEquals(
    1,
    count($this->notifier->getNotificationsOf('placedOrder'))
);

Running Asynchronous Consumer

Ecotone provides In Memory Pollable Channels, which can replace the real implementation for testing purposes.

$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    // 1. we need to provide Message Channel to use
    enableAsynchronousProcessing: [
        SimpleMessageChannelBuilder::create('notifications')
    ]
);

// you could run Event Bus with OrderWasPlaced here instead
$ecotoneTestSupport->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'));

// 2. running consumer
$ecotoneTestSupport->run('notifications');

$this->assertEquals(
    1,
    // 3. we can provide some in memory implementation for testing purposes
    count($this->notifier->getNotificationsOf('placedOrder'))
);
  1. Enable asynchronous processing - We enable asynchronous processing and provide the Message Channel to poll from. The Message Channel can be a real one (SQS, RabbitMQ, Dbal etc.) or an In Memory one.

  2. Run - This runs the consumer with the given PollingMetadata.

  3. Assert - We assert the state after the consumer has exited.

In the above example we are running the consumer within the same process as the test. You may also run the consumer from a separate process, like this (example for Symfony):

php bin/console ecotone:run notifications --handledMessageLimit=1 --executionTimeLimit=100 --stopOnFailure

However, running the consumer as a separate process is not advised: it requires booting a separate process, which slows down the test suite, and the lack of shared memory rules out In Memory implementations.
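To make the enable, run, assert flow concrete, here is a minimal plain-PHP sketch of what an in-memory pollable channel does inside a single process. `InMemoryQueue`, `InMemoryNotifier` and `runConsumer` are hypothetical stand-ins written for this illustration, not Ecotone classes; the point is only that publishing enqueues the event and the handler runs when the consumer is executed.

```php
<?php

declare(strict_types=1);

// Hypothetical stand-in for an in-memory pollable channel; not Ecotone's class.
final class InMemoryQueue
{
    /** @var list<object> */
    private array $messages = [];

    public function send(object $message): void
    {
        $this->messages[] = $message;
    }

    public function receive(): ?object
    {
        // array_shift() returns null when the queue is empty
        return array_shift($this->messages);
    }
}

final class OrderWasPlaced
{
    public function __construct(public string $orderId) {}
}

// Hypothetical in-memory Notifier that records what it was asked to send.
final class InMemoryNotifier
{
    /** @var array<string, list<string>> */
    private array $sent = [];

    public function notifyAbout(string $type, string $orderId): void
    {
        $this->sent[$type][] = $orderId;
    }

    /** @return list<string> */
    public function getNotificationsOf(string $type): array
    {
        return $this->sent[$type] ?? [];
    }
}

// Drains the queue: handles messages until none are left.
function runConsumer(InMemoryQueue $queue, callable $handler): void
{
    while (($message = $queue->receive()) !== null) {
        $handler($message);
    }
}

$queue = new InMemoryQueue();
$notifier = new InMemoryNotifier();

// Publishing only enqueues the event; the handler has not run yet.
$queue->send(new OrderWasPlaced('123'));
$before = count($notifier->getNotificationsOf('placedOrder'));

// Running the consumer is what finally invokes the handler.
runConsumer(
    $queue,
    fn (OrderWasPlaced $event) => $notifier->notifyAbout('placedOrder', $event->orderId)
);
$after = count($notifier->getNotificationsOf('placedOrder'));
```

Because the queue and the notifier live in the same process as the test, the assertion can read the notifier directly, which is not possible when the consumer runs as a separate process.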

Default Message Channels

When testing with In Memory Channels, we can omit the specific implementation. Ecotone will then provide default In Memory Message Channels for us:

$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    // We simply state, that we want to enable async processing
    enableAsynchronousProcessing: true
);

Polling Metadata

By default Ecotone will optimize for your test scenario:

  • If a real Message Channel like RabbitMQ, SQS or Redis is used in the test, the Message Consumer will run for up to 100 ms and will stop on error.

  • If an In Memory Channel is used, the Message Consumer will run until it has fetched all Messages or an error happens.

This default configuration keeps tests stable and quick to finish. If needed, the behaviour can be customized by providing ExecutionPollingMetadata.

$ecotoneTestSupport->run(
    'notifications',
    ExecutionPollingMetadata::createWithTestingSetup(
        // consumer will stop after handling single message
        amountOfMessagesToHandle: 1, 
        // or consumer will stop after 100 ms
        maxExecutionTimeInMilliseconds: 100,
        // or consumer will stop immediately after error
        failAtError: true
    )
);
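The three stop conditions can be pictured as guards around a polling loop. The following is a rough plain-PHP sketch under that assumption; the `consume` function is hypothetical and only borrows the parameter names from the call above, so it does not describe how Ecotone implements its consumer. It also stops as soon as the queue is empty, which matches the In Memory default described earlier but not a real broker, where the consumer would keep polling until the time limit.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical consumer loop, not Ecotone's implementation.
 * Returns the number of messages handled successfully.
 *
 * @param list<mixed> $messages queue contents, consumed from the front
 */
function consume(
    array &$messages,
    callable $handler,
    ?int $amountOfMessagesToHandle = null,
    ?int $maxExecutionTimeInMilliseconds = null,
    bool $failAtError = true
): int {
    $startedAt = hrtime(true); // monotonic clock, in nanoseconds
    $handled = 0;

    while ($messages !== []) {
        if ($amountOfMessagesToHandle !== null && $handled >= $amountOfMessagesToHandle) {
            break; // message limit reached
        }
        $elapsedMs = (hrtime(true) - $startedAt) / 1_000_000;
        if ($maxExecutionTimeInMilliseconds !== null && $elapsedMs >= $maxExecutionTimeInMilliseconds) {
            break; // time limit reached
        }

        $message = array_shift($messages);
        try {
            $handler($message);
            $handled++;
        } catch (Throwable $exception) {
            if ($failAtError) {
                throw $exception; // surface the failure to the test immediately
            }
            // otherwise skip the failed message and keep polling
        }
    }

    return $handled;
}

$queue = ['first', 'second', 'third'];
$seen = [];

// Stop after a single message, leaving the rest in the queue.
$handledCount = consume(
    $queue,
    function (string $message) use (&$seen): void {
        $seen[] = $message;
    },
    amountOfMessagesToHandle: 1
);
```

Limiting the run to one message is useful when a test needs to assert on intermediate state between two messages.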

Testing Serialization

To test serialization, we may fetch the Message directly from the Channel and verify its payload.

$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    enableAsynchronousProcessing: [
        // 1. Enable conversion on given channel
        SimpleMessageChannelBuilder::createQueueChannel(
            'notifications',
            conversionMediaType: 'application/json'
        )    
    ]
);

$ecotoneTestSupport->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'));

$this->assertEquals(
    '{"orderId":"123"}',
    // 2. Verifying serialization - Get Event's payload from channel
    $ecotoneTestSupport->getMessageChannel('notifications')->receive()->getPayload()
);
  1. We enable serialization on this channel for the given Media Type. In this case, every message going through notifications is serialized to JSON.

  2. We pull the message sent to the notifications channel and verify that it was sent in JSON format.

By default, an In Memory Queue Channel serializes messages using PHP native serialization, or your default serialization if one is defined. This way it works similarly to your production Queue Channels. If you don't want to use serialization, you may set conversionMediaType: MediaType::createApplicationXPHP()
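To give a feel for what ends up in the channel, the sketch below produces both kinds of payload with plain PHP functions. This is an illustration only: `json_encode()` and `serialize()` stand in for whatever converter is configured, the `OrderWasPlaced` class here is a hypothetical minimal event, and a real converter (for example the JMS Module) may name or order fields differently.

```php
<?php

declare(strict_types=1);

// Hypothetical minimal event used only for this illustration.
final class OrderWasPlaced
{
    public function __construct(public string $orderId) {}
}

$event = new OrderWasPlaced('123');

// JSON media type: json_encode() serializes the public properties of the object.
$jsonPayload = json_encode($event, JSON_THROW_ON_ERROR);

// PHP native serialization: the payload is a string that unserialize() can restore.
$nativePayload = serialize($event);
$restored = unserialize($nativePayload, ['allowed_classes' => [OrderWasPlaced::class]]);
```

In both cases the channel carries a string rather than the original object, which is why asserting on the raw payload catches serialization problems that a purely in-process object hand-off would hide.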

If our serialization mechanism is the JMS Module, we will need to enable it for testing:

$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    configuration: ServiceConfiguration::createWithDefaults()
                ->withSkippedModulePackageNames(ModulePackageList::allPackagesExcept([
                        ModulePackageList::ASYNCHRONOUS_PACKAGE,
                        ModulePackageList::JMS_CONVERTER_PACKAGE
                ])),
    enableAsynchronousProcessing: [
        // 1. Enable conversion on given channel
        SimpleMessageChannelBuilder::createQueueChannel(
            'notifications',
            conversionMediaType: 'application/json'
        )    
    ]
);

Otherwise, we will need to include the classes that customize our serialization.

Testing Delayed Messages

Our Handlers may be delayed in time. In a test we may want to perform a few actions and then release the message, to verify a production-like flow.

class NotificationService
{
    #[Asynchronous('notifications')]
    #[Delayed(1000 * 60)] // 60 seconds
    #[EventHandler(endpointId: 'notifyOrderWasPlaced')]
    public function notify(OrderWasPlaced $event, Notifier $notifier): void
    {
        $notifier->notifyAbout('placedOrder', $event->getOrderId());
    }
}

$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    enableAsynchronousProcessing: [
       // 1. Turn on Delayable In Memory Pollable Channel
       SimpleMessageChannelBuilder::createQueueChannel('notifications', true)
    ]
);

$ecotoneTestSupport
    ->sendCommandWithRoutingKey('order.register', new PlaceOrder('123'));

// 2. Releasing messages awaiting for 60 seconds
$ecotoneTestSupport->run(
    'notifications',
    releaseAwaitingFor: TimeSpan::withSeconds(60)
);

$this->assertEquals(
    1,
    count($this->notifier->getNotificationsOf('placedOrder'))
);
  1. The default behaviour for In Memory Channels is to ignore delays. By setting the second parameter to true, we register an In Memory Channel that is aware of delays.

  2. We release messages that are delayed by 60 seconds or less.
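The "60 seconds or less" rule from the notes above can be sketched as a queue that stores a delay next to each message and only hands over messages whose delay fits within the released time span. `DelayableInMemoryQueue` and its `release` method are hypothetical names for this illustration and say nothing about how Ecotone stores delays internally.

```php
<?php

declare(strict_types=1);

// Hypothetical delay-aware in-memory channel; not Ecotone's implementation.
final class DelayableInMemoryQueue
{
    /** @var list<array{payload: mixed, delayMs: int}> */
    private array $messages = [];

    public function send(mixed $payload, int $delayMs = 0): void
    {
        $this->messages[] = ['payload' => $payload, 'delayMs' => $delayMs];
    }

    /**
     * Returns and removes payloads whose delay is at most $releaseAwaitingForMs.
     *
     * @return list<mixed>
     */
    public function release(int $releaseAwaitingForMs = 0): array
    {
        $released = [];
        $stillWaiting = [];
        foreach ($this->messages as $message) {
            if ($message['delayMs'] <= $releaseAwaitingForMs) {
                $released[] = $message['payload'];
            } else {
                $stillWaiting[] = $message;
            }
        }
        $this->messages = $stillWaiting;

        return $released;
    }
}

$queue = new DelayableInMemoryQueue();
$queue->send('notify order 123', delayMs: 60_000); // 60 seconds, as in #[Delayed(1000 * 60)]

$withoutRelease = $queue->release();      // nothing is due yet
$afterRelease = $queue->release(60_000);  // releases messages delayed by 60 seconds or less
```

No real time passes in this model: the test states how far ahead it wants to look, which keeps delayed flows fast and deterministic.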

Delaying to given date

If we delay to a given point in time, we can use a date time to release the message.

$ecotoneTestSupport
    ->sendCommand(
        new SendNotification('123'),
        metadata: [
            "deliveryDelay" => new \DateTimeImmutable('+1 day')
        ]
    );

$ecotoneTestSupport->run(
    'notifications', 
    releaseAwaitingFor: new \DateTimeImmutable('+1 day')
);

$this->assertEquals(
    1,
    count($this->notifier->getNotifications())
);
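The example evaluates '+1 day' twice, so the two timestamps differ by however long the code between them takes to run. A test can avoid relying on that by deriving both values from one fixed reference point. The sketch below assumes a message becomes due once the release point is at or after its delivery time; that comparison rule and the `$isDue` helper are assumptions made for illustration.

```php
<?php

declare(strict_types=1);

// Fixed reference time so the test does not depend on the wall clock.
$now = new DateTimeImmutable('2030-01-01 12:00:00', new DateTimeZone('UTC'));
$deliverAt = $now->modify('+1 day');

// Assumed rule: a message is due once the release point is at or after its delivery time.
$isDue = static fn (DateTimeImmutable $deliverAt, DateTimeImmutable $releaseAt): bool
    => $releaseAt >= $deliverAt;

$dueSameInstant = $isDue($deliverAt, $now->modify('+1 day'));
$dueTooEarly = $isDue($deliverAt, $now->modify('+23 hours'));
```

Storing the delivery time in a variable and passing the same value to both the metadata and the release call gives the same determinism in a real test.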

Dropping all messages coming to given channel

In some scenarios, you may just want to turn off a given channel, because you're not interested in the messages that go through it.

$ecotoneTestSupport = EcotoneLite::bootstrapFlowTesting(
    // We have dropped ChannelConfiguration::class, to replace it with our In Memory
    [OrderService::class, NotificationService::class],
    [new OrderService(), new NotificationService()],
    enableAsynchronousProcessing: [
        // 1. Create nullable channel
        SimpleMessageChannelBuilder::createNullableChannel('notifications')
    ]
);
  1. By registering a nullable channel, we make sure that all messages going to the given channel are dropped.
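Conceptually, such a channel accepts every message and keeps none. A minimal plain-PHP sketch of that idea follows; the `NullChannel` class is hypothetical and only mirrors the behaviour described above, and its counter exists purely so the example can show that sending still succeeds.

```php
<?php

declare(strict_types=1);

// Hypothetical null channel: accepts every message and keeps none of them.
final class NullChannel
{
    private int $dropped = 0;

    public function send(mixed $message): void
    {
        $this->dropped++; // count only, so a test can see that sending happened
    }

    public function receive(): mixed
    {
        return null; // there is never anything to consume
    }

    public function droppedCount(): int
    {
        return $this->dropped;
    }
}

$channel = new NullChannel();
$channel->send('notify order 123');
$channel->send('notify order 456');
$received = $channel->receive();
```

The sender is unaffected, so the rest of the flow under test runs as usual while the handlers behind this channel are never invoked.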
