Configuration

Installation

composer require ecotone/kafka

The implementation is based on the rdkafka extension.

Configuration

In order to use Kafka Support, we need to add KafkaBrokerConfiguration to our Dependency Container.

We register it under its class name, Ecotone\Kafka\Configuration\KafkaBrokerConfiguration, so Ecotone can resolve it automatically without any additional configuration:

Symfony

# config/services.yaml
# You need to have a Kafka instance running on localhost, or change the DSN
services:
    Ecotone\Kafka\Configuration\KafkaBrokerConfiguration:
        class: Ecotone\Kafka\Configuration\KafkaBrokerConfiguration
        arguments:
            $bootstrapServers:
                - localhost:9094

Laravel

// Register Kafka Service in a Service Provider
use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;

public function register()
{
    $this->app->singleton(KafkaBrokerConfiguration::class, function () {
        return new KafkaBrokerConfiguration(['localhost:9094']);
    });
}

Ecotone Lite

use Ecotone\Kafka\Configuration\KafkaBrokerConfiguration;

$application = EcotoneLiteApplication::bootstrap(
    [
        KafkaBrokerConfiguration::class => new KafkaBrokerConfiguration(['localhost:9094'])
    ]
);

Kafka Support

This module provides integration with Kafka.

This module is available as part of Ecotone Enterprise.

Materials

Links

  • Article: Ecotone Enterprise and Kafka support

Message Publisher

Custom Publisher

To create a custom publisher or consumer, provide a Service Context:

class MessagingConfiguration
{
    #[ServiceContext] 
    public function distributedPublisher()
    {
        return KafkaPublisherConfiguration::createWithDefaults(
            topicName: 'orders'
        );
    }
}

Custom Publishers and Consumers are great for building integrations with existing infrastructure, or for setting up a customized way of communicating between applications. With them you take control of what is published and how it is consumed.

The Publisher will then be available in your Dependency Container under the MessagePublisher reference.

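Once registered, the Publisher can be injected like any other service. A minimal sketch, assuming the standard Ecotone MessagePublisher interface; OrderService and the JSON payload are illustrative:

use Ecotone\Messaging\MessagePublisher;

final readonly class OrderService
{
    public function __construct(private MessagePublisher $publisher)
    {
    }

    public function placeOrder(string $orderId): void
    {
        // Publishes the payload to the 'orders' topic configured above
        $this->publisher->send(
            \json_encode(['orderId' => $orderId]),
            'application/json'
        );
    }
}
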
Providing custom rdkafka configuration

You can modify your Message Publisher with specific rdkafka configuration:

#[ServiceContext] 
public function distributedPublisher()
{
    return KafkaPublisherConfiguration::createWithDefaults(
        topicName: 'orders'
    )
        ->setConfiguration('request.required.acks', '1');
}

Message Channel

To understand the use cases behind Message Channels, read the Asynchronous Processing section for application-level processing and the Distributed Bus section for cross-application communication.

Message Channels simplify integration with the Message Broker to the maximum. From the application's perspective, all we need to do is provide the channel implementation; Ecotone takes care of the whole publishing and consuming part.

To create a Kafka-backed Message Channel, we create a Service Context:

class MessagingConfiguration
{
    #[ServiceContext] 
    public function orderChannel()
    {
        return KafkaMessageChannelBuilder::create("orders");
    }
}

Now the orders channel will be available in our Messaging System.

Customize Topic Name

By default the topic name follows the channel name, which in the above example is "orders". However, we can keep "orders" as the reference name in our Application, yet name the topic differently:

#[ServiceContext] 
public function orderChannel()
{
    return KafkaMessageChannelBuilder::create(
        channelName: "orders",
        topicName: "crm_orders"
    );
}

Customize Group Id

We can also customize the group id, which by default follows the channel name:

#[ServiceContext] 
public function orderChannel()
{
    return KafkaMessageChannelBuilder::create(
        channelName: "orders",
        groupId: "crm_application"
    );
}

The position of the Message Consumer is tracked against the given group id. Depending on the retention policy, changing the group id for an existing Message Channel may result in re-delivering Messages.

Final Failure Strategy

To define the Final Failure Strategy:

#[ServiceContext] 
public function orderChannel()
{
    return KafkaMessageChannelBuilder::create(
        channelName: "orders",
        groupId: "crm_application"
    )
        ->withFinalFailureStrategy(FinalFailureStrategy::RESEND);
}

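With the channel registered, any Ecotone handler can consume from it asynchronously. A minimal sketch, assuming the standard Asynchronous and EventHandler attributes; OrderPlaced and the endpoint id are illustrative names:

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\EventHandler;

final class OrderNotifier
{
    // Executed asynchronously, with the Message travelling through the Kafka-backed "orders" channel
    #[Asynchronous("orders")]
    #[EventHandler(endpointId: "notifyAboutNewOrder")]
    public function whenOrderWasPlaced(OrderPlaced $event): void
    {
        // notify about the placed order
    }
}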

Message partitioning

Kafka architecture

Ecotone provides the Message Channel abstraction, which simplifies asynchronous processing. It's good to understand the difference between a Kafka Message Channel and a typical queue-based Message Channel, as it provides extra abilities. To register a Kafka Message Channel, we use a Service Context:

#[ServiceContext] 
public function userChannel()
{
    return KafkaMessageChannelBuilder::create(
        channelName: "users"
    );
}

Now, when we send Messages to the Kafka Message Channel, we are actually sending them to a Kafka Topic. Topics can be split into multiple partitions, and each partition divides our topic into a separate processing unit.

(Figure: an Event Message placed in a topic composed of three partitions)

When we start our Message Consumer process (Symfony, Laravel, or Ecotone Lite respectively):

bin/console ecotone:run users -vvv
artisan ecotone:run users -vvv
$messagingSystem->run("users");

our Message Consumer process will get partitions allocated:

(Figure: a Message Consumer receiving Messages from three partitions)

The interesting part is what happens when we start a second Message Consumption process:

(Figure: scaling Message Consumers triggers a rebalance process, which reassigns partitions)

As a result of running another Message Consumer for the given Topic, Kafka will kick off a rebalance process (repartitioning) to re-assign partitions to the Message Consumers. Each partition can be assigned to only a single Message Consumer. Therefore in our example, Kafka could decide to assign partitions one and two to the first Message Consumer, and partition three to the second one.

Scaling Message Consumers beyond the number of partitions won't speed up processing, as we can only scale up to the partition count of the given Topic. It's good practice to over-provision partitions for a topic at the beginning, to ensure future scalability needs.

Partition Order

A given partition is guaranteed to be handled by a single Message Consumer at a time. This enables ordering of Messages, as we consume them in the order they were published.

However, for partitioning to be useful we need to ensure that related Messages land in the same partition; if Messages are placed randomly, the benefits of partitioning are basically lost. To choose the partition, Kafka uses a hashing mechanism based on the provided Key: the hashed key yields the number of the partition the Message is assigned to.

Providing the same key for a correlated resource ensures that its Messages land in the same partition, and are therefore handled in order.

Providing partition key

When we send a Message that goes over a Kafka Message Channel, we can provide the key to be used explicitly:

$commandBus->send(
  new ScheduleMeeting(),
  metadata: [
     KafkaHeader::KAFKA_TARGET_PARTITION_KEY_HEADER_NAME => '123'
  ]
);

This key will be used for hashing, and all Messages sent with key "123" will be placed in the same partition, preserving their order.

Default partitioning mechanism

By default, if no partition key is provided, Ecotone uses the Message Id as the partition key. This means Messages are distributed randomly across partitions.

However, providing the partition key manually for each Message would quickly become unmaintainable. Fortunately, Ecotone comes with support for Aggregate Commands and Events, so we don't need to think about this much.

Aggregate Commands partitioning mechanism

Whenever we send a Message to an Aggregate, Ecotone uses the identifier of the Aggregate as the partition key. This way order is preserved within each Aggregate instance. For example, having an Aggregate Ticket with a ticketId identifier:

#[Aggregate]
class Ticket
{
    #[Identifier]
    private int $ticketId;
    
    #[Asynchronous("async")]
    #[CommandHandler]
    public function close(CloseTicket $command): void
    {
        // close the ticket
    }
}

and a Command CloseTicket:

final readonly class CloseTicket
{
    public function __construct(public string $ticketId)
    {
    }
}

Then if we send a Command with id "123", it will be used as the partition key:

$commandBus->send(new CloseTicket("123"));

Ecotone recognizes which Aggregate instance we are sending the Message to, and uses its identifier as the partition key without any additional code on our side.

Aggregate Events partitioning mechanism

Events published from Aggregates are automatically assigned a partition key as well. This way Events for the same Aggregate instance land in the related partition automatically:

#[Aggregate]
class Ticket
{
    use WithEvents;

    #[Identifier]
    private int $ticketId;
    
    #[Asynchronous("async")]
    #[CommandHandler]
    public function close(CloseTicket $command): void
    {
        // do something
        $this->recordThat(new TicketClosed($this->ticketId));
    }
}

The partition key is automatically assigned to Events, no matter if we use Event Sourced or State-Stored Aggregates.

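For completeness, a minimal sketch of the TicketClosed event recorded above; carrying the aggregate identifier in the event payload is our assumption here, not something the module requires:

final readonly class TicketClosed
{
    public function __construct(public int $ticketId)
    {
    }
}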

Kafka Consumer

To quickly get up and running with consuming existing Topics, we can use the Kafka Consumer feature. Simply by marking a given method with the KafkaConsumer attribute, we get access to an asynchronous process that will run and consume Messages from the defined Topic.

To set up a Consumer consuming from given topics, all we need to do is mark the method with the KafkaConsumer attribute:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(string $payload, array $metadata): void
{
    // do something
}

Then we run it as any other asynchronous consumer, using the orderConsumers name.

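For instance, using the same console runner as for Message Channels (shown here for Symfony; Laravel and Ecotone Lite run it analogously):

bin/console ecotone:run orderConsumers -vvv
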
Providing group id

By default, the Consumer Group id will be the same as the endpoint id; however, we can provide a customized name if needed:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders'],
    groupId: 'order_subscriber'
)]
public function handle(string $payload, array $metadata): void
{
    // do something
}

Providing custom rdkafka configuration

You can modify your Message Consumer with specific rdkafka configuration:

#[ServiceContext] 
public function consumerConfiguration()
{
    return KafkaConsumerConfiguration::createWithDefaults(
        endpointId: 'orderConsumers'
    )
        ->setConfiguration('auto.offset.reset', 'earliest');
}

Kafka Headers

We can access specific Kafka Headers using Ecotone's standard metadata mechanism:

  • kafka_topic - Topic name of the incoming message

  • kafka_partition - Partition of the incoming message

  • kafka_offset - Offset of the Message Consumer

For example, to access the topic name:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(
    string $payload, 
    #[Header("kafka_topic")] string $topicName,
): void
{
    // do something
}

Automatic Conversion

If we have a Custom Conversion or the JMS Module installed, we can leverage automatic conversion:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(Order $payload): void
{
    // do something
}

Read more about Conversion in the related section.

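A minimal sketch of what the Order class could look like, assuming JSON payloads; the property names are illustrative:

final readonly class Order
{
    public function __construct(
        public string $orderId,
        public int $quantity
    ) {
    }
}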

Instant Retry

In case of failure, we may want to retry instantly. To do so we can provide the InstantRetry attribute:

#[InstantRetry(retryTimes: 3)]
#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(Order $payload): void
{
    // handle
}

Read more about Instant Retry in the related section.

Error Channel

To handle a Message when failure happens, we may decide to send it to an Error Channel. This can be used, for example, to store the Message for later review:

#[ErrorChannel('customErrorChannel')]
#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(Order $payload): void
{
    // handle
}

Read more about Error Channel in the related section.

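To actually consume Messages landing on that channel, one option is to subscribe a handler to it. A sketch, assuming Ecotone's ServiceActivator attribute can be pointed at the custom error channel; FailedMessageStore is an illustrative name:

use Ecotone\Messaging\Attribute\ServiceActivator;
use Ecotone\Messaging\Message;

final class FailedMessageStore
{
    // Receives Messages routed to 'customErrorChannel' after failure
    #[ServiceActivator('customErrorChannel')]
    public function store(Message $errorMessage): void
    {
        // persist the message for later review
    }
}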

Final Failure Strategy

We can also define what should happen in case of an unrecoverable failure:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders'],
    finalFailureStrategy: FinalFailureStrategy::RESEND
)]
public function handle(Order $payload): void
{
    // handle
}

Read more about the final failure strategy in the related section.

Deduplication

We can define a custom deduplication key to ensure the same Message is never handled twice:

#[Deduplicated('orderId')]
#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(Order $payload): void
{
    // handle
}

Read more about deduplication in the related section.

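Assuming the deduplication key is resolved from Message metadata, publishing with the key could look roughly like this; PlaceOrder is an illustrative command:

$commandBus->send(
    new PlaceOrder(),
    metadata: [
        'orderId' => '123' // a second Message with the same key will be skipped
    ]
);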
