Kafka Consumer
To quickly get up and running with consuming existing Topics, we can use Kafka Consumer feature. Simply by marking given method with Kafka Consumer attribute, we are getting access to asynchronous process that will now run and consume Message from defined Topic.
Kafka Consumer
To set up Consumer, consuming from given topics, all we need to do, is to mark given method with KafkaConsumer attribute:
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(string $payload, array $metadata): void
{
// do something
}
Then we run it as any other asynchronous consumer, using orderConsumer name.
Providing group id
By default Consumer Group id will be same as endpoint id, however we can provide customized name if needed:
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders'],
groupId: 'order_subscriber'
)]
public function handle(string $payload, array $metadata): void
{
// do something
}
Providing custom rdkafka configuration
You can modify your Message Consumer with specific rdkafka configuration:
#[ServiceContext]
public function distributedPublisher()
{
return KafkaConsumerConfiguration::createWithDefaults(
endpointId: 'orderConsumers'
)
->setConfiguration('auto.offset.reset', 'earliest');
}
Kafka Headers
We can accesss specific Kafka Headers using standard Ecotone's metadata mechanism
kafka_topic - Topic name for incoming message
kafka_partition - Partition of incoming message
kafka_offset - Offset of Message Consumer
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(
string $payload,
#[Header("kafka_topic")] string $topicName,
): void
{
// do something
}
Automatic Conversion
If we have our Custom Conversion or JMS Module installed, then we can leverage automatic conversion:
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(Order $payload): void
{
// do something
}
Read more about Conversion in related section.
Instant Retry
In case of failure we may try to recover instantly. To do so we can provide InstantRetry attribute:
#[InstantRetry(retryTimes: 3)]
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(Order $payload): void
{
// handle
}
Read more about Instant Retry in related section.
Error Channel
To handle Message when failure happen, we may decide to send it to Error Channel. This can be used to for example store the Message for later review.
#[ErrorChannel('customErrorChannel')]
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders']
)]
public function handle(Order $payload): void
{
// handle
}
Read more about Error Channel in related section.
Final Failure Strategy
We can also define in case of unrecoverable failure, what should happen:
#[KafkaConsumer(
endpointId: 'orderConsumers',
topics: ['orders'],
finalFailureStrategy: FinalFailureStrategy::RESEND
)]
public function handle(Order $payload): void
{
// handle
}
Read more about final failure strategy in related section.
Last updated
Was this helpful?