Kora фреймворк для написания Java / Kotlin приложений с упором на производительность, эффективность, прозрачность сделанный разработчиками Т-Банк / Тинькофф

Kora is a framework for writing Java / Kotlin applications with a focus on performance, efficiency, transparency made by T-Bank / Tinkoff developers

Перейти к содержанию

R2DBC

Модуль предоставляет реализацию репозиториев на основе R2DBC реактивного протокола работы с базами данных, реализацией как пример является Postgres R2DBC.

Подключение

Зависимость build.gradle:

annotationProcessor "ru.tinkoff.kora:annotation-processors"
implementation "ru.tinkoff.kora:database-r2dbc"

Модуль:

@KoraApp
public interface Application extends R2dbcDatabaseModule { }

Зависимость build.gradle.kts:

ksp("ru.tinkoff.kora:symbol-processors")
implementation("ru.tinkoff.kora:database-r2dbc")

Модуль:

@KoraApp
interface Application : R2dbcDatabaseModule

Также требуется предоставить реализацию драйвера базы данных как зависимость.

Конфигурация

Пример полной конфигурации, описанной в классе R2dbcDatabaseConfig (указаны примеры значений или значения по умолчанию):

db {
    r2dbcUrl = "r2dbc:postgresql://localhost:5432/postgres" //(1)!
    username = "postgres" //(2)!
    password = "postgres" //(3)!
    poolName = "kora" //(4)!
    maxPoolSize = 10 //(5)!
    minIdle = 0 //(6)!
    acquireRetry = 3 //(7)!
    connectionTimeout = "10s" //(8)!
    connectionCreateTimeout = "30s" //(9)!
    idleTimeout = "10m" //(10)!
    maxLifetime = "0s" //(11)!
    statementTimeout = "0s" //(12)!
    readinessProbe = false //(13)!
    options { //(14)!
        "backgroundEvictionInterval": "PT120S"
    }
    telemetry {
        logging {
            enabled = false //(15)!
        }
        metrics {
            enabled = true //(16)!
            slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] //(17)!
        }
        tracing {
            enabled = true //(18)!
        }
    }
}
  1. R2DBC URL подключения к базе данных (обязательный)
  2. Имя пользователя для подключения (обязательный)
  3. Пароль пользователя для подключения (обязательный)
  4. Имя набора соединений к базе данных (обязательный)
  5. Максимальный размер набора соединений к базе данных
  6. Минимальный размер набора готовых соединений к базе данных в режиме ожидания
  7. Максимальное количество попыток получения соединения
  8. Максимальное время на установку соединения
  9. Максимальное время на создание соединения
  10. Максимальное время на простой соединения
  11. Максимальное время жизни соединения (по умолчанию отсутвует)
  12. Максимальное время на выполнение запроса в базу данных (по умолчанию отсутвует)
  13. Включить ли пробу готовности для соединения базы данных
  14. Дополнительные атрибуты R2DBC соединения (по умолчанию отсутвует)
  15. Включает логгирование модуля (по умолчанию false)
  16. Включает метрики модуля (по умолчанию true)
  17. Настройка SLO для DistributionSummary метрики
  18. Включает трассировку модуля (по умолчанию true)
db:
  r2dbcUrl: "r2dbc:postgresql://localhost:5432/postgres" #(1)!
  username: "postgres" #(2)!
  password: "postgres" #(3)!
  poolName: "kora" #(4)!
  maxPoolSize: 10 #(5)!
  minIdle: 0 #(6)!
  acquireRetry: 3 #(7)!
  connectionTimeout: "10s" #(8)!
  connectionCreateTimeout: "30s" #(9)!
  idleTimeout: "10m" #(10)!
  maxLifetime: "0s" #(11)!
  statementTimeout: "0ms" #(12)!
  readinessProbe: false #(13)!
  options: #(14)!
    backgroundEvictionInterval: "PT120S"
  telemetry:
    logging:
      enabled: false #(15)!
    metrics:
      enabled: true #(16)!
      slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] #(17)!
    tracing:
      enabled: true #(18)!
  1. R2DBC URL подключения к базе данных (обязательный)
  2. Имя пользователя для подключения (обязательный)
  3. Пароль пользователя для подключения (обязательный)
  4. Имя набора соединений к базе данных (обязательный)
  5. Максимальный размер набора соединений к базе данных
  6. Минимальный размер набора готовых соединений к базе данных в режиме ожидания
  7. Максимальное количество попыток получения соединения
  8. Максимальное время на установку соединения
  9. Максимальное время на создание соединения
  10. Максимальное время на простой соединения
  11. Максимальное время жизни соединения (по умолчанию отсутвует)
  12. Максимальное время на выполнение запроса в базу данных (по умолчанию отсутвует)
  13. Включить ли пробу готовности для соединения базы данных
  14. Дополнительные атрибуты R2DBC соединения (по умолчанию отсутвует)
  15. Включает логгирование модуля (по умолчанию false)
  16. Включает метрики модуля (по умолчанию true)
  17. Настройка SLO для DistributionSummary метрики
  18. Включает трассировку модуля (по умолчанию true)

Использование

@Repository
public interface EntityRepository extends R2dbcRepository { }
@Repository
interface EntityRepository : R2dbcRepository

Конвертация

Возможно переопределять преобразование различных частей сущности и параметров запроса, для этого Kora предоставляет специальные интерфейсы.

Результат

Если требуется преобразовать результат в ручную, предлагается использовать R2dbcResultFluxMapper:

final class ResultMapper implements R2dbcResultFluxMapper<UUID, Flux<UUID>> {

    @Override
    public Flux<UUID> apply(Flux<Result> resultFlux) {
        // код преобразования
    }
}

@Repository
public interface EntityRepository extends R2dbcRepository {

    @Mapping(ResultMapper.class)
    @Query("SELECT id FROM entities")
    Flux<UUID> getIds();
}
class ResultMapper : R2dbcResultFluxMapper<UUID, Flux<UUID>> {
    override fun apply(resultFlux: Flux<Result>): Flux<UUID> {
        // код преобразования
    }
}

@Repository
interface EntityRepository : R2dbcRepository {

    @Mapping(ResultMapper::class)
    @Query("SELECT id FROM entities")
    fun getIds(): Flux<UUID>
}

Строка

Если требуется преобразовать строку в ручную, предлагается использовать R2dbcRowMapper:

final class RowMapper implements R2dbcRowMapper<UUID> {

    @Override
    public UUID apply(Row row) {
        return UUID.fromString(rs.get(0, String.class));
    }
}

@Repository
public interface EntityRepository extends R2dbcRepository {

    @Mapping(RowMapper.class)
    @Query("SELECT id FROM entities")
    Flux<UUID> findAll();
}
class RowMapper : R2dbcRowMapper<UUID> {

    override fun apply(row: Row): UUID {
        return UUID.fromString(rs.get(0, String.class))
    }
}

@Repository
interface EntityRepository : R2dbcRepository {

    @Mapping(RowMapper::class)
    @Query("SELECT id FROM entities")
    fun findAll(): Flux<UUID>
}

Колонка

Если требуется преобразовать значение колонки в ручную, предлагается использовать R2dbcResultColumnMapper:

public final class ColumnMapper implements R2dbcResultColumnMapper<UUID> {

    @Override
    public UUID apply(Row row, String label) {
        return UUID.fromString(row.get(label, String.class));
    }
}

@Table("entities")
public record Entity(@Mapping(ColumnMapper.class) @Id UUID id, String name) { }

@Repository
public interface EntityRepository extends R2dbcRepository {

    @Query("SELECT * FROM entities")
    Flux<Entity> findAll();
}
class ColumnMapper : R2dbcResultColumnMapper<UUID> {

    override fun apply(row: Row, label: String): UUID {
        return UUID.fromString(row.get(label, String.class))
    }
}

@Table("entities")
data class Entity(
    @Id @Mapping(ColumnMapper::class) val id: UUID,
    val name: String
)

@Repository
interface EntityRepository : R2dbcRepository {

    @Query("SELECT * FROM entities")
    fun findAll(): Flux<Entity>
}

Параметр

Если требуется преобразовать значение параметра запроса в ручную, предлагается использовать R2dbcParameterColumnMapper:

public final class ParameterMapper implements R2dbcParameterColumnMapper<UUID> {

    @Override
    public void set(Statement stmt, int index, @Nullable UUID value) {
        if (value != null) {
            stmt.bind(index, value.toString());
        }
    }
}

@Repository
public interface EntityRepository extends R2dbcRepository {

    @Query("SELECT * FROM entities WHERE id = :id")
    Flux<Entity> findById(@Mapping(ParameterMapper.class) UUID id);
}
class ParameterMapper : R2dbcParameterColumnMapper<UUID?> {

    override fun set(stmt: Statement, index: Int, value: UUID?) {
        if (value != null) {
            stmt.bind(index, value.toString())
        }
    }
}

@Repository
interface EntityRepository : R2dbcRepository {

    @Query("SELECT * FROM entities WHERE id = :id")
    fun findById(@Mapping(ParameterMapper::class) id: UUID): Flux<Entity>
}

Созданный идентификатор

Если необходимо получить в качестве результата созданные базой данных первичные ключи сущности, предлагается использовать аннотацию @Id над методом, где тип возвращаемого значения является идентификаторами. Такой подход работает и для @Batch запросов.

@Repository
public interface EntityRepository extends R2dbcRepository {

    public record Entity(Long id, String name) {}

    @Query("INSERT INTO entities(name) VALUES (:entity.name)")
    @Id
    Mono<Long> insert(Entity entity);
}
@Repository
interface EntityRepository : R2dbcRepository {

    public record Entity(Long id, String name) {}

    @Query("INSERT INTO entities(name) VALUES (:entity.name)")
    @Id
    fun insert(entity: Entity): Mono<Long>
}

Транзакции

Для выполнения ручных запросов в Kora есть интерфейс ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory, который предоставляется в методе в рамках контракта R2dbcRepository. Все методы репозитория вызванные в рамках лямбды транзакции будут выполнены в этой самой транзакции.

Для того чтобы выполнять запросы транзакционно, можно использовать контракт inTx:

@Component
public final class SomeService {

    private final EntityRepository repository;

    public SomeService(EntityRepository repository) {
        this.repository = repository;
    }

    public Mono<List<Entity>> saveAll(Entity one, Entity two) {
        return repository.getR2dbcConnectionFactory().inTx(connection -> {
            // do some work
            return repository.insert(one) //(1)!
                    .zipWith(repository.insert(two), //(2)!
                        (r1, r2) -> List.of(one, two));
        });
    }
}
  1. Будет выполнено в рамках транзакции либо откатится если вся лямбра выкинет исключение
  2. Будет выполнено в рамках транзакции либо откатится если вся лямбра выкинет исключение
@Component
class SomeService(private val repository: EntityRepository) {

    fun saveAll(
        one: Entity,
        two: Entity
    ): Mono<List<Entity>> {
        return repository.r2dbcConnectionFactory.inTx {
            repository.insert(one).zipWith(repository.insert(two)) //(1)!
            { r1: UpdateCount, r2: UpdateCount -> listOf(one, two) }
        }
    }
}
  1. Будет выполнено в рамках транзакции либо откатится если вся лямбра выкинет исключение

Ручное управление

Если для запроса нужна какая-то более сложная логика, либо запросы вне репозитория, можно использовать io.r2dbc.spi.Connection:

@Component
public final class SomeService {

    private final EntityRepository repository;

    public SomeService(EntityRepository repository) {
        this.repository = repository;
    }

    public Mono<List<Entity>> saveAll(Entity one, Entity two) {
        return repository.getR2dbcConnectionFactory().inTx(connection -> {
            // do some work
        });
    }
}
@Component
class SomeService(private val repository: EntityRepository) {

    fun saveAll(
        one: Entity,
        two: Entity
    ): Mono<List<Entity>> {
        return repository.r2dbcConnectionFactory.inTx { connection ->
            // do some work
        }
    }
}

Сигнатуры

Под T подразумевается тип возвращаемого значения, либо Void, либо UpdateCount.

Доступные сигнатуры для методов репозитория из коробки:

Под T подразумевается тип возвращаемого значения, либо List<T>, либо Void, либо UpdateCount.

Под T подразумевается тип возвращаемого значения, либо T?, либо List<T>, либо Unit, либо UpdateCount.