Kafka
Модуль для создания декларативных Apache Kafka Consumer
и Producer
с помощью аннотаций.
Подключение¶
Зависимость build.gradle
:
Модуль:
Зависимость build.gradle.kts
:
Модуль:
Потребитель¶
Описания работы с Kafka Consumer
Для создания Consumer
требуется использовать аннотацию @KafkaListener
над методом:
Параметр аннотации @KafkaListener
указывает на путь к конфигурации Consumer
'а.
В случае, если нужно разное поведение для разных топиков, существует возможность создавать несколько подобных контейнеров, каждый со своим индивидуальным конфигом. Выглядит это так:
Значение в аннотации указывает, из какой части файла конфигурации нужно брать настройки. В том, что касается получения конфигурации — работает аналогично @ConfigSource
Конфигурация¶
Конфигурация описывает настройки конкретного @KafkaListener
и ниже указан пример для конфигурации по пути kafka.someConsumer
.
Пример полной конфигурации, описанной в классе KafkaListenerConfig
(указаны примеры значений или значения по умолчанию):
kafka {
someConsumer {
topics = ["topic1", "topic2"] //(1)!
topicsPattern = "topic*" //(2)!
allowEmptyRecords = false //(3)!
offset = "latest" //(4)!
pollTimeout = "5s" //(5)!
backoffTimeout = "15s" //(6)!
partitionRefreshInterval = "1m" //(7)!
threads = 1 //(8)!
shutdownWait = "30s" //(9)!
driverProperties { //(10)!
"bootstrap.servers": "localhost:9093"
"group.id": "my-group-id"
}
telemetry {
logging {
enabled = false //(11)!
}
metrics {
enabled = true //(12)!
slo = [1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000] //(12)!
}
tracing {
enabled = true //(14)!
}
}
}
}
- Указываются топики на которые будет подписан Consumer (обязательный либо указывается
topicsPattern
) - Указываются паттерн топиков на которые будет подписан Consumer (обязательный либо указывается
topics
) - Обрабатывать ли пустые записи в случае если сигнатура принимает
ConsumerRecords
- Работает только если не указан
group.id
. Определяет стратегнию какую позицию в топике должен использовать Consumer. Допустимые значение:earliest
- самый ранний доступный offsetlatest
- последний доступный offset- Строка в формате
Duration
(например5m
) - сдвиг на определённое время назад
- Максиимальное время ожидания сообщений из топика в рамках одного вызова
- Максимальное время ожидания между неожиданными исключениями во время обработки
- Временной интервал в рамках которого требуется делать обновление партиций в случае
assign
метода - Количество потоков на которых будет запущен потребитель для параллельной обработки (если будет равен 0 то ни один потребитель не будет запущен вообще)
- Время ожидания обработки перед выключением потребителя в случае штатного завершения
- Properties из официального клиента кафки, документацию по ним можно посмотреть по ссылке (обязательный)
- Включает логгирование модуля (по умолчанию
false
) - Включает метрики модуля (по умолчанию
true
) - Настройка SLO для DistributionSummary метрики
- Включает трассировку модуля (по умолчанию
true
)
kafka:
someConsumer:
topics: #(1)!
- "topic1"
- "topic2"
topicsPattern: "topic*" #(2)!
allowEmptyRecords: false #(3)!
offset: "latest" #(4)!
pollTimeout: "5s" #(5)!
backoffTimeout: "15s" #(6)!
partitionRefreshInterval: "1m" #(7)!
threads: 1 #(8)!
shutdownWait: "30s" #(9)!
driverProperties: #(10)!
bootstrap.servers: "localhost:9093"
group.id: "my-group-id"
telemetry:
logging:
enabled: false #(11)!
metrics:
enabled: true #(12)!
slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] #(12)!
telemetry:
enabled: true #(14)!
- Указываются топики на которые будет подписан Consumer (обязательный либо указывается
topicsPattern
) - Указываются паттерн топиков на которые будет подписан Consumer (обязательный либо указывается
topics
) - Обрабатывать ли пустые записи в случае если сигнатура принимает
ConsumerRecords
- Работает только если не указан
group.id
. Определяет стратегнию какую позицию в топике должен использовать Consumer. Допустимые значение:earliest
- самый ранний доступный offsetlatest
- последний доступный offset- Строка в формате
Duration
(например5m
) - сдвиг на определённое время назад
- Максиимальное время ожидания сообщений из топика в рамках одного вызова
- Максимальное время ожидания между неожиданными исключениями во время обработки
- Временной интервал в рамках которого требуется делать обновление партиций в случае
assign
метода - Количество потоков на которых будет запущен потребитель для параллельной обработки (если будет равен 0 то ни один потребитель не будет запущен вообще)
- Время ожидания обработки перед выключением потребителя в случае штатного завершения
- Properties из официального клиента кафки, документацию по ним можно посмотреть по ссылке (обязательный)
- Включает логгирование модуля (по умолчанию
false
) - Включает метрики модуля (по умолчанию
true
) - Настройка SLO для DistributionSummary метрики
- Включает трассировку модуля (по умолчанию
true
)
Стратегия подключения¶
subscribe
стратегия подразумевает использование group.id,
чтобы объединить исполнителей в группы и они не дублировали вычитывание записей из своего очереди в рамках нескольких экземпляров приложений.
В случае же если надо чтобы каждый экземпляр приложения читал сообщения из топика одновременно с другими, предполагается использовать assign
стратегию,
для этого надо просто не указывать group.id
в конфигурации потребителя, но в такой стратегии можно указать одновременно лишь 1 топик.
Пример конфигурации assign
стратегии:
Десериализация¶
Deserializer
- используется для десериализации ключей и значений ConsumerRecord
.
Для более точной настройки Deserializer
поддерживаются теги.
Теги можно установить на параметре-ключе, параметре-значении, а так же на параметрах типа ConsumerRecord
и ConsumerRecords
.
Эти теги будут установлены на зависимостях контейнера.
@Component
final class SomeConsumer {
@KafkaListener("kafka.someConsumer1")
void process1(@Tag(Sometag1.class) String key, @Tag(Sometag2.class) String value) {
// some handler code
}
@KafkaListener("kafka.someConsumer2")
void process2(ConsumerRecord<@Tag(Sometag1.class) String, @Tag(Sometag2.class) String> record) {
// some handler code
}
}
@Component
class SomeConsumer {
@KafkaListener("kafka.someConsumer1")
fun process1(@Tag(Sometag1::class) key: String, @Tag(Sometag2::class) value: String) {
// some handler code
}
@KafkaListener("kafka.someConsumer2")
fun process2(record: ConsumerRecord<@Tag(Sometag1::class) String, @Tag(Sometag2::class) String>) {
// some handler code
}
}
В случае если требуется десериализация из Json
, то можно использовать тег @Json
:
@Component
final class SomeConsumer {
@Json
public record JsonEvent(String name, Integer code) {}
@KafkaListener("kafka.someConsumer1")
void process1(String key, @Json JsonEvent value) {
// some handler code
}
@KafkaListener("kafka.someConsumer2")
void process2(ConsumerRecord<String, @Json JsonEvent> record) {
// some handler code
}
}
Для обработчиков, не использующих ключ, по умолчанию используется Deserializer<byte[]>
т.к. он просто возвращает не обработанные байты.
Обработка исключений¶
Если метод помеченный @KafkaListener
выбросит исключение, то Consumer будет перезапущен,
потому что нет общего решения, как реагировать на это и разработчик должен сам решить как эту ситуацию обрабатывать.
Ошибки десериализации¶
Если вы используете сигнатуру с ConsumerRecord
или ConsumerRecords
, то вы получите исключение десериализации значения в момент вызова методов key
или value
.
В этот момент стоит его обработать нужным вам образом.
Выбрасываются следующие исключения:
ru.tinkoff.kora.kafka.common.exceptions.RecordKeyDeserializationException
ru.tinkoff.kora.kafka.common.exceptions.RecordValueDeserializationException
Из этих исключений можно получить сырой ConsumerRecord<byte[], byte[]>
.
Если вы используете сигнатуру с распакованными key
/value
/headers
, то можно добавить последним аргументом Exception
, Throwable
, RecordKeyDeserializationException
или RecordValueDeserializationException
.
Обратите внимание, что все аргументы становятся необязательными, то есть мы ожидаем что у нас либо будут ключ и значение, либо исключение.
Пользовательский тег¶
По умолчанию для потребителя создается автоматический тег по которому происходит внедрение, его можно посмотреть в созданном модуле на этапе компиляции.
Если по каким-то причинам вам требуется переопределить тег потребителя, можно задать его как аргумент аннотации @KafkaListener
:
События ребалансировки¶
Можно слушать и реагировать на события ребалансировки с помощью свой реализации интерфейса ConsumerAwareRebalanceListener
,
его следует предоставить как компонент по тегу потребителя:
@Tag(SomeListenerProcessTag.class)
@Component
public final class SomeListener implements ConsumerAwareRebalanceListener {
@Override
public void onPartitionsRevoked(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
@Override
public void onPartitionsAssigned(Consumer<?, ?> consumer, Collection<TopicPartition> partitions) {
}
}
@Tag(SomeListenerProcessTag::class)
@Component
class SomeListener : ConsumerAwareRebalanceListener {
override fun onPartitionsRevoked(consumer: Consumer<*, *>, partitions: Collection<TopicPartition>) {
}
override fun onPartitionsAssigned(consumer: Consumer<*, *>, partitions: Collection<TopicPartition>) {
}
}
Ручное управление¶
Kora предоставляет небольшую обёртку над KafkaConsumer
, позволяющую легко запустить обработку входящих событий.
Конструктор контейнера выглядит следующим образом:
public KafkaSubscribeConsumerContainer(KafkaListenerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
BaseKafkaRecordsHandler<K, V> handler) {
this.factory = new KafkaConsumerFactory<>(config);
this.handler = handler;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.config = config;
}
BaseKafkaRecordsHandler<K,V>
это базовый функциональный интерфейс обработчика:
@FunctionalInterface
public interface BaseKafkaRecordsHandler<K, V> {
void handle(ConsumerRecords<K, V> records, KafkaConsumer<K, V> consumer);
}
Сигнатуры¶
Доступные сигнатуры для методов Kafka потребителя из коробки, где под K
подразумевается тип ключа и под V
тип значения сообщения.
Позволяет принимать value
(обязательный), key
(опционально), Headers
(опционально) от ConsumerRecord
,
Exception
(опционально) в случае ошибки сериализации/соединения и после обработки всех событий вызывается commitSync()
:
Принимает ConsumerRecord
/ConsumerRecords
и KafkaConsumerRecordsTelemetryContext
/KafkaConsumerRecordTelemetryContext
(опционально) после обработки вызывается commitSync()
:
Принимает ConsumerRecord
/ConsumerRecords
и Consumer
.
Вызывается для каждого ConsumerRecord
полученного при вызове poll()
:
В данном случае commit
нужно вызывать вручную.
Продюсер¶
Описания работы с Kafka Producer
Предполагается использовать аннотацию @KafkaPublisher
на интерфейсе для создания Kafka Producer
,
для того чтобы отправлять сообщения в любой топик предполагается создание метода с сигнатурой ProducerRecord
:
Параметр аннотации указывает на путь до конфигурации.
Топик¶
В случае если требуется использовать типизированные контракты на определенные топики то предполагается использование аннотации @KafkaPublisher.Topic
для создания таких контрактов:
Параметр аннотации указывает на путь для конфигурации топика.
Конфигурация¶
Конфигурация описывает настройки конкретного @KafkaPublisher
и ниже указан пример для конфигурации по пути kafka.someConsumer
.
Пример полной конфигурации, описанной в классе KafkaPublisherConfig
(указаны примеры значений или значения по умолчанию):
kafka {
someProducer {
driverProperties { //(1)!
"bootstrap.servers": "localhost:9093"
}
telemetry {
logging {
enabled = false //(2)!
}
metrics {
enabled = true //(3)!
slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] //(4)!
}
tracing {
enabled = true //(5)!
}
}
}
}
- Properties из официального клиента кафки, документацию по ним можно посмотреть по ссылке (обязательный)
- Включает логгирование модуля (по умолчанию
false
) - Включает метрики модуля (по умолчанию
true
) - Настройка SLO для DistributionSummary метрики
- Включает трассировку модуля (по умолчанию
true
)
kafka:
someProducer:
driverProperties: #(1)!
bootstrap.servers: "localhost:9093"
telemetry:
logging:
enabled: true #(2)!
metrics:
enabled: true #(3)!
slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] #(4)!
telemetry:
enabled: true #(5)!
- Properties из официального клиента кафки, документацию по ним можно посмотреть по ссылке (обязательный)
- Включает логгирование модуля (по умолчанию
false
) - Включает метрики модуля (по умолчанию
true
) - Настройка SLO для DistributionSummary метрики
- Включает трассировку модуля (по умолчанию
true
)
Конфигурация топика описывает настройки конкретного @KafkaPublisher.Topic
и ниже указан пример для конфигурации по пути kafka.someProducer.someTopic
.
Пример полной конфигурации, описанной в классе KafkaPublisherConfig.TopicConfig
(указаны примеры значений или значения по умолчанию):
- В какой топик метод будет отправлять данные (обязательный)
- В какой partition топика метод будет отправлять данные (по умолчанию отсутвует)
Сериализация¶
Для уточнения какой Serializer
взять из контейнера есть возможность использовать теги.
Теги необходимо устанавливать на параметры ProducerRecord
или key
/value
методов:
В случае если хочется сериализовать как Json то следует использовать @Json
аннотацию:
Обработка исключений¶
В случае ошибки отправки методе проаннотированным @Topic
и который не возвращает Future<RecordMetadata>
будет выброшено ru.tinkoff.kora.kafka.common.exceptions.KafkaPublishException
где в cause
будет лежать реальная ошибка из KafkaProducer
.
Ошибки сериализации¶
В случае ошибки сериализации ключа/значения в методе проаннотированным @Topic
будет выброшено org.apache.kafka.common.errors.SerializationException
аналогично как это было бы в случае org.apache.kafka.clients.producer.Producer#send
Транзакции¶
Возможно отправлять сообщение в Kafka в рамках транзакции, для этого предполагается использовать
аннотацию @KafkaPublisher
для создания такого KafkaProducer
.
Требуется сначала создать обычного KafkaProducer
а затем его использовать для создания транзакционного Producer'а:
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {
@KafkaPublisher.Topic("kafka.someProducer.someTopic")
void send(String key, String value);
}
@KafkaPublisher("kafka.someTransactionalProducer")
public interface MyTransactionalPublisher extends TransactionalPublisher<MyPublisher> {
}
Предполагается использовать методы inTx
для отправки таких сообщений, все сообщения в рамках Lambda будут применены в случае успешного ее выполнения и отменены в случае ошибки.
Также возможно в ручную произвести все манипуляции с KafkaProducer
:
Конфигурация¶
KafkaPublisherConfig.TransactionConfig
используется для конфигурации @KafkaPublisher
с интерфейсом TransactionalPublisher
:
Сигнатуры¶
Доступные сигнатуры для методов Kafka продюсера из коробки, где под K
подразумевается тип ключа и под V
тип значения сообщения.
Позволяет отправлять value
и key
(опционально) и headers
(опционально) от ProducerRecord
:
Можно получать как результат операции RecordMetadata
либо Future<RecordMetadata>
:
@KafkaPublisher("kafka.someProducer")
public interface MyPublisher {
@KafkaPublisher.Topic("kafka.someProducer.someTopic")
RecordMetadata send(V value);
@KafkaPublisher.Topic("kafka.someProducer.someTopic")
Future<RecordMetadata> sendFuture(V value);
@KafkaPublisher.Topic("kafka.someProducer.someTopic")
CompletionStage<RecordMetadata> sendStage(V value);
}
Можно получать как результат операции RecordMetadata
и иметь модификатор suspend
:
Возможна отправка ProducerRecord
и Callback
(опционально) и комбинировать сигнатуры ответа: