Concurrency Handling

Handling concurrency with optimistic and pessimistic locking

Two workers pick up commands for the same aggregate at the same time. Both load version 5, both apply a change, and the second save silently overwrites the first. A concurrency exception is raised when two or more operations conflict while trying to modify the same data, which turns that race into something you can detect and retry. Ecotone provides built-in support for concurrency handling.

To handle concurrent access, Ecotone implements Optimistic Locking.

Optimistic Locking

Each Event Sourcing Aggregate or Event Sourcing Saga has a version property that represents the current version of the resource. Each modification increments the version. If two concurrent processes load the same version and both try to modify the resource, the second save no longer matches the stored version and a concurrency exception is raised. This is the default behaviour when we use the inbuilt Event Sourcing support.

Custom Repositories

With Custom Repositories, we can use Ecotone's support for optimistic locking to raise the exception inside the Repository.

public function save(
    array $identifiers, object $aggregate, array $metadata, 
    // Version to verify before storing
    ?int $versionBeforeHandling
): void

Ecotone passes the version to the repository. It is taken from the property marked with #[AggregateVersion] inside the Aggregate/Saga.

We don't need to increase the version ourselves on each action. Ecotone increases it in our Saga/Aggregate automatically. We may also use the inbuilt trait to avoid adding the property manually.

#[Saga]
class OrderProcess
{
    use WithAggregateVersioning;
     
    (...)
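The repository side of this contract can be sketched as follows. This is a minimal in-memory illustration, not Ecotone's own implementation: `Ticket`, `InMemoryTicketRepository`, `findBy` and `VersionConflictException` are hypothetical names invented for the example, and only the `save` signature follows the one shown above. The sketch assumes the stored version starts at 0 and grows by one per successful save.

```php
<?php

// Hypothetical exception type for this sketch; not an Ecotone class.
final class VersionConflictException extends RuntimeException {}

// Hypothetical aggregate reduced to plain data for the example.
final class Ticket
{
    public function __construct(
        public string $ticketId,
        public string $status,
        public int $version = 0,
    ) {}
}

// In-memory stand-in for a custom repository.
final class InMemoryTicketRepository
{
    /** @var array<string, array{version: int, status: string}> */
    private array $rows = [];

    public function findBy(string $ticketId): ?Ticket
    {
        $row = $this->rows[$ticketId] ?? null;

        return $row === null ? null : new Ticket($ticketId, $row['status'], $row['version']);
    }

    public function save(
        array $identifiers, object $aggregate, array $metadata,
        // Version to verify before storing
        ?int $versionBeforeHandling
    ): void {
        $id = (string) $identifiers['ticketId'];
        $storedVersion = $this->rows[$id]['version'] ?? 0;

        // Reject the write when another process saved since this aggregate was loaded.
        if ($versionBeforeHandling !== null && $storedVersion !== $versionBeforeHandling) {
            throw new VersionConflictException(
                "Expected version {$versionBeforeHandling}, found {$storedVersion}"
            );
        }

        // Assumption of this sketch: each successful save moves the stored version up by one.
        $this->rows[$id] = ['version' => $storedVersion + 1, 'status' => $aggregate->status];
    }
}

$repository = new InMemoryTicketRepository();
$repository->save(['ticketId' => 't-1'], new Ticket('t-1', 'open'), [], 0);

// Two workers load the same version of the same ticket.
$first = $repository->findBy('t-1');
$second = $repository->findBy('t-1');

$first->status = 'assigned';
$repository->save(['ticketId' => 't-1'], $first, [], $first->version);

$conflictDetected = false;
$second->status = 'closed';
try {
    $repository->save(['ticketId' => 't-1'], $second, [], $second->version);
} catch (VersionConflictException $exception) {
    // The stale write is rejected instead of silently overwriting the first one.
    $conflictDetected = true;
}
```

With a real database the comparison and the write have to happen atomically, for example through an `UPDATE ... WHERE version = :expected` statement whose affected row count is checked. Otherwise two processes can both pass the check before either one writes.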

Self Healing

To handle concurrency exceptions and ensure the system can self-heal, Ecotone offers retry mechanisms.

In synchronous scenarios, such as a Command Handler called via HTTP, instant retries can be used to recover. If a concurrency exception occurs, the Command Message is retried immediately, so the Message Handler can self-heal without affecting the end user.

In asynchronous scenarios, you can still use instant retries, but you may also configure delayed retries. When a concurrency exception occurs, the Message is then retried after a certain delay. This frees system resources from continuous retries and lets the system recover once the delay has passed.
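The retry idea itself can be illustrated in plain PHP. The sketch below is not Ecotone's retry configuration. `handleWithRetries` and `ConcurrentModificationException` are hypothetical names, and the in-process `usleep` only stands in for a delayed redelivery of the Message.

```php
<?php

// Hypothetical exception type for this sketch; not an Ecotone class.
final class ConcurrentModificationException extends RuntimeException {}

/**
 * Runs $handler and retries it when it throws a concurrency exception.
 *
 * @param callable(): mixed $handler
 * @param int[] $delaysInMs one entry per allowed retry; 0 means retry immediately
 */
function handleWithRetries(callable $handler, array $delaysInMs): mixed
{
    $attempt = 0;

    while (true) {
        try {
            return $handler();
        } catch (ConcurrentModificationException $exception) {
            if (!array_key_exists($attempt, $delaysInMs)) {
                // Retries exhausted: let the caller see the failure.
                throw $exception;
            }

            // Wait before the next attempt; a delay of 0 retries at once.
            usleep($delaysInMs[$attempt] * 1000);
            $attempt++;
        }
    }
}

// Simulated handler: conflicts on the first two calls, succeeds on the third.
$calls = 0;
$handler = function () use (&$calls): string {
    $calls++;
    if ($calls < 3) {
        throw new ConcurrentModificationException('Aggregate version changed');
    }

    return 'handled';
};

// Synchronous style: two instant retries.
$result = handleWithRetries($handler, [0, 0]);
```

Passing `[100, 500]` instead of `[0, 0]` would approximate delayed retries with growing pauses. The important detail in both cases is that each attempt runs the whole handler again, so the aggregate is reloaded with its latest version before the change is reapplied.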
