Ecotone
SponsorBlogGithubSupport and ContactCommunity Channel
  • About
  • Installation
  • How to use
    • CQRS PHP
    • Event Handling PHP
    • Aggregates & Sagas
    • Scheduling in PHP
    • Asynchronous PHP
    • Event Sourcing PHP
    • Microservices PHP
    • Resiliency and Error Handling
    • Laravel Demos
    • Symfony Demos
      • Doctrine ORM
  • Tutorial
    • Before we start tutorial
    • Lesson 1: Messaging Concepts
    • Lesson 2: Tactical DDD
    • Lesson 3: Converters
    • Lesson 4: Metadata and Method Invocation
    • Lesson 5: Interceptors
    • Lesson 6: Asynchronous Handling
  • Enterprise
  • Modelling
    • Introduction
    • Message Bus and CQRS
      • CQRS Introduction - Commands
        • Query Handling
        • Event Handling
      • Aggregate Introduction
        • Aggregate Command Handlers
        • Aggregate Query Handlers
        • Aggregate Event Handlers
        • Advanced Aggregate creation
      • Repositories Introduction
      • Business Interface
        • Introduction
        • Business Repository
        • Database Business Interface
          • Converting Parameters
          • Converting Results
      • Saga Introduction
      • Identifier Mapping
    • Extending Messaging (Middlewares)
      • Message Headers
      • Interceptors (Middlewares)
        • Additional Scenarios
      • Intercepting Asynchronous Endpoints
      • Extending Message Buses (Gateways)
    • Event Sourcing
      • Installation
      • Event Sourcing Introduction
        • Working with Event Streams
        • Event Sourcing Aggregates
          • Working with Aggregates
          • Applying Events
          • Different ways to Record Events
        • Working with Metadata
        • Event versioning
        • Event Stream Persistence
          • Event Sourcing Repository
          • Making Stream immune to changes
          • Snapshoting
          • Persistence Strategies
          • Event Serialization and PII Data (GDPR)
      • Projection Introduction
        • Configuration
        • Choosing Event Streams for Projection
        • Executing and Managing
          • Running Projections
          • Projection CLI Actions
          • Access Event Store
        • Projections with State
        • Emitting events
    • Recovering, Tracing and Monitoring
      • Resiliency
        • Retries
        • Error Channel and Dead Letter
          • Dbal Dead Letter
        • Idempotent Consumer (Deduplication)
        • Resilient Sending
        • Outbox Pattern
        • Concurrency Handling
      • Message Handling Isolation
      • Ecotone Pulse (Service Dashboard)
    • Asynchronous Handling and Scheduling
      • Asynchronous Message Handlers
      • Asynchronous Message Bus (Gateways)
      • Delaying Messages
      • Time to Live
      • Message Priority
      • Scheduling
      • Dynamic Message Channels
    • Distributed Bus and Microservices
      • Distributed Bus
        • Distributed Bus with Service Map
          • Configuration
          • Custom Features
          • Non-Ecotone Application integration
          • Testing
        • AMQP Distributed Bus (RabbitMQ)
          • Configuration
        • Distributed Bus Interface
      • Message Consumer
      • Message Publisher
    • Business Workflows
      • The Basics - Stateless Workflows
      • Stateful Workflows - Saga
      • Handling Failures
    • Testing Support
      • Testing Messaging
      • Testing Aggregates and Sagas with Message Flows
      • Testing Event Sourcing Applications
      • Testing Asynchronous Messaging
  • Messaging and Ecotone In Depth
    • Overview
    • Multi-Tenancy Support
      • Getting Started
        • Any Framework Configuration
        • Symfony and Doctrine ORM
        • Laravel
      • Different Scenarios
        • Hooking into Tenant Switch
        • Shared and Multi Database Tenants
        • Accessing Current Tenant in Message Handler
        • Events and Tenant Propagation
        • Multi-Tenant aware Dead Letter
      • Advanced Queuing Strategies
    • Document Store
    • Console Commands
    • Messaging concepts
      • Message
      • Message Channel
      • Message Endpoints/Handlers
        • Internal Message Handler
        • Message Router
        • Splitter
      • Consumer
      • Messaging Gateway
      • Inbound/Outbound Channel Adapter
    • Method Invocation And Conversion
      • Method Invocation
      • Conversion
        • Payload Conversion
        • Headers Conversion
    • Service (Application) Configuration
    • Contributing to Ecotone
      • How Ecotone works under the hood
      • Ecotone Phases
      • Registering new Module Package
      • Demo Integration with SQS
        • Preparation
        • Inbound and Outbound Adapters and Message Channel
        • Message Consumer and Publisher
  • Modules
    • Overview
    • Symfony
      • Symfony Configuration
      • Symfony Database Connection (DBAL Module)
      • Doctrine ORM
      • Symfony Messenger Transport
    • Laravel
      • Laravel Configuration
      • Database Connection (DBAL Module)
      • Eloquent
      • Laravel Queues
      • Laravel Octane
    • Ecotone Lite
      • Logging
      • Database Connection (DBAL Module)
    • JMS Converter
    • OpenTelemetry (Tracing and Metrics)
      • Configuration
    • RabbitMQ Support
    • Kafka Support
      • Configuration
      • Message partitioning
      • Usage
    • DBAL Support
    • Amazon SQS Support
    • Redis Support
  • Other
    • Contact, Workshops and Support
Powered by GitBook
On this page
  • Kafka architecture
  • Partition Order
  • Providing partition key
  • Default partitioning mechanism
  • Aggregate Commands partitioning mechanism
  • Aggregate Events partitioning mechanism

Was this helpful?

Export as PDF
  1. Modules
  2. Kafka Support

Message partitioning

PreviousConfigurationNextUsage

Last updated 2 months ago

Was this helpful?

Kafka architecture

Ecotone provides Message Channel abstraction, which is used for simplifying asynchronous processing. It's good to understand the difference in Kafka Message Channel vs typicial Queue based Message Channel, as it provides extra abilities. To register Kafka Message Channel, we will use Service Context:

#[ServiceContext] 
public function orderChannel()
{
    return KafkaMessageChannelBuilder::create(
        channelName: "users"
    );
}

Now, when we will be sending Messages to Kafka Message Channel, we will actually be sending to Kafka Topic. Topics can be split in multiple partitions, and each partition divine our topic into separate processing unit.

When we will start our Message Consumer process:

bin/console ecotone:run users -vvv
artisan ecotone:run users -vvv
$messagingSystem->run("users");

our Message Consumer process will get partitions allocated:

The interesting part is, what will happen when we will start second Message Consumption process:

As a result of running another Message Consumer for given Topic, Kafka will kick-off rebalance process (repartitioning) to re-assign partitions to given Message Consumers. Each partition can only be assigned to single Message Consumer. Therefore in our example, Kafka could decide to assign partition one and two to the first Message Consumer, and partition three to the second Message Consumer.

Scaling Message Consumers over amount of partitions won't speed up processing, as we can only scale, up to partitions volume for given Topic. It's good practice to over-provision partitions for given topic at the beginning, to ensure future scalability needs.

Partition Order

Given partition is guaranteed to be handled by single Message Consumer at time. Therefore this enables order for the Messages, as we will be consuming Messages in order they have been published.

However for partitioning to be useful we need to ensure that related Messages will land in same partition, if Messages are placed randomly, benefits of partitioning are basically lost. To choose the partition Kafka use hashing mechanism based on provided Key. Hashed key will generate number to which partition provided Message should be assigned.

Providing same key for correlated resource will ensure, that Messages will land in same partition, therefore will be handled in order.

Providing partition key

When we are sending Message that goes over Kafka Message Channel, we can provide the key to be used explicitly

$commandBus->->send(
  new ScheduleMeeting(),
  metadata: [
     KafkaHeader::KAFKA_TARGET_PARTITION_KEY_HEADER_NAME => '123'
  ]
);

Then this will be used for hashing and all Messages sent with key "123" will be placed in same partition and preserve the order.

Default partitioning mechanism

By default if no partition key is provided, Ecotone will use Message Id as partition key. This means messages will be distributed randomly across partitions.

However If we would like to provide partition key manually for each Message, to ensure correct partition, it would quickly become unmaintainable. Fortunately Ecotone comes with support for Aggregate Command and Events, so we don't need to think about this much

Aggregate Commands partitioning mechanism

Whenever we send an Message to Aggregate, Ecotone will use the identifier of the Aggregate as partition key. This way order will be preserved within Aggregate instance. For example having Aggregate Ticket with ticketId identifier:

#[Aggregate]
class Ticket
{
    #[Identifier]
    private int $ticketId;
    
    #[Asynchronous("async")]
    #[CommandHandler]
    public function close(CloseTicket $command): void

and Command CloseTicket

final readonly class CloseTicket
{
    public function __construct(public string $ticketId)
    {
    }
}

Then if we would send an Command with id "123", it will be used as partition key:

$commandBus->->send(new CloseTicket(123));

Ecotone recognizes to which Aggregate instance we are sending the Message, and will use as partition key without any additional code needed on our side.

Aggregate Events partitioning mechanism

Events published from Aggregates are automatically assigned partition key. This way Events for same Aggregate instance will land in related partition automatically.

#[Aggregate]
class Ticket
{
    use withEvents;

    #[Identifier]
    private int $ticketId;
    
    #[Asynchronous("async")]
    #[CommandHandler]
    public function close(CloseTicket $command): void
    {
        // do something
        $this->recordThat(new TicketClosed($this->ticketId));
    }

Partition key is automatically assigned to Events, no matter if we use Event Sourced or State-Stored Aggregates.

Event Message placed in the topic combined of 3 partitions
Message Consumer receiving Messages from three partitions
Scaling Message Consumers triggers rebalance process, which reassigning partions