Usage

Message Channel

To create a Kafka-backed Message Channel, register it using a Service Context:

class MessagingConfiguration
{
    #[ServiceContext] 
    public function orderChannel()
    {
        return KafkaMessageChannelBuilder::create("orders");
    }
}

The orders channel is now available in our Messaging System.
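
As a minimal sketch of how the channel might then be used, a handler can be routed through it by referencing the channel name. The PlaceOrder command, the OrderService class and the place_order_endpoint id are hypothetical names chosen for illustration; Asynchronous and CommandHandler are Ecotone's general-purpose attributes, not anything Kafka-specific.

```php
<?php

use Ecotone\Messaging\Attribute\Asynchronous;
use Ecotone\Modelling\Attribute\CommandHandler;

// Hypothetical command, used only for this illustration.
final class PlaceOrder
{
    public function __construct(public readonly string $orderId)
    {
    }
}

final class OrderService
{
    // "orders" is the channel name registered in the Service Context.
    // An asynchronous handler needs its own endpointId.
    #[Asynchronous('orders')]
    #[CommandHandler(endpointId: 'place_order_endpoint')]
    public function placeOrder(PlaceOrder $command): void
    {
        // Handle the command once it has been consumed from the channel.
    }
}
```

With this in place, the command would be sent through the orders channel and handled when a consumer for that channel is running.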

Customize Topic Name

By default, the topic name follows the channel name, which in the example above is "orders". However, we can keep "orders" as the reference name in our application and give the topic a different name:

#[ServiceContext] 
public function orderChannel()
{
    return KafkaMessageChannelBuilder::create(
        channelName: "orders",
        topicName: "crm_orders"
    );
}

Customize Group Id

We can also customize the group id, which by default follows the channel name:

#[ServiceContext] 
public function orderChannel()
{
    return KafkaMessageChannelBuilder::create(
        channelName: "orders",
        groupId: "crm_application"
    );
}

Custom Publisher and Consumer

To create a custom publisher or consumer, provide a Service Context.

Custom Publisher

class MessagingConfiguration
{
    #[ServiceContext] 
    public function distributedPublisher()
    {
        return KafkaPublisherConfiguration::createWithDefaults(
            topicName: 'orders'
        );
    }
}

The Publisher will then be available in the Dependency Container under the MessagePublisher reference.
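
A minimal sketch of using the registered publisher could look like the following. OrderNotifier and its notify method are hypothetical names, and the example assumes the container autowires Ecotone's MessagePublisher interface by type.

```php
<?php

use Ecotone\Messaging\MessagePublisher;

// Hypothetical service, used only for this illustration.
final class OrderNotifier
{
    public function __construct(private MessagePublisher $publisher)
    {
    }

    public function notify(string $orderId): void
    {
        // Send an already serialized payload together with its media type.
        $this->publisher->send(
            json_encode(['orderId' => $orderId], JSON_THROW_ON_ERROR),
            'application/json'
        );
    }
}
```

Here the payload is serialized by hand, which keeps the example independent of any converter configuration.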

Providing custom rdkafka configuration

You can modify your Message Publisher with specific rdkafka configuration:

#[ServiceContext] 
public function distributedPublisher()
{
    return KafkaPublisherConfiguration::createWithDefaults(
        topicName: 'orders'
    )
        ->setConfiguration('request.required.acks', '1');
}

Custom Consumer

To set up a Consumer that consumes from the given topics, all we need to do is mark a method with the KafkaConsumer attribute:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(string $payload, array $metadata): void
{
    // do something
}

Then we run it like any other asynchronous consumer, using the orderConsumers name.

Providing group id

By default, the Consumer Group id is the same as the endpoint id; however, we can provide a custom name if needed:

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders'],
    groupId: 'order_subscriber'
)]
public function handle(string $payload, array $metadata): void
{
    // do something
}

Providing custom rdkafka configuration

You can modify your Message Consumer with specific rdkafka configuration:

#[ServiceContext] 
public function orderConsumerConfiguration()
{
    return KafkaConsumerConfiguration::createWithDefaults(
        endpointId: 'orderConsumers'
    )
        ->setConfiguration('auto.offset.reset', 'earliest');
}

Kafka Headers

We can access specific Kafka Headers using Ecotone's standard metadata mechanism:

  • kafka_topic - Topic name for incoming message

  • kafka_partition - Partition of incoming message

  • kafka_offset - Offset of incoming message

#[KafkaConsumer(
    endpointId: 'orderConsumers', 
    topics: ['orders']
)]
public function handle(
    string $payload, 
    #[Header("kafka_topic")] string $topicName,
): void
{
    // do something
}
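
The same mechanism should work for the other headers listed above. The sketch below reads all three in one handler. The OrderConsumer class name and the namespace used for KafkaConsumer are assumptions, and the partition and offset parameters are left untyped because their exact types are not stated here.

```php
<?php

use Ecotone\Kafka\Attribute\KafkaConsumer; // namespace assumed
use Ecotone\Messaging\Attribute\Parameter\Header;

// Hypothetical class, used only for this illustration.
final class OrderConsumer
{
    #[KafkaConsumer(
        endpointId: 'orderConsumers',
        topics: ['orders']
    )]
    public function handle(
        string $payload,
        #[Header('kafka_topic')] string $topicName,
        #[Header('kafka_partition')] $partition,
        #[Header('kafka_offset')] $offset,
    ): void {
        // Log where the message came from, e.g. for tracing or debugging.
        error_log(sprintf(
            'Received message from %s (partition %s, offset %s)',
            $topicName,
            $partition,
            $offset
        ));
    }
}
```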
