Kafka
Module for creating declarative Apache Kafka Consumer
and Producer
using annotations.
Dependency¶
Dependency build.gradle:
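A minimal sketch of the dependency declaration; the artifact coordinate ru.tinkoff.kora:kafka is an assumption, check the release you actually use:

```groovy
implementation "ru.tinkoff.kora:kafka"
```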
Module:
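And a sketch of wiring the module into the application; the module interface name KafkaModule is an assumption:

```java
@KoraApp
public interface Application extends KafkaModule { }
```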
Dependency build.gradle.kts:
Module:
Consumer¶
Describes how to work with Kafka Consumer
Creating a Consumer requires annotating a method with @KafkaListener:
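A minimal sketch, assuming String key and value types and the kafka.someConsumer configuration path described below:

```java
@Component
final class ConsumerService {

    @KafkaListener("kafka.someConsumer")
    void process(String key, String value) {
        // handler code for each record
    }
}
```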
The @KafkaListener
annotation parameter points to the Consumer
configuration path.
If you need different behavior for different topics, you can create several such containers, each with its own configuration. It looks like this:
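A minimal sketch with two listeners, each bound to its own configuration path (paths are illustrative):

```java
@Component
final class ConsumerService {

    @KafkaListener("kafka.someConsumer1")
    void processFirst(String key, String value) {
        // handled with the kafka.someConsumer1 configuration
    }

    @KafkaListener("kafka.someConsumer2")
    void processSecond(String key, String value) {
        // handled with the kafka.someConsumer2 configuration
    }
}
```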
The value in the annotation indicates which part of the configuration file the settings are taken from. Retrieving the configuration works the same way as with @ConfigSource.
Configuration¶
Configuration describes the settings of a particular @KafkaListener
and an example for the configuration at path kafka.someConsumer
is given below.
Example of the complete configuration described in the KafkaListenerConfig
class (default or example values are specified):
kafka {
    someConsumer {
        topics = ["topic1", "topic2"] //(1)!
        topicsPattern = "topic*" //(2)!
        allowEmptyRecords = false //(3)!
        offset = "latest" //(4)!
        pollTimeout = "5s" //(5)!
        backoffTimeout = "15s" //(6)!
        partitionRefreshInterval = "1m" //(7)!
        threads = 1 //(8)!
        shutdownWait = "30s" //(9)!
        driverProperties { //(10)!
            "bootstrap.servers": "localhost:9093"
            "group.id": "my-group-id"
        }
        telemetry {
            logging {
                enabled = false //(11)!
            }
            metrics {
                enabled = true //(12)!
                slo = [1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000] //(13)!
            }
            tracing {
                enabled = true //(14)!
            }
        }
    }
}
1. Specifies the topics to which the Consumer will subscribe (required unless topicsPattern is specified)
2. Specifies the pattern of topics to which the Consumer will subscribe (required unless topics is specified)
3. Whether to process empty records in case the signature accepts ConsumerRecords
4. Works only if group.id is not specified. Specifies which position in the topics the Consumer should use. Valid values: earliest - earliest available offset; latest - latest available offset; a string in Duration format, e.g. 5m - shift back by the given amount of time
5. Maximum waiting time for messages from a topic within one poll call
6. Maximum waiting time between unexpected exceptions during processing
7. Time interval at which partitions are refreshed when the assign strategy is used
8. Number of threads on which the consumer will be started for parallel processing (if equal to 0, no consumer will be started at all)
9. Time to wait for processing to finish before shutting the consumer down during graceful shutdown
10. Properties from the official Kafka client; documentation for them can be found at the link (required)
11. Enables module logging (default false)
12. Enables module metrics (default true)
13. Configures SLO for DistributionSummary metrics
14. Enables module tracing (default true)
kafka:
  someConsumer:
    topics: #(1)!
      - "topic1"
      - "topic2"
    topicsPattern: "topic*" #(2)!
    allowEmptyRecords: false #(3)!
    offset: "latest" #(4)!
    pollTimeout: "5s" #(5)!
    backoffTimeout: "15s" #(6)!
    partitionRefreshInterval: "1m" #(7)!
    threads: 1 #(8)!
    shutdownWait: "30s" #(9)!
    driverProperties: #(10)!
      bootstrap.servers: "localhost:9093"
      group.id: "my-group-id"
    telemetry:
      logging:
        enabled: false #(11)!
      metrics:
        enabled: true #(12)!
        slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] #(13)!
      tracing:
        enabled: true #(14)!
1. Specifies the topics to which the Consumer will subscribe (required unless topicsPattern is specified)
2. Specifies the pattern of topics to which the Consumer will subscribe (required unless topics is specified)
3. Whether to process empty records in case the signature accepts ConsumerRecords
4. Works only if group.id is not specified. Specifies which position in the topics the Consumer should use. Valid values: earliest - earliest available offset; latest - latest available offset; a string in Duration format, e.g. 5m - shift back by the given amount of time
5. Maximum waiting time for messages from a topic within one poll call
6. Maximum waiting time between unexpected exceptions during processing
7. Time interval at which partitions are refreshed when the assign strategy is used
8. Number of threads on which the consumer will be started for parallel processing (if equal to 0, no consumer will be started at all)
9. Time to wait for processing to finish before shutting the consumer down during graceful shutdown
10. Properties from the official Kafka client; documentation for them can be found at the link (required)
11. Enables module logging (default false)
12. Enables module metrics (default true)
13. Configures SLO for DistributionSummary metrics
14. Enables module tracing (default true)
Consume strategy¶
The subscribe strategy relies on group.id to group consumers so that multiple application instances do not read records from their topics in duplicate.
If you want each application instance to read messages from a topic simultaneously with the others, use the assign strategy: simply do not specify group.id in the consumer configuration. With this strategy only one topic can be specified at a time.
Example of assign strategy configuration:
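A minimal sketch (values are illustrative): a single topic and no group.id among the driverProperties:

```hocon
kafka {
    someConsumer {
        topics = ["first"]
        driverProperties {
            "bootstrap.servers": "localhost:9093"
        }
    }
}
```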
Signatures¶
Signatures available out of the box for Kafka consumer methods, where K refers to the key type and V to the message value type.
Allows accepting the value (required), key (optional) and Headers (optional) of a ConsumerRecord, as well as an Exception (optional) in case of a serialization/connection error; after all events are processed, commitSync() is called:
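A minimal sketch, assuming String key and value types:

```java
@Component
final class ConsumerService {

    @KafkaListener("kafka.someConsumer")
    void process(String key, String value, Headers headers) {
        // commitSync() is called after the whole batch has been processed
    }
}
```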
Accepts ConsumerRecord/ConsumerRecords and KafkaConsumerRecordsTelemetryContext/KafkaConsumerRecordTelemetryContext (optional); once all ConsumerRecords have been processed, commitSync() is called:
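A minimal sketch of a batch handler, assuming String key and value types:

```java
@Component
final class ConsumerService {

    @KafkaListener("kafka.someConsumer")
    void process(ConsumerRecords<String, String> records) {
        // commitSync() is called once all records have been processed
    }
}
```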
Accepts ConsumerRecord/ConsumerRecords and a Consumer. In this case, commit must be called manually:
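A minimal sketch, assuming String key and value types; the commit is issued manually:

```java
@Component
final class ConsumerService {

    @KafkaListener("kafka.someConsumer")
    void process(ConsumerRecords<String, String> records, Consumer<String, String> consumer) {
        // process the records, then commit manually
        consumer.commitSync();
    }
}
```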
Called for each ConsumerRecord
obtained by calling poll()
:
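A minimal sketch of a per-record handler, assuming String key and value types:

```java
@Component
final class ConsumerService {

    @KafkaListener("kafka.someConsumer")
    void process(ConsumerRecord<String, String> record) {
        // invoked for every record returned by poll()
    }
}
```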
Deserialization¶
Deserializer
- used to deserialize ConsumerRecord
keys and values.
Tags are supported for finer customization of the Deserializer.
Tags can be set on the key parameter, the value parameter, as well as on parameters of type ConsumerRecord and ConsumerRecords.
These tags will be applied to the container dependencies.
@Component
final class ConsumerService {

    @KafkaListener("kafka.someConsumer1")
    void process1(@Tag(Sometag1.class) String key, @Tag(Sometag2.class) String value) {
        // some handler code
    }

    @KafkaListener("kafka.someConsumer2")
    void process2(ConsumerRecord<@Tag(Sometag1.class) String, @Tag(Sometag2.class) String> record) {
        // some handler code
    }
}
@Component
class ConsumerService {

    @KafkaListener("kafka.someConsumer1")
    fun process1(@Tag(Sometag1::class) key: String, @Tag(Sometag2::class) value: String) {
        // some handler code
    }

    @KafkaListener("kafka.someConsumer2")
    fun process2(record: ConsumerRecord<@Tag(Sometag1::class) String, @Tag(Sometag2::class) String>) {
        // some handler code
    }
}
In case deserialization from Json
is required, the @Json
tag can be used:
@Component
final class ConsumerService {

    @Json
    public record JsonEvent(String name, Integer code) {}

    @KafkaListener("kafka.someConsumer1")
    void process1(String key, @Json JsonEvent value) {
        // some handler code
    }

    @KafkaListener("kafka.someConsumer2")
    void process2(ConsumerRecord<String, @Json JsonEvent> record) {
        // some handler code
    }
}
For handlers without a key, Deserializer<byte[]> is used for the key by default, since it simply returns the raw bytes.
Exception handling¶
If a method annotated with @KafkaListener throws an exception, the Consumer will be restarted, because there is no general solution for handling such failures; the developer must decide how to handle them.
Deserialization errors¶
If you use a signature with ConsumerRecord or ConsumerRecords, the deserialization exception is thrown at the moment the key or value method is called.
Handle it at that point in whatever way suits you.
The following exceptions are thrown:
- ru.tinkoff.kora.kafka.common.exceptions.RecordKeyDeserializationException
- ru.tinkoff.kora.kafka.common.exceptions.RecordValueDeserializationException
From these exceptions, you can get a raw ConsumerRecord<byte[], byte[]>
.
If you use a signature with unpacked key/value/headers, you can add Exception, Throwable, RecordKeyDeserializationException or RecordValueDeserializationException as the last argument.
Note that all arguments become optional, meaning we expect to either have a key and value or an exception.
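A minimal sketch of such a handler; the nullability annotation is illustrative and an assumption here:

```java
@Component
final class ConsumerService {

    @KafkaListener("kafka.someConsumer")
    void process(@Nullable String key, @Nullable String value, @Nullable Exception exception) {
        if (exception != null) {
            // deserialization or connection error
            return;
        }
        // normal processing of key and value
    }
}
```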
Custom tag¶
A tag for the consumer is created automatically by default; it can be seen in the generated module at compile time.
If for some reason you need to override the consumer tag, you can set it as an argument of the @KafkaListener annotation:
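A sketch of what this might look like; the exact annotation attribute name (tag here) and the tag class are assumptions, check the @KafkaListener declaration:

```java
@Component
final class ConsumerService {

    // hypothetical: the "tag" attribute name and MyConsumerTag class are assumptions
    @KafkaListener(value = "kafka.someConsumer", tag = MyConsumerTag.class)
    void process(String key, String value) {
        // some handler code
    }
}
```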
Rebalance events¶
You can listen and react to rebalance events with your own implementation of the ConsumerAwareRebalanceListener interface; it must be provided as a component tagged with the consumer tag:
@Tag(SomeListenerProcessTag.class)
@Component
public final class SomeListener implements ConsumerAwareRebalanceListener {

    @Override
    public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    }

    @Override
    public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
    }
}
@Tag(SomeListenerProcessTag::class)
@Component
class SomeListener : ConsumerAwareRebalanceListener {

    override fun onPartitionsRevoked(consumer: Consumer<*, *>, partitions: Collection<TopicPartition>) {
    }

    override fun onPartitionsAssigned(consumer: Consumer<*, *>, partitions: Collection<TopicPartition>) {
    }
}
Manual override¶
Kora provides a small wrapper over KafkaConsumer
that allows you to easily trigger the handling of incoming events.
The container constructor is as follows:
public KafkaSubscribeConsumerContainer(KafkaListenerConfig config,
                                       Deserializer<K> keyDeserializer,
                                       Deserializer<V> valueDeserializer,
                                       BaseKafkaRecordsHandler<K, V> handler) {
    this.factory = new KafkaConsumerFactory<>(config);
    this.handler = handler;
    this.keyDeserializer = keyDeserializer;
    this.valueDeserializer = valueDeserializer;
    this.config = config;
}
BaseKafkaRecordsHandler<K,V>
is the basic functional interface of the handler:
@FunctionalInterface
public interface BaseKafkaRecordsHandler<K, V> {
    void handle(ConsumerRecords<K, V> records, KafkaConsumer<K, V> consumer);
}
Producer¶
Describes how to work with Kafka Producer
The @KafkaPublisher annotation is used on an interface to create a Kafka Producer. To send messages to an arbitrary topic, declare a method that takes a ProducerRecord:
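A minimal sketch, assuming String key and value types and an illustrative configuration path:

```java
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {

    void send(ProducerRecord<String, String> record);
}
```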
The annotation parameter indicates the path to the configuration.
Topic¶
If typed contracts for specific topics are required, the @KafkaPublisher.Topic annotation is used to create such contracts:
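A minimal sketch of a typed contract, mirroring the transactional example further below:

```java
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {

    @KafkaPublisher.Topic("kafka.someProducer.someTopic")
    void send(String key, String value);
}
```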
The annotation parameter indicates the path for the configuration of the topic.
Configuration¶
Configuration describes the settings of a particular @KafkaPublisher; an example for the configuration at path kafka.someProducer is given below.
Example of the complete configuration described in the KafkaPublisherConfig
class (default or example values are specified):
kafka {
    someProducer {
        driverProperties { //(1)!
            "bootstrap.servers": "localhost:9093"
        }
        telemetry {
            logging {
                enabled = false //(2)!
            }
            metrics {
                enabled = true //(3)!
                slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] //(4)!
            }
            tracing {
                enabled = true //(5)!
            }
        }
    }
}
1. Properties from the official Kafka client; documentation for them can be found at the link (required)
2. Enables module logging (default false)
3. Enables module metrics (default true)
4. Configures SLO for DistributionSummary metrics
5. Enables module tracing (default true)
kafka:
  someProducer:
    driverProperties: #(1)!
      bootstrap.servers: "localhost:9093"
    telemetry:
      logging:
        enabled: false #(2)!
      metrics:
        enabled: true #(3)!
        slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] #(4)!
      tracing:
        enabled: true #(5)!
1. Properties from the official Kafka client; documentation for them can be found at the link (required)
2. Enables module logging (default false)
3. Enables module metrics (default true)
4. Configures SLO for DistributionSummary metrics
5. Enables module tracing (default true)
Topic configuration describes the settings of a particular @KafkaPublisher.Topic
and an example for the configuration at path path.to.topic.config
is given below.
Example of the complete configuration described in the KafkaPublisherConfig.TopicConfig
class (default or example values are specified):
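A sketch of such a configuration; the field names topic and partition are inferred from the descriptions below and should be treated as assumptions:

```hocon
path {
    to {
        topic {
            config {
                topic = "my-topic" //(1)!
                partition = 1 //(2)!
            }
        }
    }
}
```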
1. Topic where the method will send data (required)
2. Partition of the topic where the method will send data (optional)
Signatures¶
Allows sending the value (required), key (optional) and headers (optional) of a ProducerRecord:
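A minimal sketch, assuming String key and value types:

```java
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {

    @KafkaPublisher.Topic("kafka.someProducer.someTopic")
    void send(String key, String value, Headers headers);
}
```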
RecordMetadata can be obtained as the result of the operation:
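A sketch returning RecordMetadata, under the same assumptions:

```java
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {

    @KafkaPublisher.Topic("kafka.someProducer.someTopic")
    RecordMetadata send(String key, String value);
}
```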
The result can also be obtained as a Future<RecordMetadata> or CompletionStage<RecordMetadata>:
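A sketch returning CompletionStage<RecordMetadata>, under the same assumptions:

```java
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {

    @KafkaPublisher.Topic("kafka.someProducer.someTopic")
    CompletionStage<RecordMetadata> send(String key, String value);
}
```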
It is possible to send a ProducerRecord with or without a Callback and combine the return types with the examples above:
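A sketch combining ProducerRecord, an optional Callback and a Future result (method names are illustrative):

```java
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {

    void send(ProducerRecord<String, String> record, Callback callback);

    Future<RecordMetadata> sendWithResult(ProducerRecord<String, String> record);
}
```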
Serialization¶
Tags can be used to specify which Serializer to take from the container.
Tags are set on the ProducerRecord or on the key/value parameters of methods:
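A sketch with tagged key and value serializers; the tag classes are illustrative:

```java
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {

    @KafkaPublisher.Topic("kafka.someProducer.someTopic")
    void send(@Tag(SomeTag1.class) String key, @Tag(SomeTag2.class) String value);
}
```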
If you want to serialize the value as JSON, use the @Json annotation:
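A sketch with a JSON-serialized value, mirroring the consumer-side example above:

```java
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {

    @Json
    record JsonEvent(String name, Integer code) {}

    @KafkaPublisher.Topic("kafka.someProducer.someTopic")
    void send(String key, @Json JsonEvent value);
}
```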
Exception handling¶
If a send fails in a method annotated with @KafkaPublisher.Topic that does not return Future<RecordMetadata>, a ru.tinkoff.kora.kafka.common.exceptions.KafkaPublishException is thrown, whose cause contains the actual error from KafkaProducer.
Serialization errors¶
If key/value serialization fails in a method annotated with @KafkaPublisher.Topic, an org.apache.kafka.common.errors.SerializationException is thrown, similar to what org.apache.kafka.clients.producer.Producer#send would do.
Transactions¶
It is possible to send messages to Kafka within a transaction; the @KafkaPublisher annotation is used to create such a KafkaProducer as well.
First create a regular publisher and then use it to create a transactional one:
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {

    @KafkaPublisher.Topic("kafka.someProducer.someTopic")
    void send(String key, String value);
}

@KafkaPublisher("kafka.someTransactionalProducer")
public interface MyTransactionalPublisher extends TransactionalPublisher<MyPublisher> {
}
The inTx methods are used to send such messages: all messages sent within the lambda are committed if it completes successfully and aborted if it fails.
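A sketch of using inTx; the exact shape of the lambda-accepting overload is an assumption:

```java
// both sends are committed together on success or aborted together on failure
transactionalPublisher.inTx(publisher -> {
    publisher.send("key1", "value1");
    publisher.send("key2", "value2");
});
```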
It is also possible to manually perform all manipulations with KafkaProducer
:
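For reference, a rough sketch of the equivalent manual flow using the plain Apache Kafka client transactional API (not necessarily the Kora wrapper's API):

```java
// initTransactions() must be called once before the first transaction
producer.initTransactions();
producer.beginTransaction();
try {
    producer.send(new ProducerRecord<>("someTopic", "key1", "value1"));
    producer.send(new ProducerRecord<>("someTopic", "key2", "value2"));
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
    throw e;
}
```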
Configuration¶
KafkaPublisherConfig.TransactionConfig
is used to configure @KafkaPublisher
with the TransactionalPublisher
interface:
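A sketch of what such a configuration might contain; the field names below are hypothetical and listed only for illustration:

```hocon
kafka {
    someTransactionalProducer {
        idPrefix = "kafka-app-" // hypothetical field name
        maxPoolSize = 10        // hypothetical field name
        maxWaitTime = "10s"     // hypothetical field name
    }
}
```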