Ecotone
SponsorBlogGithubSupport and ContactCommunity Channel
  • About
  • Installation
  • How to use
    • CQRS PHP
    • Event Handling PHP
    • Aggregates & Sagas
    • Scheduling in PHP
    • Asynchronous PHP
    • Event Sourcing PHP
    • Microservices PHP
    • Resiliency and Error Handling
    • Laravel Demos
    • Symfony Demos
      • Doctrine ORM
  • Tutorial
    • Before we start tutorial
    • Lesson 1: Messaging Concepts
    • Lesson 2: Tactical DDD
    • Lesson 3: Converters
    • Lesson 4: Metadata and Method Invocation
    • Lesson 5: Interceptors
    • Lesson 6: Asynchronous Handling
  • Enterprise
  • Modelling
    • Introduction
    • Message Bus and CQRS
      • CQRS Introduction - Commands
        • Query Handling
        • Event Handling
      • Aggregate Introduction
        • Aggregate Command Handlers
        • Aggregate Query Handlers
        • Aggregate Event Handlers
        • Advanced Aggregate creation
      • Repositories Introduction
      • Business Interface
        • Introduction
        • Business Repository
        • Database Business Interface
          • Converting Parameters
          • Converting Results
      • Saga Introduction
      • Identifier Mapping
    • Extending Messaging (Middlewares)
      • Message Headers
      • Interceptors (Middlewares)
        • Additional Scenarios
      • Intercepting Asynchronous Endpoints
      • Extending Message Buses (Gateways)
    • Event Sourcing
      • Installation
      • Event Sourcing Introduction
        • Working with Event Streams
        • Event Sourcing Aggregates
          • Working with Aggregates
          • Applying Events
          • Different ways to Record Events
        • Working with Metadata
        • Event versioning
        • Event Stream Persistence
          • Event Sourcing Repository
          • Making Stream immune to changes
          • Snapshoting
          • Persistence Strategies
          • Event Serialization and PII Data (GDPR)
      • Projection Introduction
        • Configuration
        • Choosing Event Streams for Projection
        • Executing and Managing
          • Running Projections
          • Projection CLI Actions
          • Access Event Store
        • Projections with State
        • Emitting events
    • Recovering, Tracing and Monitoring
      • Resiliency
        • Retries
        • Error Channel and Dead Letter
          • Dbal Dead Letter
        • Idempotent Consumer (Deduplication)
        • Resilient Sending
        • Outbox Pattern
        • Concurrency Handling
      • Message Handling Isolation
      • Ecotone Pulse (Service Dashboard)
    • Asynchronous Handling and Scheduling
      • Asynchronous Message Handlers
      • Asynchronous Message Bus (Gateways)
      • Delaying Messages
      • Time to Live
      • Message Priority
      • Scheduling
      • Dynamic Message Channels
    • Distributed Bus and Microservices
      • Distributed Bus
        • Distributed Bus with Service Map
          • Configuration
          • Custom Features
          • Non-Ecotone Application integration
          • Testing
        • AMQP Distributed Bus (RabbitMQ)
          • Configuration
        • Distributed Bus Interface
      • Message Consumer
      • Message Publisher
    • Business Workflows
      • The Basics - Stateless Workflows
      • Stateful Workflows - Saga
      • Handling Failures
    • Testing Support
      • Testing Messaging
      • Testing Aggregates and Sagas with Message Flows
      • Testing Event Sourcing Applications
      • Testing Asynchronous Messaging
  • Messaging and Ecotone In Depth
    • Overview
    • Multi-Tenancy Support
      • Getting Started
        • Any Framework Configuration
        • Symfony and Doctrine ORM
        • Laravel
      • Different Scenarios
        • Hooking into Tenant Switch
        • Shared and Multi Database Tenants
        • Accessing Current Tenant in Message Handler
        • Events and Tenant Propagation
        • Multi-Tenant aware Dead Letter
      • Advanced Queuing Strategies
    • Document Store
    • Console Commands
    • Messaging concepts
      • Message
      • Message Channel
      • Message Endpoints/Handlers
        • Internal Message Handler
        • Message Router
        • Splitter
      • Consumer
      • Messaging Gateway
      • Inbound/Outbound Channel Adapter
    • Method Invocation And Conversion
      • Method Invocation
      • Conversion
        • Payload Conversion
        • Headers Conversion
    • Service (Application) Configuration
    • Contributing to Ecotone
      • How Ecotone works under the hood
      • Ecotone Phases
      • Registering new Module Package
      • Demo Integration with SQS
        • Preparation
        • Inbound and Outbound Adapters and Message Channel
        • Message Consumer and Publisher
  • Modules
    • Overview
    • Symfony
      • Symfony Configuration
      • Symfony Database Connection (DBAL Module)
      • Doctrine ORM
      • Symfony Messenger Transport
    • Laravel
      • Laravel Configuration
      • Database Connection (DBAL Module)
      • Eloquent
      • Laravel Queues
      • Laravel Octane
    • Ecotone Lite
      • Logging
      • Database Connection (DBAL Module)
    • JMS Converter
    • OpenTelemetry (Tracing and Metrics)
      • Configuration
    • RabbitMQ Support
    • Kafka Support
      • Configuration
      • Message partitioning
      • Usage
    • DBAL Support
    • Amazon SQS Support
    • Redis Support
  • Other
    • Contact, Workshops and Support
Powered by GitBook
On this page

Was this helpful?

Export as PDF
  1. Tutorial

Lesson 6: Asynchronous Handling

Asynchronous PHP Workers

Not having code for Lesson 6? git checkout lesson-6

Ecotone provides abstractions for asynchronous execution.

Asynchronous

We got new requirement: User should be able to place order for different products.

We will need to build Order aggregate.

Let's start by creating PlaceOrderCommand with ordered product Ids

namespace App\Domain\Order;

class PlaceOrderCommand
{
    private int $orderId;
    /**
     * @var int[]
     */
    private array $productIds;

    /**
     * @return int[]
     */
    public function getProductIds(): array
    {
        return $this->productIds;
    }

    public function getOrderId() : int
    {
        return $this->orderId;
    }
}

We will need OrderedProduct value object, which will describe, cost and identifier of ordered product

namespace App\Domain\Order;

class OrderedProduct
{
    private int $productId;

    private int $cost;

    public function __construct(int $productId, int $cost)
    {
        $this->productId = $productId;
        $this->cost = $cost;
    }

    public function getCost(): int
    {
        return $this->cost;
    }
}

And our Order aggregate

namespace App\Domain\Order;

use App\Infrastructure\AddUserId\AddUserId;
use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\Aggregate;
use Ecotone\Modelling\Attribute\AggregateIdentifier;
use Ecotone\Modelling\Attribute\CommandHandler;
use Ecotone\Modelling\Attribute\QueryHandler;
use Ecotone\Modelling\QueryBus;

#[Aggregate]
#[AddUserId]
class Order
{
    #[AggregateIdentifier]
    private int $orderId;

    private int $buyerId;

    /**
     * @var OrderedProduct[]
     */
    private array $orderedProducts;

    private function __construct(int $orderId, int $buyerId, array $orderedProducts)
    {
        $this->orderId = $orderId;
        $this->buyerId = $buyerId;
        $this->orderedProducts = $orderedProducts;
    }
    
    #[CommandHandler("order.place")]
    public static function placeOrder(PlaceOrderCommand $command, array $metadata, QueryBus $queryBus) : self
    {
        $orderedProducts = [];
        foreach ($command->getProductIds() as $productId) {
            $productCost = $queryBus->sendWithRouting("product.getCost", ["productId" => $productId]);
            $orderedProducts[] = new OrderedProduct($productId, $productCost->getAmount());
        }

        return new self($command->getOrderId(), $metadata["userId"], $orderedProducts);
    }

    #[QueryHandler("order.getTotalPrice")]
    public function getTotalPrice() : int
    {
        $totalPrice = 0;
        foreach ($this->orderedProducts as $orderedProduct) {
            $totalPrice += $orderedProduct->getCost();
        }

        return $totalPrice;
    }
}

placeOrder - Place order method make use of QueryBus to retrieve cost of each ordered product. You could find out, that we are not using application/json for product.getCost query, ecotone/jms-converter can handle array transformation, so we do not need to use json.

You could inject service into placeOrder that will hide QueryBus implementation from the domain, or you may get this data from data store directly. We do not want to complicate the solution now, so we will use QueryBus directly.

We do not need to change or add new Repository, as our exiting one can handle any new aggregate arriving in our system.

Let's change our testing class and run it!

class EcotoneQuickstart
{
    private CommandBus $commandBus;
    private QueryBus $queryBus;

    public function __construct(CommandBus $commandBus, QueryBus $queryBus)
    {
        $this->commandBus = $commandBus;
        $this->queryBus = $queryBus;
    }

    public function run() : void
    {
        $this->commandBus->sendWithRouting(
            "product.register",
            ["productId" => 1, "cost" => 100]
        );
        $this->commandBus->sendWithRouting(
            "product.register",
            ["productId" => 2, "cost" => 300]
        );

        $orderId = 100;
        $this->commandBus->sendWithRouting(
            "order.place",
            ["orderId" => $orderId, "productIds" => [1,2]]
        );

        echo $this->queryBus->convertAndSend("order.getTotalPrice", MediaType::APPLICATION_X_PHP_ARRAY, ["orderId" => $orderId]);
    }
}
bin/console ecotone:quickstart
Running example...
Start transaction
Product with id 1 was registered!
Commit transaction
Start transaction
Product with id 2 was registered!
Commit transaction
Start transaction
Commit transaction
400
Good job, scenario ran with success!

We want to be sure, that we do not lose any order, so we will register our order.place Command Handler to run asynchronously using RabbitMQ now. Let's start by adding extension to Ecotone, that can handle RabbitMQ:

composer require ecotone/amqp

We also need to add our ConnectionFactory to our Dependency Container.

# Add AmqpConnectionFactory in config/services.yaml

services:
    _defaults:
        autowire: true
        autoconfigure: true
    App\:
        resource: '../src/*'
        exclude: '../src/{Kernel.php}'
    Bootstrap\:
        resource: '../bootstrap/*'
        exclude: '../bootstrap/{Kernel.php}'

# You need to have RabbitMQ instance running on your localhost, or change DSN
    Enqueue\AmqpExt\AmqpConnectionFactory:
        class: Enqueue\AmqpExt\AmqpConnectionFactory
        arguments:
            - "amqp+lib://guest:guest@localhost:5672//"
# Add AmqpConnectionFactory in config/services.yaml

services:
    _defaults:
        autowire: true
        autoconfigure: true
    App\:
        resource: '../src/*'
        exclude: '../src/{Kernel.php}'
    Bootstrap\:
        resource: '../bootstrap/*'
        exclude: '../bootstrap/{Kernel.php}'

# docker-compose.yml has RabbitMQ instance defined. It will be working without
# addtional configuration
    Enqueue\AmqpExt\AmqpConnectionFactory:
        class: Enqueue\AmqpExt\AmqpConnectionFactory
        arguments:
            - "amqp+lib://guest:guest@rabbitmq:5672//"
# Add AmqpConnectionFactory in bootstrap/QuickStartProvider.php

namespace Bootstrap;

use Illuminate\Support\ServiceProvider;
use Enqueue\AmqpExt\AmqpConnectionFactory;

class QuickStartProvider extends ServiceProvider
{
    public function register()
    {
        $this->app->singleton(AmqpConnectionFactory::class, function () {
            return new AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//");
        });
    }
(...)
# Add AmqpConnectionFactory in bootstrap/QuickStartProvider.php

namespace Bootstrap;

use Illuminate\Support\ServiceProvider;
use Enqueue\AmqpExt\AmqpConnectionFactory;

class QuickStartProvider extends ServiceProvider
{
    public function register()
    {
        $this->app->singleton(AmqpConnectionFactory::class, function () {
            return new AmqpConnectionFactory("amqp+lib://guest:guest@rabbitmq:5672//");
        });
    }
(...)
# Add AmqpConnectionFactory in bin/console.php

// add additional service in container
public function __construct()
{
   $this->container = new Container();
   $this->container->set(Enqueue\AmqpExt\AmqpConnectionFactory::class, new Enqueue\AmqpExt\AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//"));
}

# Add AmqpConnectionFactory in bin/console.php 

// add additional service in container
public function __construct()
{
   $this->container = new Container();
   $this->container->set(Enqueue\AmqpExt\AmqpConnectionFactory::class, new Enqueue\AmqpExt\AmqpConnectionFactory("amqp+lib://guest:guest@rabbitmq:5672//"));
}

We register our AmqpConnectionFactory under the class name Enqueue\AmqpLib\AmqpConnectionFactory. This will help Ecotone resolve it automatically, without any additional configuration.

Let's add our first AMQP Backed Channel (RabbitMQ Channel), in order to do it, we need to create our first Application Context. Application Context is a non-constructor class, responsible for extending Ecotone with extra configurations, that will help the framework act in a specific way. In here we want to tell Ecotone about AMQP Channel with specific name. Let's create new class App\Infrastructure\MessagingConfiguration.

namespace App\Infrastructure;

class MessagingConfiguration
{
    #[ServiceContext]
    public function orderChannel()
    {
        return [
            AmqpBackedMessageChannelBuilder::create("orders")
        ];
    }
}

ServiceContext - Tell that this method returns configuration. It can return array of objects or a single object.

Now we need to tell our order.place Command Handler, that it should run asynchronously using our neworders channel.

use Ecotone\Messaging\Annotation\Asynchronous;

(...)

#[Asynchronous("orders")]
#[CommandHandler("order.place", endpointId: "place_order_endpoint")]
public static function placeOrder(PlaceOrderCommand $command, array $metadata, QueryBus $queryBus) : self
{
    $orderedProducts = [];
    foreach ($command->getProductIds() as $productId) {
        $productCost = $queryBus->sendWithRouting("product.getCost", ["productId" => $productId]);
        $orderedProducts[] = new OrderedProduct($productId, $productCost->getAmount());
    }

    return new self($command->getOrderId(), $metadata["userId"], $orderedProducts);
}

We do it by adding Asynchronous annotation with channelName used for asynchronous endpoint. Endpoints using Asynchronous are required to have endpointId defined, the name can be anything as long as it's not the same as routing key (order.place).

#[CommandHandler("order.place", endpointId: "place_order_endpoint")]

You may mark Event Handler as asynchronous the same way.

Let's run our command which will tell us what asynchronous endpoints we have defined in our system: ecotone:list

bin/console ecotone:list
+--------------------+
| Endpoint Names     |
+--------------------+
| orders             |
+--------------------+

Let's change orderId in our testing command, so we can place new order.

public function run() : void
{
    $this->commandBus->sendWithRouting(
        "product.register",
        ["productId" => 1, "cost" => 100]
    );
    $this->commandBus->sendWithRouting(
        "product.register",
        ["productId" => 2, "cost" => 300]
    );

    $orderId = 990;
    $this->commandBus->sendWithRouting(
        "order.place",
        ["orderId" => $orderId, "productIds" => [1,2]]
    );

    echo $this->queryBus->sendWithRouting("order.getTotalPrice", ["orderId" => $orderId]);
}

After running our testing command bin/console ecotone:quickstartwe should get an exception:

AggregateNotFoundException:
                                                                               
  Aggregate App\Domain\Order\Order:getTotalPrice was not found for indentifie  
  rs {"orderId":990}  

That's fine, we have registered order.place Command Handler to run asynchronously, so we need to run our asynchronous endpoint in order to handle Command Message. If you did not received and exception, it's probably because orderId was not changed and we already registered such order. Let's run our asynchronous endpoint

bin/console ecotone:run orders --handledMessageLimit=1 --stopOnFailure -vvv
[info] {"orderId":990,"productIds":[1,2]}

Like we can see, it ran our Command Handler and placed the order. We can change our testing command to run only Query Handlerand check, if the order really exists now.

class EcotoneQuickstart
{
    private CommandBus $commandBus;
    private QueryBus $queryBus;

    public function __construct(CommandBus $commandBus, QueryBus $queryBus)
    {
        $this->commandBus = $commandBus;
        $this->queryBus = $queryBus;
    }

    public function run() : void
    {
        $orderId = 990;

        echo $this->queryBus->sendWithRouting("order.getTotalPrice", ["orderId" => $orderId]);
    }
}
bin/console ecotone:quickstart -vvv
Running example...
400
Good job, scenario ran with success!

There is one thing we can change. As in asynchronous scenario we may not have access to the context of executor to enrich the message,, we can change our AddUserIdService Interceptor to perform the action before sending it to asynchronous channel. This Interceptor is registered as Before Interceptor which is before execution of our Command Handler, but what we want to achieve is, to call this interceptor before message will be send to the asynchronous channel. For this there is Presend Interceptor available. Change Before annotation to Presend annotation and we are done.

namespace App\Infrastructure\AddUserId;

class AddUserIdService
{
   #[Presend(0, AddUserId::class, true)]
    public function add() : array
    {
        return ["userId" => 1];
    }
}

Ecotone will do it best to handle serialization and deserialization of your headers.

Now if non-administrator will try to execute this, exception will be thrown, before the Message will be put to the asynchronous channel. Thanks to Presend interceptor, we can validate messages, before they will go asynchronous, to prevent sending incorrect messages.

The final code is available as lesson-7: git checkout lesson-7

PreviousLesson 5: InterceptorsNextEnterprise

Last updated 1 year ago

Was this helpful?

We have new asynchronous endpoint available orders. Name comes from the message channel name. You may wonder why it is not place_order_endpoint, it's because via single asynchronous channel we can handle multiple endpoints, if needed. This is further explained in .

We made it through, Congratulations! We have successfully registered asynchronous Command Handler and safely placed the order. We have finished last lesson. You may now apply the knowledge in real project or check more advanced usages starting here .

asynchronous section
Modelling Overview