This module provides integration with Kafka. It is available as part of Ecotone Enterprise, and the implementation is based on rdkafka.

composer require ecotone/kafka

In order to use Kafka Support we need to add KafkaBrokerConfiguration to our Dependency Container.
# config/services.yaml
# You need to have Kafka instance running on your localhost, or change DSN
Ecotone\Kafka\Configuration\KafkaBrokerConfiguration:
    class: Ecotone\Kafka\Configuration\KafkaBrokerConfiguration
    arguments:
        $bootstrapServers:
            - localhost:9094

# Register Kafka Service in Provider
use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;
public function register()
{
    $this->app->singleton(KafkaBrokerConfiguration::class, function () {
        return new KafkaBrokerConfiguration(['localhost:9094']);
    });
}

use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;
$application = EcotoneLiteApplication::bootstrap(
    [
        KafkaBrokerConfiguration::class => new KafkaBrokerConfiguration(['localhost:9094'])
    ]
);

To create a custom publisher or consumer, provide a Service Context.
Custom Publishers and Consumers are great for building integrations for existing infrastructure or setting up a customized way to communicate between applications. With this you can take over the control of what is published and how it's consumed.
The Publisher will then be available in your Dependency Container under the MessagePublisher reference.
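For illustration, injecting and using it could look like the following sketch (assuming Ecotone's generic MessagePublisher interface; the OrderService class and payload are hypothetical):

use Ecotone\Messaging\MessagePublisher;

final class OrderService
{
    public function __construct(private MessagePublisher $publisher)
    {
    }

    public function placeOrder(string $orderId): void
    {
        // sends the JSON payload to the topic configured in KafkaPublisherConfiguration
        $this->publisher->send(\json_encode(['orderId' => $orderId]), 'application/json');
    }
}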
You can modify your Message Publisher with specific rdkafka configuration:
class MessagingConfiguration
{
#[ServiceContext]
public function distributedPublisher()
{
return KafkaPublisherConfiguration::createWithDefaults(
topicName: 'orders'
);
}
}

Message Channels simplify integration with the Message Broker to the maximum. From the application's perspective, all we need to do is provide a channel implementation; Ecotone will take care of the whole publishing and consuming part.
By default the topic name will follow the channel name, which in the above example will be "orders". However, we can keep "orders" as the reference name in our Application, yet name the topic differently:
We can also customize the group id, which by default follows the channel name:
The position of the Message Consumer is tracked against the given group id. Depending on the retention policy, changing the group id for an existing Message Channel may result in re-delivering messages.
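Once registered, the channel is used like any other asynchronous channel in Ecotone. As an illustrative sketch (the PlaceOrder command, handler class and endpoint id below are hypothetical):

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\CommandHandler;

final class OrderService
{
    #[Asynchronous("orders")]
    #[CommandHandler(endpointId: "placeOrderEndpoint")]
    public function placeOrder(PlaceOrder $command): void
    {
        // executed asynchronously, after the Message travels through the Kafka "orders" channel
    }
}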
To define Final Failure Strategy:
#[ServiceContext]
public function distributedPublisher()
{
return KafkaPublisherConfiguration::createWithDefaults(
topicName: 'orders'
)
->setConfiguration('request.required.acks', '1');
}

class MessagingConfiguration
{
#[ServiceContext]
public function orderChannel()
{
return KafkaMessageChannelBuilder::create("orders");
}
}

#[ServiceContext]
public function orderChannel()
{
return KafkaMessageChannelBuilder::create(
channelName: "orders",
topicName: "crm_orders"
);
}

#[ServiceContext]
public function orderChannel()
{
return KafkaMessageChannelBuilder::create(
channelName: "orders",
groupId: "crm_application"
);
}

#[ServiceContext]
public function orderChannel()
{
return KafkaMessageChannelBuilder::create(
channelName: "orders",
groupId: "crm_application"
)
->withFinalFailureStrategy(FinalFailureStrategy::RESEND);
}

Ecotone provides a Message Channel abstraction, which is used for simplifying asynchronous processing. It's good to understand the difference between a Kafka Message Channel and a typical Queue-based Message Channel, as it provides extra abilities. To register a Kafka Message Channel, we will use the Service Context:
Now, when we send Messages to the Kafka Message Channel, we will actually be sending them to a Kafka Topic. Topics can be split into multiple partitions, and each partition divides our topic into a separate processing unit.
When we start our Message Consumer process, it will get partitions allocated. The interesting part is what happens when we start a second Message Consumer process for the same topic.
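For illustration (assuming the "orders" channel registered above), each Message Consumer instance is simply another run of the same command, for example started in two separate terminals:

# terminal 1
bin/console ecotone:run orders -vvv
# terminal 2 - second instance of the same consumer
bin/console ecotone:run orders -vvv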
As a result of running another Message Consumer for a given Topic, Kafka will kick off a rebalance process (repartitioning) to re-assign partitions among the Message Consumers. Each partition can only be assigned to a single Message Consumer. Therefore, in our example, Kafka could decide to assign partitions one and two to the first Message Consumer, and partition three to the second Message Consumer.
Scaling Message Consumers beyond the number of partitions won't speed up processing, as we can only scale up to the partition count of a given Topic. It's good practice to over-provision partitions for a topic at the beginning, to ensure future scalability needs.
A given partition is guaranteed to be handled by a single Message Consumer at a time. This enables ordering of Messages, as we will be consuming Messages in the order they have been published.
However, for partitioning to be useful we need to ensure that related Messages land in the same partition; if Messages are placed randomly, the benefits of partitioning are basically lost. To choose the partition, Kafka uses a hashing mechanism based on the provided Key. The hashed key produces the number of the partition to which the Message should be assigned.
Providing the same key for a correlated resource ensures that its Messages land in the same partition and are therefore handled in order.
When we send a Message over a Kafka Message Channel, we can provide the key to be used explicitly.
This will then be used for hashing, and all Messages sent with the key "123" will be placed in the same partition, preserving their order.
By default, if no partition key is provided, Ecotone will use the Message Id as the partition key. This means Messages will be distributed randomly across partitions.
However, if we had to provide the partition key manually for each Message to ensure the correct partition, it would quickly become unmaintainable. Fortunately, Ecotone comes with support for Aggregate Commands and Events, so we don't need to think about this much.
Whenever we send a Message to an Aggregate, Ecotone will use the identifier of the Aggregate as the partition key. This way order will be preserved within the Aggregate instance. For example, having an Aggregate Ticket with a ticketId identifier:
and a Command CloseTicket:
Then if we send a Command with id "123", it will be used as the partition key:
Ecotone recognizes which Aggregate instance we are sending the Message to, and will use its identifier as the partition key without any additional code needed on our side.
Events published from Aggregates are automatically assigned a partition key. This way Events for the same Aggregate instance will automatically land in the related partition.
Partition key is automatically assigned to Events, no matter if we use Event Sourced or State-Stored Aggregates.
To quickly get up and running with consuming existing Topics, we can use the Kafka Consumer feature. To set up a Consumer consuming from given topics, all we need to do is mark a given method with the KafkaConsumer attribute; this gives us an asynchronous process that will run and consume Messages from the defined Topics:
Then we run it as any other Message Consumer, using the orderConsumers name.
#[ServiceContext]
public function orderChannel()
{
return KafkaMessageChannelBuilder::create(
channelName: "users"
);
}

bin/console ecotone:run users -vvv


By default the Consumer Group id will be the same as the endpoint id; however, we can provide a customized name if needed:
You can modify your Message Consumer with specific rdkafka configuration:
We can access specific Kafka Headers using Ecotone's standard metadata mechanism:
kafka_topic - Topic name for incoming message
kafka_partition - Partition of incoming message
kafka_offset - Offset of Message Consumer
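For example, a consumer reading all three headers could be sketched as follows (the int types for partition and offset are assumptions):

#[KafkaConsumer(
    endpointId: 'orderConsumers',
    topics: ['orders']
)]
public function handle(
    string $payload,
    #[Header("kafka_topic")] string $topicName,
    #[Header("kafka_partition")] int $partition,
    #[Header("kafka_offset")] int $offset,
): void
{
    // do something
}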
If we have our Custom Conversion or JMS Module installed, then we can leverage automatic conversion:
Read more about Conversion in related section.
In case of failure we may try to recover instantly. To do so we can provide InstantRetry attribute:
Read more about Instant Retry in related section.
To handle a Message when a failure happens, we may decide to send it to the Error Channel. This can be used, for example, to store the Message for later review.
Read more about Error Channel in related section.
We can also define what should happen in case of an unrecoverable failure:
Read more about final failure strategy in related section.
We can define a custom deduplication key to ensure the same Message won't be handled twice.
Read more about deduplication in related section.
artisan ecotone:run users -vvv

$messagingSystem->run("users");

$commandBus->send(
new ScheduleMeeting(),
metadata: [
KafkaHeader::KAFKA_TARGET_PARTITION_KEY_HEADER_NAME => '123'
]
);

#[Aggregate]
class Ticket
{
#[Identifier]
private int $ticketId;
#[Asynchronous("async")]
#[CommandHandler]
public function close(CloseTicket $command): void
{
    // close the ticket
}
}

final readonly class CloseTicket
{
public function __construct(public string $ticketId)
{
}
}

$commandBus->send(new CloseTicket(123));

#[Aggregate]
class Ticket
{
use WithEvents;
#[Identifier]
private int $ticketId;
#[Asynchronous("async")]
#[CommandHandler]
public function close(CloseTicket $command): void
{
// do something
$this->recordThat(new TicketClosed($this->ticketId));
}
}

#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(string $payload, array $metadata): void
{
// do something
}

#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders'],
groupId: 'order_subscriber'
)]
public function handle(string $payload, array $metadata): void
{
// do something
}

#[ServiceContext]
public function kafkaConsumerConfiguration()
{
return KafkaConsumerConfiguration::createWithDefaults(
endpointId: 'orderConsumers'
)
->setConfiguration('auto.offset.reset', 'earliest');
}

#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(
string $payload,
#[Header("kafka_topic")] string $topicName,
): void
{
// do something
}

#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(Order $payload): void
{
// do something
}

#[InstantRetry(retryTimes: 3)]
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(Order $payload): void
{
// handle
}

#[ErrorChannel('customErrorChannel')]
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(Order $payload): void
{
// handle
}

#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders'],
finalFailureStrategy: FinalFailureStrategy::RESEND
)]
public function handle(Order $payload): void
{
// handle
}

#[Deduplicated('orderId')]
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(Order $payload): void
{
// handle
}