
DBAL Support

Transactions, Asynchronous, Dead Letter Queue PHP DBAL

Installation

composer require ecotone/dbal

Module Powered By

This module is powered by the Doctrine DBAL database abstraction layer, and by Enqueue for asynchronous communication.

Configuration

In order to use Dbal Support, we need to add a ConnectionFactory to our Dependency Container.

Using Existing Connection

Symfony
# config/services.yaml
Enqueue\Dbal\DbalConnectionFactory:
    factory: ['Ecotone\Dbal\DbalConnection', 'create']
    arguments: ['@Doctrine\DBAL\Connection']
Laravel
# Register Service in Provider
use Ecotone\Dbal\DbalConnection;
use Enqueue\Dbal\DbalConnectionFactory;
use Illuminate\Support\Facades\DB;
public function register()
{
    $this->app->singleton(DbalConnectionFactory::class, function ($app) {
        return DbalConnection::create(DB::connection()->getDoctrineConnection());
    });
}
We register our DbalConnectionFactory under the class name Enqueue\Dbal\DbalConnectionFactory. This helps Ecotone resolve it automatically, without any additional configuration.
Ecotone starts a database transaction by default. Reuse your existing connection to be sure your changes are committed.

Using Database Connection String

Symfony
# config/services.yaml
Enqueue\Dbal\DbalConnectionFactory:
    class: Enqueue\Dbal\DbalConnectionFactory
    arguments: ["pgsql://user:password@localhost:5432/db_name"]
Laravel
# Register Service in Provider
use Enqueue\Dbal\DbalConnectionFactory;
public function register()
{
    $this->app->singleton(DbalConnectionFactory::class, function () {
        return new DbalConnectionFactory('pgsql://user:password@localhost:5432/db_name');
    });
}
Lite
use Ecotone\Lite\EcotoneLiteApplication;
use Enqueue\Dbal\DbalConnectionFactory;
$application = EcotoneLiteApplication::boostrap(
    [
        DbalConnectionFactory::class => new DbalConnectionFactory('pgsql://user:password@localhost:5432/db_name')
    ]
);

Using Manager Registry

If we want to reuse an existing connection through the Manager Registry, we can do it this way:
Symfony
# config/services.yaml
Enqueue\Dbal\DbalConnectionFactory:
    factory: ['Ecotone\Dbal\DbalConnection', 'createForManagerRegistry']
    arguments: ["@doctrine", "default"]
Register the Manager Registry under DbalConnectionFactory if you want to make use of auto configuration.
Otherwise you will need to tell Message Channels and Transactions the name of the Connection Factory.

Doctrine ORM Support

If you have set up Manager Registry Connection, you may enable support for Doctrine ORM Repositories.
#[ServiceContext]
public function getDbalConfiguration(): DbalConfiguration
{
    return DbalConfiguration::createWithDefaults()
        ->withDoctrineORMRepositories(true);
}

Demo

Read more about the integration in the following blog post.

Message Channel

To create a Dbal Backed Message Channel, we need to define it in a Service Context.
class MessagingConfiguration
{
    #[ServiceContext]
    public function orderChannel()
    {
        return DbalBackedMessageChannelBuilder::create("orders");
    }
}
Now the orders channel will be available in our Messaging System.

Message Channel Configuration

DbalBackedMessageChannelBuilder::create("orders")
    ->withAutoDeclare(false) // do not auto declare queue
    ->withDefaultTimeToLive(1000); // limit TTL of messages, in milliseconds
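Once the channel is registered, a handler can be routed through it. The sketch below is only an illustration: it assumes Ecotone's #[Asynchronous] attribute and the endpointId parameter of #[CommandHandler], while PlaceOrder, OrderService, the "order.place" routing key and the endpoint id are hypothetical names.

```php
<?php

declare(strict_types=1);

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\CommandHandler;

// Hypothetical command, used only for illustration.
final class PlaceOrder
{
    public function __construct(public readonly string $orderId)
    {
    }
}

// Hypothetical handler class.
final class OrderService
{
    // Assumption: #[Asynchronous] takes the name of the channel registered above,
    // and an asynchronous handler is given an explicit endpointId.
    #[Asynchronous("orders")]
    #[CommandHandler("order.place", endpointId: "place_order_endpoint")]
    public function placeOrder(PlaceOrder $command): void
    {
        // runs when a consumer reads the message from the "orders" channel
    }
}
```

With the Symfony integration, a consumer for this channel is typically started with bin/console ecotone:run orders.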

Transactions

Ecotone Dbal comes with support for transactions. To enable transactions on a specific endpoint, mark it with the Ecotone\Dbal\DbalTransaction\DbalTransaction attribute.
#[CommandHandler]
#[DbalTransaction]
public function sellProduct(SellProduct $command): void
{
    // do something with $command
}
By default, Ecotone enables transactions for all Asynchronous Endpoints and the Command Bus. You may use a Service Context to turn this configuration off. You may also add more connections to be handled.
// the configuration class must not itself be named DbalConfiguration,
// otherwise it would clash with Ecotone's DbalConfiguration used below
class MessagingConfiguration
{
    #[ServiceContext]
    public function registerTransactions(): array
    {
        return [
            DbalConfiguration::createWithDefaults()
                ->withTransactionOnCommandBus(true) // turn on for the command bus
                ->withTransactionOnAsynchronousEndpoints(true) // turn on for all asynchronous endpoints
                ->withoutTransactionOnAsynchronousEndpoints(["notifications"]) // turn off for the listed asynchronous endpoints
                ->withDefaultConnectionReferenceNames([
                    "Enqueue\Dbal\DbalConnectionFactory",
                    "AnotherDbalConnectionFactory"
                ])
        ];
    }
}

Deduplication (Idempotent Consumer)

Deduplication works whenever a message is consumed asynchronously.
The role of deduplication is to make it safe to receive the same message multiple times. If a message was already handled, it will be skipped.
Deduplication is enabled by default; you may disable it using DbalConfiguration.
// the configuration class must not itself be named DbalConfiguration,
// otherwise it would clash with Ecotone's DbalConfiguration used below
class MessagingConfiguration
{
    #[ServiceContext]
    public function dbalConfiguration(): DbalConfiguration
    {
        return DbalConfiguration::createWithDefaults()
            ->withDeduplication(false);
    }
}
You may also mark a given Message Handler as deduplicated, in case you do not want to deduplicate by default or need a custom deduplication header.
final class PaymentHandler
{
    #[Deduplicated('paymentId')]
    #[CommandHandler("receive_payment")]
    public function receivePayment(ReceivePayment $command): void
    {
        // handle
    }
}
This is especially useful when we receive events from external services, e.g. payment or email related events, that contain a custom identifier you would like to deduplicate on. If you deduplicate asynchronous handlers this way, you should disable global deduplication to avoid deduplicating twice.
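To make the idea of an idempotent consumer concrete, here is a minimal framework-free sketch. It is not how Ecotone implements deduplication (the module keeps track of handled messages in the database); IdempotentConsumer and its methods are hypothetical names, and handled ids are kept in a plain PHP array only for illustration.

```php
<?php

declare(strict_types=1);

// Framework-free sketch of an idempotent consumer. All names are hypothetical.
final class IdempotentConsumer
{
    /** @var array<string, true> ids of messages that were already handled */
    private array $handledIds = [];

    public function __construct(private \Closure $handler)
    {
    }

    /** Returns true when the handler ran, false when the message was skipped. */
    public function consume(string $messageId, array $payload): bool
    {
        if (isset($this->handledIds[$messageId])) {
            return false; // duplicate delivery: skip it
        }

        ($this->handler)($payload);
        $this->handledIds[$messageId] = true; // remember the id only after successful handling

        return true;
    }
}

$received = [];
$consumer = new IdempotentConsumer(function (array $payload) use (&$received): void {
    $received[] = $payload['paymentId'];
});

$first  = $consumer->consume('msg-1', ['paymentId' => 'pay-100']);
$second = $consumer->consume('msg-1', ['paymentId' => 'pay-100']); // same message delivered again
```

The second delivery is skipped, so the handler's side effect happens once. Using a business identifier such as paymentId as the key, instead of the message id, corresponds to the custom deduplication header described above.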

Dead Letter

Enable Dead Letter

Dbal Module provides full support for Dead Letter.
All you need to do to enable it is to point your errorChannel to dbal_dead_letter.
In case you would like to use retries first, use the following configuration:
#[ServiceContext]
public function errorHandling()
{
    return ErrorHandlerConfiguration::createWithDeadLetterChannel(
        "errorChannel",
        RetryTemplateBuilder::exponentialBackoff(1000, 10)
            ->maxRetryAttempts(3),
        "dbal_dead_letter"
    );
}
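To get a feel for what the retry settings above mean in practice, the following sketch lists the delay before each retry. It assumes that exponentialBackoff(1000, 10) means an initial delay of 1000 ms multiplied by 10 on every subsequent retry; retryDelays is a hypothetical helper written for this example, not part of Ecotone.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper, not part of Ecotone: lists the delay before each retry,
 * assuming delay = initialDelay * multiplier^(retryNumber - 1).
 *
 * @return int[] delays in milliseconds
 */
function retryDelays(int $initialDelayMs, int $multiplier, int $maxRetryAttempts): array
{
    $delays = [];
    for ($retry = 0; $retry < $maxRetryAttempts; $retry++) {
        $delays[] = $initialDelayMs * $multiplier ** $retry;
    }

    return $delays;
}

$delays = retryDelays(1000, 10, 3); // [1000, 10000, 100000]
```

Under that assumption a failing message would be retried after roughly 1 second, 10 seconds and 100 seconds, and only then moved to dbal_dead_letter.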

Dead Letter Console Commands

Help

Get more details about existing commands
Symfony
bin/console ecotone:deadletter:help
Laravel
artisan ecotone:deadletter:help

Listing Error Messages

Listing current error messages
Symfony
bin/console ecotone:deadletter:list
Laravel
artisan ecotone:deadletter:list
Lite
$list = $messagingSystem->runConsoleCommand("ecotone:deadletter:list", []);

Show Details About Error Message

Get more details about given error message
Symfony
bin/console ecotone:deadletter:show {messageId}
Laravel
artisan ecotone:deadletter:show {messageId}
Lite
$details = $messagingSystem->runConsoleCommand("ecotone:deadletter:show", ["messageId" => $messageId]);

Replay Error Message

Replay an error message. It will return to the previous channel for the consumer to pick it up and handle it again.
Symfony
bin/console ecotone:deadletter:replay {messageId}
Laravel
artisan ecotone:deadletter:replay {messageId}
Lite
$messagingSystem->runConsoleCommand("ecotone:deadletter:replay", ["messageId" => $messageId]);

Replay All Messages

Replaying all the error messages.
Symfony
bin/console ecotone:deadletter:replayAll
Laravel
artisan ecotone:deadletter:replayAll
Lite
$messagingSystem->runConsoleCommand("ecotone:deadletter:replayAll", []);

Delete Message

Delete given error message
Symfony
bin/console ecotone:deadletter:delete {messageId}
Laravel
artisan ecotone:deadletter:delete {messageId}
Lite
$messagingSystem->runConsoleCommand("ecotone:deadletter:delete", ["messageId" => $messageId]);

Turn off Dbal Dead Letter

#[ServiceContext]
public function dbalConfiguration()
{
    return DbalConfiguration::createWithDefaults()
        ->withDeadLetter(false);
}

Document Store

DBAL provides support for Document Store, which is enabled by default. Every document is stored inside the "ecotone_document_store" table.

Standard Aggregate Repository

You may enable support for storing standard aggregates.
#[ServiceContext]
public function getDbalConfiguration(): DbalConfiguration
{
    return DbalConfiguration::createWithDefaults()
        ->withDocumentStore(enableDocumentStoreAggregateRepository: true);
}

In Memory Document Store

For testing purposes you may want to enable In Memory implementation.
#[ServiceContext]
public function configuration()
{
    return DbalConfiguration::createWithDefaults()
        ->withDocumentStore(inMemoryDocumentStore: true);
}

Table initialization

The table will be created for you; however, this comes with an extra SQL cost, because before adding a new document it is verified that the table exists. After releasing, you may want to disable this check, since you know that the table already exists.
#[ServiceContext]
public function getDbalConfiguration(): DbalConfiguration
{
    return DbalConfiguration::createWithDefaults()
        ->withDocumentStore(initializeDatabaseTable: false);
}