R2DBC
Модуль предоставляет реализацию репозиториев на основе R2DBC реактивного протокола работы с базами данных, реализацией как пример является Postgres R2DBC.
Подключение¶
Зависимость build.gradle
:
Модуль:
Зависимость build.gradle.kts
:
Модуль:
Также требуется предоставить реализацию драйвера базы данных как зависимость.
Конфигурация¶
Пример полной конфигурации, описанной в классе R2dbcDatabaseConfig
(указаны примеры значений или значения по умолчанию):
db {
r2dbcUrl = "r2dbc:postgresql://localhost:5432/postgres" //(1)!
username = "postgres" //(2)!
password = "postgres" //(3)!
poolName = "kora" //(4)!
maxPoolSize = 10 //(5)!
minIdle = 0 //(6)!
acquireRetry = 3 //(7)!
connectionTimeout = "10s" //(8)!
connectionCreateTimeout = "30s" //(9)!
idleTimeout = "10m" //(10)!
maxLifetime = "0s" //(11)!
statementTimeout = "0s" //(12)!
readinessProbe = false //(13)!
options { //(14)!
"backgroundEvictionInterval": "PT120S"
}
telemetry {
logging {
enabled = false //(15)!
}
metrics {
enabled = true //(16)!
slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] //(17)!
}
tracing {
enabled = true //(18)!
}
}
}
- R2DBC URL подключения к базе данных (обязательный)
- Имя пользователя для подключения (обязательный)
- Пароль пользователя для подключения (обязательный)
- Имя набора соединений к базе данных (обязательный)
- Максимальный размер набора соединений к базе данных
- Минимальный размер набора готовых соединений к базе данных в режиме ожидания
- Максимальное количество попыток получения соединения
- Максимальное время на установку соединения
- Максимальное время на создание соединения
- Максимальное время на простой соединения
- Максимальное время жизни соединения (по умолчанию отсутвует)
- Максимальное время на выполнение запроса в базу данных (по умолчанию отсутвует)
- Включить ли пробу готовности для соединения базы данных
- Дополнительные атрибуты R2DBC соединения (по умолчанию отсутвует)
- Включает логгирование модуля (по умолчанию
false
) - Включает метрики модуля (по умолчанию
true
) - Настройка SLO для DistributionSummary метрики
- Включает трассировку модуля (по умолчанию
true
)
db:
r2dbcUrl: "r2dbc:postgresql://localhost:5432/postgres" #(1)!
username: "postgres" #(2)!
password: "postgres" #(3)!
poolName: "kora" #(4)!
maxPoolSize: 10 #(5)!
minIdle: 0 #(6)!
acquireRetry: 3 #(7)!
connectionTimeout: "10s" #(8)!
connectionCreateTimeout: "30s" #(9)!
idleTimeout: "10m" #(10)!
maxLifetime: "0s" #(11)!
statementTimeout: "0ms" #(12)!
readinessProbe: false #(13)!
options: #(14)!
backgroundEvictionInterval: "PT120S"
telemetry:
logging:
enabled: false #(15)!
metrics:
enabled: true #(16)!
slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] #(17)!
tracing:
enabled: true #(18)!
- R2DBC URL подключения к базе данных (обязательный)
- Имя пользователя для подключения (обязательный)
- Пароль пользователя для подключения (обязательный)
- Имя набора соединений к базе данных (обязательный)
- Максимальный размер набора соединений к базе данных
- Минимальный размер набора готовых соединений к базе данных в режиме ожидания
- Максимальное количество попыток получения соединения
- Максимальное время на установку соединения
- Максимальное время на создание соединения
- Максимальное время на простой соединения
- Максимальное время жизни соединения (по умолчанию отсутвует)
- Максимальное время на выполнение запроса в базу данных (по умолчанию отсутвует)
- Включить ли пробу готовности для соединения базы данных
- Дополнительные атрибуты R2DBC соединения (по умолчанию отсутвует)
- Включает логгирование модуля (по умолчанию
false
) - Включает метрики модуля (по умолчанию
true
) - Настройка SLO для DistributionSummary метрики
- Включает трассировку модуля (по умолчанию
true
)
Использование¶
Конвертация¶
Возможно переопределять преобразование различных частей сущности и параметров запроса, для этого Kora предоставляет специальные интерфейсы.
Результат¶
Если требуется преобразовать результат в ручную, предлагается использовать R2dbcResultFluxMapper
:
final class ResultMapper implements R2dbcResultFluxMapper<UUID, Flux<UUID>> {
@Override
public Flux<UUID> apply(Flux<Result> resultFlux) {
// код преобразования
}
}
@Repository
public interface EntityRepository extends R2dbcRepository {
@Mapping(ResultMapper.class)
@Query("SELECT id FROM entities")
Flux<UUID> getIds();
}
Строка¶
Если требуется преобразовать строку в ручную, предлагается использовать R2dbcRowMapper
:
final class RowMapper implements R2dbcRowMapper<UUID> {
@Override
public UUID apply(Row row) {
return UUID.fromString(rs.get(0, String.class));
}
}
@Repository
public interface EntityRepository extends R2dbcRepository {
@Mapping(RowMapper.class)
@Query("SELECT id FROM entities")
Flux<UUID> findAll();
}
Колонка¶
Если требуется преобразовать значение колонки в ручную, предлагается использовать R2dbcResultColumnMapper
:
public final class ColumnMapper implements R2dbcResultColumnMapper<UUID> {
@Override
public UUID apply(Row row, String label) {
return UUID.fromString(row.get(label, String.class));
}
}
@Table("entities")
public record Entity(@Mapping(ColumnMapper.class) @Id UUID id, String name) { }
@Repository
public interface EntityRepository extends R2dbcRepository {
@Query("SELECT id, name FROM entities")
Flux<Entity> findAll();
}
class ColumnMapper : R2dbcResultColumnMapper<UUID> {
override fun apply(row: Row, label: String): UUID {
return UUID.fromString(row.get(label, String.class))
}
}
@Table("entities")
data class Entity(
@Id @Mapping(ColumnMapper::class) val id: UUID,
val name: String
)
@Repository
interface EntityRepository : R2dbcRepository {
@Query("SELECT id, name FROM entities")
fun findAll(): Flux<Entity>
}
Параметр¶
Если требуется преобразовать значение параметра запроса в ручную, предлагается использовать R2dbcParameterColumnMapper
:
public final class ParameterMapper implements R2dbcParameterColumnMapper<UUID> {
@Override
public void set(Statement stmt, int index, @Nullable UUID value) {
if (value != null) {
stmt.bind(index, value.toString());
}
}
}
@Repository
public interface EntityRepository extends R2dbcRepository {
@Query("SELECT id, name FROM entities WHERE id = :id")
Flux<Entity> findById(@Mapping(ParameterMapper.class) UUID id);
}
class ParameterMapper : R2dbcParameterColumnMapper<UUID?> {
override fun set(stmt: Statement, index: Int, value: UUID?) {
if (value != null) {
stmt.bind(index, value.toString())
}
}
}
@Repository
interface EntityRepository : R2dbcRepository {
@Query("SELECT id, name FROM entities WHERE id = :id")
fun findById(@Mapping(ParameterMapper::class) id: UUID): Flux<Entity>
}
Поддерживаемые типы¶
Список поддерживаемых типов для аргументов/возвращаемых значений из коробки
- void
- boolean / Boolean
- short / Short
- int / Integer
- long / Long
- double / Double
- float / Float
- byte[]
- String
- BigInteger
- BigDecimal
- UUID
- LocalDate
- LocalTime
- LocalDateTime
- OffsetTime
- OffsetDateTime
Созданный идентификатор¶
Если необходимо получить в качестве результата созданные базой данных первичные ключи сущности,
предлагается использовать аннотацию @Id
над методом, где тип возвращаемого значения является идентификаторами.
Такой подход работает и для @Batch
запросов.
Транзакции¶
Для выполнения ручных запросов в Kora есть интерфейс ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory
,
который предоставляется в методе в рамках контракта R2dbcRepository
.
Все методы репозитория вызванные в рамках лямбды транзакции будут выполнены в этой самой транзакции.
Для того чтобы выполнять запросы транзакционно, можно использовать контракт inTx
:
@Component
public final class SomeService {
private final EntityRepository repository;
public SomeService(EntityRepository repository) {
this.repository = repository;
}
public Mono<List<Entity>> saveAll(Entity one, Entity two) {
return repository.getR2dbcConnectionFactory().inTx(connection -> {
// do some work
return repository.insert(one) //(1)!
.zipWith(repository.insert(two), //(2)!
(r1, r2) -> List.of(one, two));
});
}
}
- Будет выполнено в рамках транзакции либо откатится если вся лямбра выкинет исключение
- Будет выполнено в рамках транзакции либо откатится если вся лямбра выкинет исключение
@Component
class SomeService(private val repository: EntityRepository) {
fun saveAll(
one: Entity,
two: Entity
): Mono<List<Entity>> {
return repository.r2dbcConnectionFactory.inTx {
repository.insert(one).zipWith(repository.insert(two)) //(1)!
{ r1: UpdateCount, r2: UpdateCount -> listOf(one, two) }
}
}
}
- Будет выполнено в рамках транзакции либо откатится если вся лямбра выкинет исключение
Ручное управление¶
Если для запроса нужна какая-то более сложная логика, либо запросы вне репозитория, можно использовать io.r2dbc.spi.Connection
:
@Component
public final class SomeService {
private final EntityRepository repository;
public SomeService(EntityRepository repository) {
this.repository = repository;
}
public Mono<List<Entity>> saveAll(Entity one, Entity two) {
return repository.getR2dbcConnectionFactory().inTx(connection -> {
// do some work
});
}
}
Сигнатуры¶
Под T
подразумевается тип возвращаемого значения, либо Void
, либо UpdateCount
.
Доступные сигнатуры для методов репозитория из коробки:
Под T
подразумевается тип возвращаемого значения, либо List<T>
, либо Void
, либо UpdateCount
.
T myMethod()
@Nullable T myMethod()
Optional<T> myMethod()
Mono<T> myMethod()
Project Reactor (надо подключить зависимость)Flux<T> myMethod()
Project Reactor (надо подключить зависимость)
Под T
подразумевается тип возвращаемого значения, либо T?
, либо List<T>
, либо Unit
, либо UpdateCount
.
myMethod(): T
suspend myMethod(): T
Kotlin Coroutine (надо подключить зависимость какimplementation
)myMethod(): Flow<T>
Kotlin Coroutine (надо подключить зависимость какimplementation
)