Ecotone
SponsorBlogGithubSupport and ContactCommunity Channel
  • About
  • Installation
  • How to use
    • CQRS PHP
    • Event Handling PHP
    • Aggregates & Sagas
    • Scheduling in PHP
    • Asynchronous PHP
    • Event Sourcing PHP
    • Microservices PHP
    • Resiliency and Error Handling
    • Laravel Demos
    • Symfony Demos
      • Doctrine ORM
  • Tutorial
    • Before we start tutorial
    • Lesson 1: Messaging Concepts
    • Lesson 2: Tactical DDD
    • Lesson 3: Converters
    • Lesson 4: Metadata and Method Invocation
    • Lesson 5: Interceptors
    • Lesson 6: Asynchronous Handling
  • Enterprise
  • Modelling
    • Introduction
    • Message Bus and CQRS
      • CQRS Introduction - Commands
        • Query Handling
        • Event Handling
      • Aggregate Introduction
        • Aggregate Command Handlers
        • Aggregate Query Handlers
        • Aggregate Event Handlers
        • Advanced Aggregate creation
      • Repositories Introduction
      • Business Interface
        • Introduction
        • Business Repository
        • Database Business Interface
          • Converting Parameters
          • Converting Results
      • Saga Introduction
      • Identifier Mapping
    • Extending Messaging (Middlewares)
      • Message Headers
      • Interceptors (Middlewares)
        • Additional Scenarios
      • Intercepting Asynchronous Endpoints
      • Extending Message Buses (Gateways)
    • Event Sourcing
      • Installation
      • Event Sourcing Introduction
        • Working with Event Streams
        • Event Sourcing Aggregates
          • Working with Aggregates
          • Applying Events
          • Different ways to Record Events
        • Working with Metadata
        • Event versioning
        • Event Stream Persistence
          • Event Sourcing Repository
          • Making Stream immune to changes
          • Snapshoting
          • Persistence Strategies
          • Event Serialization and PII Data (GDPR)
      • Projection Introduction
        • Configuration
        • Choosing Event Streams for Projection
        • Executing and Managing
          • Running Projections
          • Projection CLI Actions
          • Access Event Store
        • Projections with State
        • Emitting events
    • Recovering, Tracing and Monitoring
      • Resiliency
        • Retries
        • Error Channel and Dead Letter
          • Dbal Dead Letter
        • Idempotent Consumer (Deduplication)
        • Resilient Sending
        • Outbox Pattern
        • Concurrency Handling
      • Message Handling Isolation
      • Ecotone Pulse (Service Dashboard)
    • Asynchronous Handling and Scheduling
      • Asynchronous Message Handlers
      • Asynchronous Message Bus (Gateways)
      • Delaying Messages
      • Time to Live
      • Message Priority
      • Scheduling
      • Dynamic Message Channels
    • Distributed Bus and Microservices
      • Distributed Bus
        • Distributed Bus with Service Map
          • Configuration
          • Custom Features
          • Non-Ecotone Application integration
          • Testing
        • AMQP Distributed Bus (RabbitMQ)
          • Configuration
        • Distributed Bus Interface
      • Message Consumer
      • Message Publisher
    • Business Workflows
      • The Basics - Stateless Workflows
      • Stateful Workflows - Saga
      • Handling Failures
    • Testing Support
      • Testing Messaging
      • Testing Aggregates and Sagas with Message Flows
      • Testing Event Sourcing Applications
      • Testing Asynchronous Messaging
  • Messaging and Ecotone In Depth
    • Overview
    • Multi-Tenancy Support
      • Getting Started
        • Any Framework Configuration
        • Symfony and Doctrine ORM
        • Laravel
      • Different Scenarios
        • Hooking into Tenant Switch
        • Shared and Multi Database Tenants
        • Accessing Current Tenant in Message Handler
        • Events and Tenant Propagation
        • Multi-Tenant aware Dead Letter
      • Advanced Queuing Strategies
    • Document Store
    • Console Commands
    • Messaging concepts
      • Message
      • Message Channel
      • Message Endpoints/Handlers
        • Internal Message Handler
        • Message Router
        • Splitter
      • Consumer
      • Messaging Gateway
      • Inbound/Outbound Channel Adapter
    • Method Invocation And Conversion
      • Method Invocation
      • Conversion
        • Payload Conversion
        • Headers Conversion
    • Service (Application) Configuration
    • Contributing to Ecotone
      • How Ecotone works under the hood
      • Ecotone Phases
      • Registering new Module Package
      • Demo Integration with SQS
        • Preparation
        • Inbound and Outbound Adapters and Message Channel
        • Message Consumer and Publisher
  • Modules
    • Overview
    • Symfony
      • Symfony Configuration
      • Symfony Database Connection (DBAL Module)
      • Doctrine ORM
      • Symfony Messenger Transport
    • Laravel
      • Laravel Configuration
      • Database Connection (DBAL Module)
      • Eloquent
      • Laravel Queues
      • Laravel Octane
    • Ecotone Lite
      • Logging
      • Database Connection (DBAL Module)
    • JMS Converter
    • OpenTelemetry (Tracing and Metrics)
      • Configuration
    • RabbitMQ Support
    • Kafka Support
      • Configuration
      • Message partitioning
      • Usage
    • DBAL Support
    • Amazon SQS Support
    • Redis Support
  • Other
    • Contact, Workshops and Support
Powered by GitBook
On this page
  • Running Asynchronously
  • Message Channel Name
  • Available Asynchronous Message Channels
  • Running Asychronous Endpoint (PollingMetadata)
  • Dynamic Configuration
  • Static Configuration
  • Multiple Asynchronous Endpoints
  • Intercepting asynchronous endpoint
  • Endpoint Id

Was this helpful?

Export as PDF
  1. Modelling
  2. Asynchronous Handling and Scheduling

Asynchronous Message Handlers

Running Asynchronously

Ecotone does allow for easy change from synchronous to asynchronous execution of given Message Handler.

In order to run Command Handler asynchronously we need to mark it as Asynchronous.

#[Asynchronous("orders")]
#[CommandHandler("order.place", "place_order_endpoint")
public function placeOrder(PlaceOrderCommand $command) : void
{
   // do something with $command
}

The same way we define for Event Handlers:

#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed")
public function when(OrderWasPlaced $event) : void
{
   // do something with $event
}

We need to add endpointId on our endpoint's annotation, this will be used to route the Message in isolation to our Message Handlers.

Message Channel Name

#[Asynchronous("orders")]

The "orders" string is actually a name of our Message Channel. This way we reference to specific implementation which we would like to use. To provide specific implementation like for example Database Channel, we would use ServiceContext.

final readonly class EcotoneConfiguration
{
    #[ServiceContext]
    public function databaseChannel()
    {
        return DbalBackedMessageChannelBuilder::create('orders');
    }
}

This is basically all we need to configure. Now database channel called orders will be used, whenever we will use Attribute with this name.

There are multiple different implementation which we can use:

Available Asynchronous Message Channels

At this moment following modules with pollable channels are available:

bin/console ecotone:list
+--------------------+
| Endpoint Names     |
+--------------------+
| orders             |
+--------------------+
artisan ecotone:list
+--------------------+
| Endpoint Names     |
+--------------------+
| orders             |
+--------------------+
$consumers = $messagingSystem->list()

After setting up Pollable Channel we can run the endpoint:

bin/console ecotone:run orders -vvv
artisan ecotone:run orders -vvv
$messagingSystem->run("orders");

Running Asychronous Endpoint (PollingMetadata)

Dynamic Configuration

You may set up running configuration for given consumer while running it.

  • handledMessageLimit - Amount of messages to be handled before stopping consumer

  • executionTimeLimit - How long consumer should run before stopping (milliseconds)

  • finishWhenNoMessages - Consumers will be running as long as there will be messages to consume

  • memoryLimit - How much memory can be consumed by before stopping consumer (Megabytes)

  • stopOnFailure - Stop consumer in case of exception

bin/console ecotone:run orders 
    --handledMessageLimit=5 
    --executionTimeLimit=1000 
    --finishWhenNoMessages
    --memoryLimit=512
    --stopOnFailure
artisan ecotone:run orders
    --handledMessageLimit=5 
    --executionTimeLimit=1000
    --finishWhenNoMessages 
    --memoryLimit=512
    --stopOnFailure
$messagingSystem->run(
    "orders", 
    ExecutionPollingMetadata::createWithDefault()
        ->withHandledMessageLimit(5)
        ->withMemoryLimitInMegabytes(100)
        ->withExecutionTimeLimitInMilliseconds(1000)
        ->withFinishWhenNoMessages(true)
        ->withStopOnError(true)
);

Static Configuration

class Configuration
{    
    #[ServiceContext]
    public function configuration() : array
    {
        return [
            PollingMetadata::create("orders")
                 ->setErrorChannelName("errorChannel")
                 ->setInitialDelayInMilliseconds(100)
                 ->setMemoryLimitInMegaBytes(100)
                 ->setHandledMessageLimit(10)
                 ->setExecutionTimeLimitInMilliseconds(100)
                 ->withFinishWhenNoMessages(true)
        ];
    }
}

Dynamic configuration overrides static

Multiple Asynchronous Endpoints

Using single asynchronous channel we may register multiple endpoints. This allow for registering single asynchronous channel for whole Aggregate or group of related Command/Event Handlers.

#[Asynchronous("orders")]
#[EventHandler]
public function onSuccess(SuccessEvent $event) : void
{
}

#[Asynchronous("orders")]
#[EventHandler]
public function onSuccess(FailureEvent $event) : void
{
}

Asynchronous Class

You may put Asynchronous on the class, level so all the endpoints within a class will becomes asynchronous.

Intercepting asynchronous endpoint

Endpoint Id

Each Asynchronous Message Handler requires us to define "endpointId". It's unique identifier of your Message Handler.

#[Asynchronous("orders")]
#[EventHandler(endpointId: "order_was_placed") // Your important endpoint Id
public function when(OrderWasPlaced $event) : void {}

Endpoint Id goes in form of Headers to your Message Queue. After Message is consumed from the Queue, Message will be directed to your Message Handler having given endpoint Id. This decouples the Message completely from the Message Handler Class and Method and Command/Event Class.

EndpointId ensures we can freely refactor our code and it will be backward compatible with Messages in the Queues. This means we can move the method and class to different namespaces, change the Command class names and as long as endpointId is kept the same Message will be delivered correctly.

PreviousAsynchronous Handling and SchedulingNextAsynchronous Message Bus (Gateways)

Last updated 4 months ago

Was this helpful?

If you're choose Ecotone will use to atomically store your changes and published messages.

Currently available Message Channels are integrated via great library .

Using configuration for statically configuration.

All asynchronous endpoints are marked with special attributeEcotone\Messaging\Attribute\AsynchronousRunningEndpoint If you want to all polling endpoints you should make use of on this.

Kafka Support
SQS Support
Redis Support
Symfony Messenger Transport Support
Laravel Queues
enqueue
Service Context
Outbox Pattern
intercept
DBAL Support
Dbal Message Channel
AMQP Support (RabbitMQ)
annotation related point cut