Lesson 6: Scheduling And Asynchronous

Not having code for Lesson 6? git checkout lesson-6

Ecotone provides abstractions for asynchronous execution and scheduling.

Scheduling

Some times you will want to run endpoint on given time or with specific rate. In order to do it Ecotone comes with Scheduled endpoint.

Let's suppose that we integrate with Currency Exchange Service and we need to recalculate the price, whenever the currency ratio will change. We will periodically call the external service and change the price our product. Let's run our testing command, so we do register product with id of 1.

Let's run our testing command:

bin/console ecotone:quickstart
Start transaction
Product with id 1 was registered!
Commit transaction
Start transaction
Commit transaction
110
Good job, scenario ran with success!

And now let's leave only Query Handler call

public function run() : void
{
echo $this->queryBus->convertAndSend("product.getCost", MediaType::APPLICATION_JSON, \json_encode(["productId" => 1]));
}
bin/console ecotone:quickstart
110
Good job, scenario ran with success!

So our price now is 110. Now we will add our first Scheduled endpoint

namespace App\Infrastructure;
use Ecotone\Messaging\Annotation\Scheduled;
use Ecotone\Messaging\Annotation\MessageEndpoint;
use Ecotone\Messaging\Annotation\Poller;
/**
* @MessageEndpoint()
*/
class CurrencyExchanger
{
/**
* @Scheduled(
* endpointId="currency_exchanger",
* requestChannelName="product.changePrice",
* poller=@Poller(
* fixedRateInMilliseconds=1000
* )
* )
*/
public function exchange() : array
{
echo "Changing the price\n";
return ["productId" => 1, "cost" => 120];
}
}

We start by marking method as @Scheduled and class as @MessageEndpoint. It takes the data returned from exchange method and use it as payload's for the Message which is send to requestChannelName. endpointId - Describes endpoint identifier requestChannelName - describes where it should send created message poller - Describes, how this endpoint should be invoked. In this case, we informed Ecotonethat we want to trigger this method every second (1000 milliseconds is 1 second) using fixedRateInMilliseconds. You can also set up cron expression. In this case to run it every minute we would do:

@Poller(
cron="* * * * *"
)

Let's run our command which will tell us what asynchronous endpoints we have defined in our system: ecotone:list-all-asynchronous-endpoints

bin/console ecotone:list-all-asynchronous-endpoints
+--------------------+
| Endpoint Names |
+--------------------+
| currency_exchanger |
+--------------------+

Now we are ready to run out endpoint.

bin/console ecotone:run-endpoint currency_exchanger
Changing the price
Changing the price

Great, we have defined our first Inbound Channel Adapter. Which will work in the background and will handle changing the price for us. Let's run our testing command and see, if the cost was changed to 120.

bin/console ecotone:quickstart
120
Good job, scenario ran with success!

Asynchronous

Let's move aside our currency_exchanger and turn it off. We got new requirement: User should be able to place order for different products.

We will need to build Order aggregate.

Let's start by creating PlaceOrderCommand with ordered product Ids

namespace App\Domain\Order;
class PlaceOrderCommand
{
private int $orderId;
/**
* @var int[]
*/
private array $productIds;
/**
* @return int[]
*/
public function getProductIds(): array
{
return $this->productIds;
}
public function getOrderId() : int
{
return $this->orderId;
}
}

We will need OrderedProduct value object, which will describe, cost and identifier of ordered product

namespace App\Domain\Order;
class OrderedProduct
{
private int $productId;
private int $cost;
public function __construct(int $productId, int $cost)
{
$this->productId = $productId;
$this->cost = $cost;
}
public function getCost(): int
{
return $this->cost;
}
}

And our Order aggregate

namespace App\Domain\Order;
use App\Infrastructure\AddUserId\AddUserId;
use Ecotone\Messaging\Conversion\MediaType;
use Ecotone\Modelling\Annotation\Aggregate;
use Ecotone\Modelling\Annotation\AggregateIdentifier;
use Ecotone\Modelling\Annotation\CommandHandler;
use Ecotone\Modelling\Annotation\QueryHandler;
use Ecotone\Modelling\QueryBus;
/**
* @Aggregate()
* @AddUserId()
*/
class Order
{
/**
* @AggregateIdentifier()
*/
private int $orderId;
private int $buyerId;
/**
* @var OrderedProduct[]
*/
private array $orderedProducts;
private function __construct(int $orderId, int $buyerId, array $orderedProducts)
{
$this->orderId = $orderId;
$this->buyerId = $buyerId;
$this->orderedProducts = $orderedProducts;
}
/**
* @CommandHandler(inputChannelName="order.place")
*/
public static function placeOrder(PlaceOrderCommand $command, array $metadata, QueryBus $queryBus) : self
{
$orderedProducts = [];
foreach ($command->getProductIds() as $productId) {
$productCost = $queryBus->convertAndSend("product.getCost", MediaType::APPLICATION_X_PHP_ARRAY, ["productId" => $productId]);
$orderedProducts[] = new OrderedProduct($productId, $productCost->getAmount());
}
return new self($command->getOrderId(), $metadata["userId"], $orderedProducts);
}
/**
* @QueryHandler(inputChannelName="order.getTotalPrice")
*/
public function getTotalPrice() : int
{
$totalPrice = 0;
foreach ($this->orderedProducts as $orderedProduct) {
$totalPrice += $orderedProduct->getCost();
}
return $totalPrice;
}
}

placeOrder - Place order method make use of QueryBus to retrieve cost of each ordered product. You could find out, that we are not using application/json for product.getCost query, ecotone/jms-converter can handle array transformation, so we do not need to use json.

You could inject service into placeOrder that will hide QueryBus implementation from the domain, or you may get this data from data store directly. We do not want to complicate the solution now, so we will use QueryBus directly.

We do not need to change or add new Repository, as our exiting one can handle any new aggregate arriving in our system.

Let's change our testing class and run it!

class EcotoneQuickstart
{
private CommandBus $commandBus;
private QueryBus $queryBus;
public function __construct(CommandBus $commandBus, QueryBus $queryBus)
{
$this->commandBus = $commandBus;
$this->queryBus = $queryBus;
}
public function run() : void
{
$this->commandBus->convertAndSend(
"product.register",
MediaType::APPLICATION_X_PHP_ARRAY,
["productId" => 1, "cost" => 100]
);
$this->commandBus->convertAndSend(
"product.register",
MediaType::APPLICATION_X_PHP_ARRAY,
["productId" => 2, "cost" => 300]
);
$orderId = 100;
$this->commandBus->convertAndSend(
"order.place",
MediaType::APPLICATION_X_PHP_ARRAY,
["orderId" => $orderId, "productIds" => [1,2]]
);
echo $this->queryBus->convertAndSend("order.getTotalPrice", MediaType::APPLICATION_X_PHP_ARRAY, ["orderId" => $orderId]);
}
}
bin/console ecotone:quickstart
Running example...
Start transaction
Product with id 1 was registered!
Commit transaction
Start transaction
Product with id 2 was registered!
Commit transaction
Start transaction
Commit transaction
400
Good job, scenario ran with success!

We want to be sure, that we do not lose any order, so we will register our order.place Command Handler to run asynchronously using RabbitMQ now. Let's start by adding extension to Ecotone, that can handle RabbitMQ:

composer require ecotone/amqp

We also need to add our ConnectionFactory to our Dependency Container.

Symfony - Local Environment
Symfony - Docker
Laravel - Local Environment
Laravel - Docker
Symfony - Local Environment
# Add AmqpConnectionFactory in config/services.yaml
services:
_defaults:
autowire: true
autoconfigure: true
App\:
resource: '../src/*'
exclude: '../src/{Kernel.php}'
Bootstrap\:
resource: '../bootstrap/*'
exclude: '../bootstrap/{Kernel.php}'
# You need to have RabbitMQ instance running on your localhost, or change DSN
Enqueue\AmqpLib\AmqpConnectionFactory:
class: Enqueue\AmqpLib\AmqpConnectionFactory
arguments:
- "amqp+lib://guest:guest@localhost:5672//"
Symfony - Docker
# Add AmqpConnectionFactory in config/services.yaml
services:
_defaults:
autowire: true
autoconfigure: true
App\:
resource: '../src/*'
exclude: '../src/{Kernel.php}'
Bootstrap\:
resource: '../bootstrap/*'
exclude: '../bootstrap/{Kernel.php}'
# docker-compose.yml has RabbitMQ instance defined. It will be working without
# addtional configuration
Enqueue\AmqpLib\AmqpConnectionFactory:
class: Enqueue\AmqpLib\AmqpConnectionFactory
arguments:
- "amqp+lib://guest:guest@rabbitmq:5672//"
Laravel - Local Environment
# Add AmqpConnectionFactory in bootstrap/QuickStartProvider.php
namespace Bootstrap;
use Illuminate\Support\ServiceProvider;
use Enqueue\AmqpLib\AmqpConnectionFactory;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(AmqpConnectionFactory::class, function () {
return new AmqpConnectionFactory("amqp+lib://guest:guest@localhost:5672//");
});
}
(...)
Laravel - Docker
# Add AmqpConnectionFactory in bootstrap/QuickStartProvider.php
namespace Bootstrap;
use Illuminate\Support\ServiceProvider;
use Enqueue\AmqpLib\AmqpConnectionFactory;
class QuickStartProvider extends ServiceProvider
{
public function register()
{
$this->app->singleton(AmqpConnectionFactory::class, function () {
return new AmqpConnectionFactory("amqp+lib://guest:guest@rabbitmq:5672//");
});
}
(...)

We register our AmqpConnectionFactory under the class name Enqueue\AmqpLib\AmqpConnectionFactory. This will help Ecotone resolve it automatically, without any additional configuration.

Let's add our first AMQP Backed Channel (RabbitMQ Channel), in order to do it, we need to create our first Application Context. Application Context is a non-constructor class, responsible for extending Ecotone with extra configurations, that will help the framework act in a specific way. In here we want to tell Ecotone about AMQP Channel with specific name. Let's create new class App\Infrastructure\MessagingConfiguration.

namespace App\Infrastructure;
use Ecotone\Amqp\AmqpBackedMessageChannelBuilder;
use Ecotone\Messaging\Annotation\ApplicationContext;
use Ecotone\Messaging\Annotation\Extension;
/**
* @ApplicationContext()
*/
class MessagingConfiguration
{
/**
* @Extension()
*/
public function orderChannel()
{
return [
AmqpBackedMessageChannelBuilder::create("orders")
];
}
}

@ApplicationContext - Tells Ecotone that this is Application configuration class @Extension - Tell that this method returns configuration. It can return array of objects or a single object.

Now we need to tell our order.place Command Handler, that it should run asynchronously using our neworders channel.

use Ecotone\Messaging\Annotation\Asynchronous;
(...)
/**
* @Asynchronous(channelName="orders")
* @CommandHandler(endpointId="place_order_endpoint", inputChannelName="order.place")
*/
public static function placeOrder(PlaceOrderCommand $command, array $metadata, QueryBus $queryBus) : self
{
$orderedProducts = [];
foreach ($command->getProductIds() as $productId) {
$productCost = $queryBus->convertAndSend("product.getCost", MediaType::APPLICATION_X_PHP_ARRAY, ["productId" => $productId]);
$orderedProducts[] = new OrderedProduct($productId, $productCost->getAmount());
}
return new self($command->getOrderId(), $metadata["userId"], $orderedProducts);
}

We do it by adding @Asynchronous annotation with channelName used for asynchronous endpoint. Endpoints using @Asynchronous are required to have endpointId defined, the name can be anything as long as it's not the same as inputChannelName.

@CommandHandler(endpointId="place_order_endpoint" (...)

Let's run our command which will tell us what asynchronous endpoints we have defined in our system: ecotone:list-all-asynchronous-endpoints

bin/console ecotone:list-all-asynchronous-endpoints
+--------------------+
| Endpoint Names |
+--------------------+
| orders |
| currency_exchanger |
+--------------------+

We have new asynchronous endpoint available orders. Name comes from the message channel name. You may wonder why it is not place_order_endpoint, it's because via single asynchronous channel we can handle multiple endpoints, if needed. This is further explained in asynchronous section.

Let's change orderId in our testing command, so we can place new order.

public function run() : void
{
$this->commandBus->convertAndSend(
"product.register",
MediaType::APPLICATION_X_PHP_ARRAY,
["productId" => 1, "cost" => 100]
);
$this->commandBus->convertAndSend(
"product.register",
MediaType::APPLICATION_X_PHP_ARRAY,
["productId" => 2, "cost" => 300]
);
$orderId = 990;
$this->commandBus->convertAndSend(
"order.place",
MediaType::APPLICATION_X_PHP_ARRAY,
["orderId" => $orderId, "productIds" => [1,2]]
);
echo $this->queryBus->convertAndSend("order.getTotalPrice", MediaType::APPLICATION_X_PHP_ARRAY, ["orderId" => $orderId]);
}

After running our testing command bin/console ecotone:quickstartwe should get an exception:

AggregateNotFoundException:
Aggregate App\Domain\Order\Order:getTotalPrice was not found for indentifie
rs {"orderId":990}

That's fine, we have registered order.place Command Handler to run asynchronously, so we need to run our asynchronous endpoint in order to handle Command Message. If you did not received and exception, it's probably you did not change the orderId and we already registered such order. Let's run our asynchronous endpoint

bin/console ecotone:run-endpoint orders -vvv
[info] {"orderId":990,"productIds":[1,2]}

Like we can see, it ran our Command Handler and placed the order. We can change our testing command to run only Query Handlerand check, if the order really exists now.

class EcotoneQuickstart
{
private CommandBus $commandBus;
private QueryBus $queryBus;
public function __construct(CommandBus $commandBus, QueryBus $queryBus)
{
$this->commandBus = $commandBus;
$this->queryBus = $queryBus;
}
public function run() : void
{
$orderId = 990;
echo $this->queryBus->convertAndSend("order.getTotalPrice", MediaType::APPLICATION_X_PHP_ARRAY, ["orderId" => $orderId]);
}
}
bin/console ecotone:quickstart -vvv
Running example...
400
Good job, scenario ran with success!

There is one thing we can change. As we don't always have access to the context of executor, which calling the command after the Message goes asynchronous, we can change our AddUserIdService Interceptor. This Interceptor is registered as Before Interceptor which is before execution of our Command Handler, but what we want to achieve is, to call this interceptor before message will be send to the asynchronous channel. For this there is Presend Interceptor available. Change @Before annotation to @Presend annotation and we are done.

namespace App\Infrastructure\AddUserId;
use Ecotone\Messaging\Annotation\Interceptor\Presend;
use Ecotone\Messaging\Annotation\Interceptor\MethodInterceptor;
/**
* @MethodInterceptor()
*/
class AddUserIdService
{
/**
* @Presend(
* pointcut="@(App\Infrastructure\AddUserId\AddUserId)",
* changeHeaders=true,
* precedence=0
* )
*/
public function add() : array
{
return ["userId" => 1];
}
}

As long as headers are scalar types, Ecotone will handle serialization and deserialization. Keep them simple and you can be sure, that asynchronous endpoints will work, just as they would be synchronous.

If we would do the same with administrator requirement, exception will be thrown, before the Message will be put to the asynchronous channel. Thanks to Presend interceptor, we can validate messages, before they will be send for handling. Try it, if you want.

The final code is available as lesson-7: git checkout lesson-7

We made it through, Congratulations! We have successfully registered asynchronous Command Handler and safely placed the order. We have finished last lesson. You may now apply the knowledge in real project or check more advanced usages starting here Modelling Overview.