R2DBC
The module provides a repository implementation based on the R2DBC reactive database protocol. The examples below use the PostgreSQL R2DBC driver.
Dependency¶
Dependency build.gradle:
Module:
Dependency build.gradle.kts:
Module:
You must also add a database driver implementation as a dependency.
Configuration¶
Example of the complete configuration described in the R2dbcDatabaseConfig class (default or example values are shown):
db {
    r2dbcUrl = "r2dbc:postgresql://localhost:5432/postgres" //(1)!
    username = "postgres" //(2)!
    password = "postgres" //(3)!
    poolName = "kora" //(4)!
    maxPoolSize = 10 //(5)!
    minIdle = 0 //(6)!
    acquireRetry = 3 //(7)!
    connectionTimeout = "10s" //(8)!
    connectionCreateTimeout = "30s" //(9)!
    idleTimeout = "1m" //(10)!
    maxLifetime = "0s" //(11)!
    statementTimeout = "0s" //(12)!
    readinessProbe = false //(13)!
    options { //(14)!
        "backgroundEvictionInterval": "PT120S"
    }
    telemetry {
        logging {
            enabled = false //(15)!
        }
        metrics {
            enabled = true //(16)!
            slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] //(17)!
        }
        tracing {
            enabled = true //(18)!
        }
    }
}
1. R2DBC database connection URL (required)
2. User name for the connection (required)
3. Password of the connecting user (required)
4. Connection pool name (required)
5. Maximum size of the connection pool
6. Minimum number of idle connections kept ready in the pool
7. Maximum number of attempts to obtain a connection
8. Maximum time to obtain a connection from the pool
9. Maximum time to create a new connection
10. Maximum time a connection may remain idle
11. Maximum connection lifetime (optional)
12. Maximum time to execute a database query (optional)
13. Whether to enable the readiness probe for the database connection
14. Additional attributes of the R2DBC connection (optional)
15. Enables module logging (default false)
16. Enables module metrics (default true)
17. Configures SLO for DistributionSummary metrics
18. Enables module tracing (default true)
db:
  r2dbcUrl: "r2dbc:postgresql://localhost:5432/postgres" #(1)!
  username: "postgres" #(2)!
  password: "postgres" #(3)!
  poolName: "kora" #(4)!
  maxPoolSize: 10 #(5)!
  minIdle: 0 #(6)!
  acquireRetry: 3 #(7)!
  connectionTimeout: "10s" #(8)!
  connectionCreateTimeout: "30s" #(9)!
  idleTimeout: "1m" #(10)!
  maxLifetime: "0s" #(11)!
  statementTimeout: "0ms" #(12)!
  readinessProbe: false #(13)!
  options: #(14)!
    backgroundEvictionInterval: "PT120S"
  telemetry:
    logging:
      enabled: false #(15)!
    metrics:
      enabled: true #(16)!
      slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ] #(17)!
    tracing:
      enabled: true #(18)!
1. R2DBC database connection URL (required)
2. User name for the connection (required)
3. Password of the connecting user (required)
4. Connection pool name (required)
5. Maximum size of the connection pool
6. Minimum number of idle connections kept ready in the pool
7. Maximum number of attempts to obtain a connection
8. Maximum time to obtain a connection from the pool
9. Maximum time to create a new connection
10. Maximum time a connection may remain idle
11. Maximum connection lifetime (optional)
12. Maximum time to execute a database query (optional)
13. Whether to enable the readiness probe for the database connection
14. Additional attributes of the R2DBC connection (optional)
15. Enables module logging (default false)
16. Enables module metrics (default true)
17. Configures SLO for DistributionSummary metrics
18. Enables module tracing (default true)
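The configuration above writes durations in two notations: the `options` value `PT120S` is an ISO-8601 duration, while the timeouts use a short form such as `10s`, `1m` or `0ms`. The sketch below reads the ISO-8601 form with the standard `java.time.Duration` and the short form with a hypothetical `parseShort` helper. How Kora itself parses these values is not shown on this page, so the helper and the `DurationSketch` class are assumptions made only for illustration.

```java
import java.time.Duration;

// Illustrative only: DurationSketch and parseShort are not part of Kora.
final class DurationSketch {

    /** Parses the ISO-8601 form used in the options block, e.g. "PT120S". */
    public static Duration parseIso(String value) {
        return Duration.parse(value);
    }

    /** Hypothetical helper for the short form used by the timeouts, e.g. "10s", "1m", "0ms". */
    public static Duration parseShort(String value) {
        String v = value.trim();
        if (v.endsWith("ms")) {
            return Duration.ofMillis(Long.parseLong(v.substring(0, v.length() - 2)));
        }
        long amount = Long.parseLong(v.substring(0, v.length() - 1));
        char unit = v.charAt(v.length() - 1);
        switch (unit) {
            case 's': return Duration.ofSeconds(amount);
            case 'm': return Duration.ofMinutes(amount);
            case 'h': return Duration.ofHours(amount);
            default: throw new IllegalArgumentException("Unsupported unit in: " + value);
        }
    }

    public static void main(String[] args) {
        // Both notations describe the same kind of value once parsed
        System.out.println(parseIso("PT120S"));
        System.out.println(parseShort("10s"));
    }
}
```

Under this reading, `PT120S` and a hypothetical `2m` denote the same two-minute interval, which is a quick way to sanity-check values copied between the `options` block and the timeout settings.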
Usage¶
Mapping¶
You can override how individual parts of an entity and query parameters are converted. Kora provides dedicated interfaces for this.
Result¶
If you need to convert the result manually, use R2dbcResultFluxMapper:
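final class ResultMapper implements R2dbcResultFluxMapper<UUID, Flux<UUID>> {
    @Override
    public Flux<UUID> apply(Flux<Result> resultFlux) {
        // Example mapping: read the first column of every row as text and convert it to UUID
        return resultFlux.flatMap(result ->
            result.map((row, metadata) -> UUID.fromString(row.get(0, String.class))));
    }
}

@Repository
public interface EntityRepository extends R2dbcRepository {
    @Mapping(ResultMapper.class)
    @Query("SELECT id FROM entities")
    Flux<UUID> getIds();
}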
Row¶
If you need to convert a row manually, use R2dbcRowMapper:
final class RowMapper implements R2dbcRowMapper<UUID> {
    @Override
    public UUID apply(Row row) {
        // Read the first column as text and convert it to UUID
        return UUID.fromString(row.get(0, String.class));
    }
}

@Repository
public interface EntityRepository extends R2dbcRepository {
    @Mapping(RowMapper.class)
    @Query("SELECT id FROM entities")
    Flux<UUID> findAll();
}
Column¶
If you need to convert a column value manually, use R2dbcResultColumnMapper:
public final class ColumnMapper implements R2dbcResultColumnMapper<UUID> {
    @Override
    public UUID apply(Row row, String label) {
        return UUID.fromString(row.get(label, String.class));
    }
}

@Table("entities")
public record Entity(@Mapping(ColumnMapper.class) @Id UUID id, String name) { }

@Repository
public interface EntityRepository extends R2dbcRepository {
    @Query("SELECT id, name FROM entities")
    Flux<Entity> findAll();
}
class ColumnMapper : R2dbcResultColumnMapper<UUID> {
    override fun apply(row: Row, label: String): UUID {
        // Kotlin passes a Java class token as String::class.java
        return UUID.fromString(row.get(label, String::class.java))
    }
}

@Table("entities")
data class Entity(
    @Id @Mapping(ColumnMapper::class) val id: UUID,
    val name: String
)

@Repository
interface EntityRepository : R2dbcRepository {
    @Query("SELECT id, name FROM entities")
    fun findAll(): Flux<Entity>
}
Parameter¶
If you need to convert the value of a query parameter manually, use R2dbcParameterColumnMapper:
public final class ParameterMapper implements R2dbcParameterColumnMapper<UUID> {
    @Override
    public void set(Statement stmt, int index, @Nullable UUID value) {
        if (value != null) {
            stmt.bind(index, value.toString());
        } else {
            // Bind an explicit NULL so the parameter is never left unbound
            stmt.bindNull(index, String.class);
        }
    }
}

@Repository
public interface EntityRepository extends R2dbcRepository {
    @Query("SELECT id, name FROM entities WHERE id = :id")
    Flux<Entity> findById(@Mapping(ParameterMapper.class) UUID id);
}
class ParameterMapper : R2dbcParameterColumnMapper<UUID?> {
    override fun set(stmt: Statement, index: Int, value: UUID?) {
        if (value != null) {
            stmt.bind(index, value.toString())
        } else {
            // Bind an explicit NULL so the parameter is never left unbound
            stmt.bindNull(index, String::class.java)
        }
    }
}

@Repository
interface EntityRepository : R2dbcRepository {
    @Query("SELECT id, name FROM entities WHERE id = :id")
    fun findById(@Mapping(ParameterMapper::class) id: UUID): Flux<Entity>
}
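The column and parameter mappers above convert in opposite directions: text read from the database becomes a UUID, and a UUID becomes text to bind. The sketch below exercises both conversions without a database. `FakeRow` and `UuidMappingSketch` are hypothetical stand-ins written for this example, not R2DBC or Kora types, and the null handling shown is an assumption about what a mapper may want to do.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Illustrative only: none of these types belong to R2DBC or Kora.
final class UuidMappingSketch {

    /** Hypothetical stand-in for a result row: column label to raw value. */
    public static final class FakeRow {
        private final Map<String, Object> columns = new HashMap<>();

        public FakeRow with(String label, Object value) {
            columns.put(label, value);
            return this;
        }

        public <T> T get(String label, Class<T> type) {
            // Returns null when the column is absent or holds NULL
            return type.cast(columns.get(label));
        }
    }

    /** Column direction: text stored in the database to UUID (null stays null). */
    public static UUID fromColumn(FakeRow row, String label) {
        String raw = row.get(label, String.class);
        return raw == null ? null : UUID.fromString(raw);
    }

    /** Parameter direction: UUID to the text that would be bound (null stays null). */
    public static String toParameter(UUID value) {
        return value == null ? null : value.toString();
    }

    public static void main(String[] args) {
        UUID id = UUID.randomUUID();
        FakeRow row = new FakeRow().with("id", toParameter(id));
        // The value survives the round trip through its text form
        System.out.println(id.equals(fromColumn(row, "id")));
    }
}
```

Keeping the two conversions symmetric means a value written through the parameter mapper can always be read back through the column mapper.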
Supported types¶
The following types are supported out of the box for arguments and return values.
They were chosen because most popular databases support them:
- void
- boolean / Boolean
- short / Short
- int / Integer
- long / Long
- double / Double
- float / Float
- byte[]
- String
- BigInteger
- BigDecimal
- UUID
- LocalDate
- LocalTime
- LocalDateTime
- OffsetTime
- OffsetDateTime
Generated identifier¶
To get the primary keys that the database generates for a created entity, annotate the method with @Id and declare the identifier type as the method's return type.
This approach also works for @Batch queries.
Transactions¶
For manual queries, Kora provides the ru.tinkoff.kora.database.r2dbc.R2dbcConnectionFactory interface, which is available through a method of the R2dbcRepository contract.
To run queries transactionally, use its inTx method. All repository methods called within the transaction lambda are executed in that transaction:
@Component
public final class SomeService {
    private final EntityRepository repository;

    public SomeService(EntityRepository repository) {
        this.repository = repository;
    }

    public Mono<List<Entity>> saveAll(Entity one, Entity two) {
        return repository.getR2dbcConnectionFactory().inTx(connection -> {
            // do some work
            return repository.insert(one) //(1)!
                .zipWith(repository.insert(two), //(2)!
                    (r1, r2) -> List.of(one, two));
        });
    }
}
1. Executed within the transaction, or rolled back if the lambda throws an exception
2. Executed within the transaction, or rolled back if the lambda throws an exception
@Component
class SomeService(private val repository: EntityRepository) {
    fun saveAll(
        one: Entity,
        two: Entity
    ): Mono<List<Entity>> {
        return repository.r2dbcConnectionFactory.inTx {
            repository.insert(one).zipWith(repository.insert(two)) //(1)!
            { r1: UpdateCount, r2: UpdateCount -> listOf(one, two) }
        }
    }
}
1. Executed within the transaction, or rolled back if the lambda throws an exception
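The commit-or-rollback behaviour described above can be pictured with a small, synchronous model. This is a simplified sketch, not Kora's implementation: the real inTx is reactive and works with io.r2dbc.spi.Connection, whereas `InTxSketch` and `FakeConnection` here are hypothetical types that buffer writes in memory and apply them only when the lambda returns normally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Illustrative only: a synchronous stand-in for transactional execution.
final class InTxSketch {

    /** Hypothetical connection that buffers writes until the transaction commits. */
    public static final class FakeConnection {
        private final List<String> pending = new ArrayList<>();

        public void insert(String row) {
            pending.add(row);
        }
    }

    private final List<String> committed = new ArrayList<>();

    /** Runs the work in a "transaction": commit on normal return, rollback on exception. */
    public <T> T inTx(Function<FakeConnection, T> work) {
        FakeConnection connection = new FakeConnection();
        try {
            T result = work.apply(connection);
            committed.addAll(connection.pending); // commit: buffered writes become visible
            return result;
        } catch (RuntimeException e) {
            connection.pending.clear();           // rollback: buffered writes are discarded
            throw e;
        }
    }

    /** Snapshot of the rows that were committed so far. */
    public List<String> committed() {
        return new ArrayList<>(committed);
    }

    public static void main(String[] args) {
        InTxSketch db = new InTxSketch();
        db.inTx(c -> { c.insert("one"); c.insert("two"); return 2; });
        try {
            db.inTx(c -> { c.insert("three"); throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            // the failed transaction leaves no trace
        }
        System.out.println(db.committed());
    }
}
```

The point of the model is that both inserts either become visible together or not at all, which is the guarantee the annotations on the examples above describe.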
Connection¶
If a query needs more complex logic than @Query allows, you can work with io.r2dbc.spi.Connection directly:
@Component
public final class SomeService {
    private final EntityRepository repository;

    public SomeService(EntityRepository repository) {
        this.repository = repository;
    }

    public Mono<List<Entity>> saveAll(Entity one, Entity two) {
        return repository.getR2dbcConnectionFactory().inTx(connection -> {
            // do some work
        });
    }
}
Signatures¶
Signatures available for repository methods out of the box are listed below.
Here T refers to the return value type, which can also be List<T>, Void, or UpdateCount.
- T myMethod()
- @Nullable T myMethod()
- Optional<T> myMethod()
- Mono<T> myMethod() (Project Reactor, requires the dependency)
- Flux<T> myMethod() (Project Reactor, requires the dependency)
Here T refers to the return value type, which can also be T?, List<T>, Unit, or UpdateCount.
- myMethod(): T
- suspend myMethod(): T (Kotlin Coroutines, requires the dependency as implementation)
- myMethod(): Flow<T> (Kotlin Coroutines, requires the dependency as implementation)