Kafka Consumer

To quickly start consuming existing Topics, we can use the Kafka Consumer feature. By marking a method with the KafkaConsumer attribute, we get an asynchronous process that consumes Messages from the defined Topics.

To set up a Consumer for the given topics, all we need to do is mark a method with the KafkaConsumer attribute:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(string $payload, array $metadata): void
{
    // do something
}

Then we run it like any other asynchronous consumer, using the endpoint id orderConsumers as its name.

Providing group id

By default, the Consumer Group id is the same as the endpoint id; however, we can provide a custom name if needed:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders'],
    groupId: 'order_subscriber'
)]
public function handle(string $payload, array $metadata): void
{
    // do something
}
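
In Kafka, each consumer group tracks its own offsets, so consumers with different group ids each receive every message from a topic, while consumers sharing one group id split the partitions between them. A minimal sketch of two independent subscribers on the same topic follows. The class, method, and endpoint names are hypothetical, and the `use` namespace is an assumption; only the `endpointId`, `topics`, and `groupId` parameters come from the example above.

```php
<?php

use Ecotone\Kafka\Attribute\KafkaConsumer; // assumed namespace, verify against your installed version

final class OrderSubscribers
{
    /** @var string[] payloads seen by the billing group */
    public array $invoiced = [];

    /** @var string[] payloads seen by the shipping group */
    public array $shipped = [];

    // Group "billing" keeps its own position in the "orders" topic.
    #[KafkaConsumer(
        endpointId: 'billingConsumer',
        topics: ['orders'],
        groupId: 'billing'
    )]
    public function invoice(string $payload): void
    {
        $this->invoiced[] = $payload;
    }

    // Group "shipping" reads the same topic independently of "billing".
    #[KafkaConsumer(
        endpointId: 'shippingConsumer',
        topics: ['orders'],
        groupId: 'shipping'
    )]
    public function ship(string $payload): void
    {
        $this->shipped[] = $payload;
    }
}
```

Each endpoint is then run as its own process, so billing and shipping can be scaled and restarted separately.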

Providing custom rdkafka configuration

We can customize the Message Consumer for a given endpoint id with specific rdkafka configuration:

#[ServiceContext]
public function kafkaConsumerConfiguration()
{
    return KafkaConsumerConfiguration::createWithDefaults(
        endpointId: 'orderConsumers'
    )
        ->setConfiguration('auto.offset.reset', 'earliest');
}

Kafka Headers

We can access specific Kafka Headers using Ecotone's standard metadata mechanism:

  • kafka_topic - Topic name of the incoming message

  • kafka_partition - Partition of the incoming message

  • kafka_offset - Offset of the incoming message

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(
    string $payload, 
    #[Header("kafka_topic")] string $topicName,
): void
{
    // do something
}
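
When several headers are needed at once, the `array $metadata` parameter from the first example can be used instead of one `#[Header]` parameter per value. The sketch below assumes the three headers listed above are present as keys of that array. The class name and the `entries` property are hypothetical, and the `use` namespace is an assumption.

```php
<?php

use Ecotone\Kafka\Attribute\KafkaConsumer; // assumed namespace, verify against your installed version

final class OrderAuditLog
{
    /** @var string[] one line per consumed message */
    public array $entries = [];

    #[KafkaConsumer(
        endpointId: 'orderConsumers',
        topics: ['orders']
    )]
    public function handle(string $payload, array $metadata): void
    {
        // Record where the message came from: topic[partition]@offset.
        $this->entries[] = sprintf(
            '%s[%s]@%s: %s',
            $metadata['kafka_topic'],
            $metadata['kafka_partition'],
            $metadata['kafka_offset'],
            $payload
        );
    }
}
```

Recording topic, partition, and offset together makes it possible to locate the exact message again when investigating a problem.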

Automatic Conversion

If we have our own custom Converter or the JMS Module installed, we can leverage automatic conversion:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(Order $payload): void
{
    // do something
}

Read more about Conversion in the related section.

Instant Retry

In case of failure, we may try to recover instantly. To do so, we can add the InstantRetry attribute:

#[InstantRetry(retryTimes: 3)]
#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(Order $payload): void
{
    // handle
}
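
To illustrate what retrying instantly means, here is a conceptual sketch in plain PHP. It is not Ecotone's implementation: the function `invokeWithInstantRetry` is hypothetical, and it assumes that `retryTimes` counts the extra attempts made after the first failed one.

```php
<?php

/**
 * Conceptual sketch only: calls $handler once and, if it throws,
 * calls it again immediately, up to $retryTimes additional times.
 */
function invokeWithInstantRetry(callable $handler, int $retryTimes): mixed
{
    $retriesDone = 0;

    while (true) {
        try {
            return $handler();
        } catch (\Throwable $exception) {
            // Retries exhausted: let the failure propagate to the caller.
            if ($retriesDone >= $retryTimes) {
                throw $exception;
            }
            $retriesDone++;
        }
    }
}

// Usage: the handler fails twice and succeeds on the third call.
$calls = 0;
$result = invokeWithInstantRetry(function () use (&$calls): string {
    $calls++;
    if ($calls < 3) {
        throw new \RuntimeException('Temporary failure');
    }

    return 'handled';
}, retryTimes: 3);
```

Because the retries happen without any delay, this approach suits short-lived problems such as a momentary network error; longer outages are better handled by the failure mechanisms described in the following sections.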

Read more about Instant Retry in the related section.

Error Channel

To handle a Message when a failure happens, we may send it to an Error Channel. This can be used, for example, to store the Message for later review.

#[ErrorChannel('customErrorChannel')]
#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(Order $payload): void
{
    // handle
}
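
Something then needs to listen on that channel. The fragment below sketches one way to do it, assuming the `ServiceActivator` attribute and `ErrorMessage` class from Ecotone's error handling are available. Both `use` namespaces are assumptions to verify against the installed version, and the class name `OrderFailureHandler` is hypothetical.

```php
<?php

use Ecotone\Messaging\Attribute\ServiceActivator; // assumed namespace
use Ecotone\Messaging\Support\ErrorMessage;       // assumed namespace

final class OrderFailureHandler
{
    // Subscribes to the channel named in #[ErrorChannel('customErrorChannel')].
    #[ServiceActivator('customErrorChannel')]
    public function handle(ErrorMessage $errorMessage): void
    {
        // Store or log the failed Message here for later review.
    }
}
```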

Read more about Error Channel in the related section.

Final Failure Strategy

We can also define what should happen in case of an unrecoverable failure:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders'],
    finalFailureStrategy: FinalFailureStrategy::RESEND
)]
public function handle(Order $payload): void
{
    // handle
}

Read more about Final Failure Strategy in the related section.
