Cassandra
Модуль предоставляет реализацию репозиториев для базы данных Cassandra с использованием драйвера DataStax.
Подключение¶
Зависимость build.gradle
:
Модуль:
Зависимость build.gradle.kts
:
Модуль:
Конфигурация¶
Пример простой конфигурация, описанной в классе CassandraConfig
(указаны примеры значений):
cassandra {
basic {
contactPoints = "127.0.0.1:9042, 127.0.0.2:9042" //(1)!
dc = "datacenter1" //(2)!
sessionKeyspace = "test-db" //(3)!
request {
timeout = "5s" //(4)!
}
}
auth {
login = "username" //(5)!
password = "password" //(6)!
}
}
- Адреса нод Cassandra для подключения к базе данных (обязательный)
- Имя датацентра Cassandra (по умолчанию отсутвует)
- Имя keyspace для подключения (по умолчанию отсутвует)
- Ограничение время выполнения запросов в рамках подключения (по умолчанию отсутвует)
- Имя пользователя для подключения (по умолчанию отсутвует)
- Пароль пользователя для подключения (по умолчанию отсутвует)
cassandra:
basic:
contactPoints: "127.0.0.1:9042, 127.0.0.2:9042" #(1)!
dc: "datacenter1" #(2)!
sessionKeyspace: "test-db" #(3)!
request:
timeout: "5s" #(4)!
auth:
login: "username" #(5)!
password: "password" #(6)!
- Адреса нод Cassandra для подключения к базе данных (обязательный)
- Имя датацентра Cassandra (по умолчанию отсутвует)
- Имя keyspace для подключения (по умолчанию отсутвует)
- Ограничение время выполнения запросов в рамках подключения (по умолчанию отсутвует)
- Имя пользователя для подключения (по умолчанию отсутвует)
- Пароль пользователя для подключения (по умолчанию отсутвует)
Пример полной конфигурации
Пример полной конфигурации с примерами значений которые могут быть описаны (конфигурация описана в классе CassandraConfig
):
cassandra {
auth {
login = "username"
password = "password"
}
basic {
contactPoints = [ "127.0.0.1:9042", "127.0.0.2:9042" ] // хосты нод кассандры
sessionName = "some-session-name" // имя сессии
dc = "datacenter1" // Имя датацентра
sessionKeyspace = "test-db" // Название keyspace для этой сессии
loadBalancingPolicy.slowReplicaAvoidance = true // Флаг включения механизма избегания медленных реплик
cloud.secureConnectBundle = "/location/of/secure/connect/bundle" // Расположения бандла для подключения к Datastax Apache Cassandra. Путь должен быть валидным URL'ом. По умолчанию, если не указан протокол, будет считаться что это file://
request { // Настройки запросов
timeout = "5s" // таймаут запроса
consistency = "LOCAL_ONE" // уровень консистентности, допустимые значения: ANY, ONE, TWO, THREE, QUORUM, ALL, LOCAL_QUORUM, EACH_QUORUM, SERIAL, LOCAL_SERIAL, LOCAL_ONE
pageSize = 5000 // Ограничение размера страницы (определяет, сколько строк может быть возвращено за один запрос)
serialConsistency = "LOCAL_SERIAL" // Уровень консистентности для легковесных транзакций(LWT). Допустимые значения SERIAL и LOCAL_SERIAL.
defaultIdempotence = false // Настройки значения идемпотентности для запросов
}
}
advanced { // Расширенные настройки
sessionLeak.threshold = 4 // Максимальное количество активных сессий
connection {
connectTimeout = "10s" // Таймаут подключения
initQueryTimeout = "10s" // Таймаут инициализации запроса
setKeyspaceTimeout = "10s" // Таймаут установки keyspace
maxRequestsPerConnection = 1024 // Ограничение запросов на одно подключение
maxOrphanRequests = 256 // Максимальное количество "осиротевших" запросов, т.е. тех, ответ на которые по тем или иным причинам прекратили ожидать.
warnOnInitError = true // Выводить ошибки при инициализации в лог
pool { // Настройки пула.
localSize = 10
remoteSize = 10
}
}
reconnectOnInit = false // Повторять попытку инициализации, если при первой попытке все ноды, указанные в contactpoints, не ответили
reconnectionPolicy { // Политика переподключения - базовая и максимальная задержка. По умолчанию, при неудачно попытке используется первое значение, затем при каждой следующей - удваивается, пока не достигнет максимального значения
baseDelay = "1s"
maxDelay = "60s"
}
sslEngineFactory {
cipherSuites = [ "TLS_RSA_WITH_AES_128_CBC_SHA", "TLS_RSA_WITH_AES_256_CBC_SHA" ]
hostnameValidation = true // Валидация имени хоста
keystorePath = "/path/to/client.keystore" // Путь к хранилищу ключей
keystorePassword = "password" // Пароль от хранилища ключей
truststorePath = "/path/to/client.truststore" // Путь к доверенному хранилищу
truststorePassword = "password" // Пароль от доверенного хранилища
}
timestampGenerator { // Генератор, добавляющий timestamp к каждому запросу. По умолчанию используется AtomicTimestampGenerator
forceJavaClock = false // Принудительно использовать Java system clock
driftWarning.threshold = "1s" // Указывает, насколько далеко в будущее могут "убегать" таймстэмпы при высокой нагрузке
driftWarning.interval = "10s" // Интервал логирования предупреждений, есди таймстэмпы продолжают "убегать" вперёд.
}
protocol {
version = "V4" // Версия протокола Cassandra
compression = "lz4" // Сжатие
maxFrameLength = 268435456 // Максимальная длина фрейма в байтах
}
request {
warnIfSetKeyspace = true // Логировать предупреждение о том, что в запросе выполняется установка keyspace
trace { // Настройки встроенного механизма трейсинга запросов
attempts = 5 // Количество попыток
interval = "1ms" // Интервал между попытками
consistency = "ONE" // Уровень консистентности
}
logWarnings = true
}
metrics { // session-level метрики, по умолчанию выключены все
node.enabled = [] // Список включенных метрик. Включаемые: bytes-sent, connected-nodes, cql-requests, cql-client-timeouts, cql-prepared-cache-size, throttling.delay, throttling.errors, continuous-cql-requests
session.enabled = []
publishPercentileHistogram = false // публиковать ли персентили в метриках в рамках мин/макс вместе с SLO
node.cqlMessages { // Дополнительные настройки для метрик, если нужны:
lowestLatency = "1ms"
highestLatency = "90s"
significantDigits = 1
slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ]
}
session.cqlRequests {
lowestLatency = "1ms"
highestLatency = "90s"
significantDigits = 1
slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ]
}
session.throttlingDelay {
lowestLatency = "1ms"
highestLatency = "90s"
significantDigits = 1
slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ]
}
}
socket {
tcpNoDelay = true // Флаг для отключения Nagle алгоритма, по умолчанию true(выключен), т.к. драйвер имеет собственный message coalescing algorithm
keepAlive = false
reuseAddress = true // Позволять переиспользовать адрес
lingerInterval = 0
receiveBufferSize = 65535
sendBufferSize = 65535
}
heartbeat {
interval = "30s"
timeout = "2m"
}
metadata { // Настройки, отвечающие за schema metadata
schema {
enabled = true
requestTimeout = "20s"
requestPageSize = 20
refreshedKeyspaces = [ "ks1", "ks2" ]
debouncer.window = "1s" // Время, которое драйвер ждёт перед применением обновления
debouncer.maxEvents = 20 // Максимальное количество обновлений, которое может быть накоплено
}
topologyEventDebouncer.window = "1s" // Окно для отправки события.
topologyEventDebouncer.maxEvents = 20 // Максимальное количество событий в пачке
tokenMapEnabled = true
}
controlConnection {
timeout = "10s"
schemaAgreement {
interval = 200ms
timeout = "10s"
warnOnFailure = true
}
}
preparedStatements {
prepareOnAllNodes = true // Выполнять подготовку запроса на всех нодах после её успешного выполнения на одной ноде.
reprepareOnUp {
enabled = true // Подготавливать запросы для новых нод
checkSystemTable = false // Проверять наличие prepare statement в system.prepared_statements ноды перед подготовкой
maxStatements = 0 // Максимальной количество запросов, которые можно переподготовить
maxParallelism = 100 // Максимальное количество конкурентных запросов
timeout = 20s
}
preparedCache.weakValues = false
}
netty { // Настройки Netty event loop, используемой в драйвере
ioGroup.size = 0 // Количество тредов
ioGroup.shutdown { // Настройки штатного завершения
quietPeriod = 2
timeout = 15
unit = "SECONDS"
}
adminGroup.size = 2 // Event loop группа, используемая только для админских задач, не связанных с IO
adminGroup.shutdown {
quietPeriod = 2
timeout = 15
unit = "SECONDS"
}
timer.tickDuration = "100ms" // Настройки того, как часто таймер должен пробуждаться для проверки просроченных задач
timer.ticksPerWheel = 2048
daemon = false
}
coalescer.rescheduleInterval = "10ms"
resolveContactPoints = false
}
profiles { // Настройки, переопределяемые в профиле
someProfile {
basic {
// basic.request.timeout
// basic.request.consistency
}
advanced {
// advanced.request.trace.consistency
// advanced.request.trace.attempts
}
}
}
telemetry {
logging {
enabled = false
}
metrics {
enabled = true
slo = [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ]
}
tracing {
enabled = true
}
}
}
cassandra:
advanced: # Расширенные настройки
coalescer:
rescheduleInterval: "10ms"
connection:
connectTimeout: "10s" # Таймаут подключения
initQueryTimeout: "10s" # Таймаут инициализации запроса
setKeyspaceTimeout: "10s" # Таймаут установки keyspace
maxOrphanRequests: 256 # Максимальное количество "осиротевших" запросов, т.е. тех, ответ на которые по тем или иным причинам прекратили ожидать.
maxRequestsPerConnection: 1024 # Ограничение запросов на одно подключение
pool: # Настройки пула.
localSize: 10
remoteSize: 10
warnOnInitError: true # Выводить ошибки при инициализации в лог
controlConnection:
schemaAgreement:
interval: "200ms"
timeout: "10s"
warnOnFailure: true
timeout: "10s"
heartbeat:
interval: "30s"
timeout: "2m"
metadata: # Настройки, отвечающие за schema metadata
schema:
debouncer:
maxEvents: 20 # Максимальное количество обновлений, которое может быть накоплено
window: "1s" # Время, которое драйвер ждёт перед применением обновления
enabled: true
refreshedKeyspaces:
- ks1
- ks2
requestPageSize: 10
requestTimeout: "20s"
tokenMapEnabled: true
topologyEventDebouncer:
maxEvents: 20 # Максимальное количество событий в пачке
window: "1s" # Окно для отправки события.
metrics:
publishPercentileHistogram: false # публиковать ли персентили в метриках в рамках мин/макс вместе с SLO
node:
enabled: [] # Список включенных метрик. Включаемые: bytes-sent, connected-nodes, cql-requests, cql-client-timeouts, cql-prepared-cache-size, throttling.delay, throttling.errors, continuous-cql-requests
cqlMessages: # Дополнительные настройки для метрик, если нужны:
lowestLatency: "1ms"
highestLatency: "90s"
significantDigits: 1
slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ]
session: # session-level метрики, по умолчанию выключены все
enabled: [] # Список включенных метрик. Включаемые: bytes-sent, connected-nodes, cql-requests, cql-client-timeouts, cql-prepared-cache-size, throttling.delay, throttling.errors, continuous-cql-requests
cqlRequests:
lowestLatency: "1ms"
highestLatency: "90s"
significantDigits: 1
slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ]
throttlingDelay:
lowestLatency: "1ms"
highestLatency: "90s"
significantDigits: 1
slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ]
netty: # Настройки Netty event loop, используемой в драйвере
adminGroup: # Event loop группа, используемая только для админских задач, не связанных с IO
shutdown:
quietPeriod: 2
timeout: 15
unit: SECONDS
size: 2
daemon: false
ioGroup:
shutdown: # Настройки штатного завершения
quietPeriod: 2
timeout: 15
unit: SECONDS
size: 0 # Количество тредов
timer:
tickDuration: "100ms" # Настройки того, как часто таймер должен пробуждаться для проверки просроченных задач
ticksPerWheel: 2048
preparedStatements:
prepareOnAllNodes: true # Выполнять подготовку запроса на всех нодах после её успешного выполнения на одной ноде.
preparedCache:
weakValues: false
reprepareOnUp:
enabled: true # Подготавливать запросы для новых нод
checkSystemTable: false # Проверять наличие prepare statement в system.prepared_statements ноды перед подготовкой
maxParallelism: 100 # Максимальное количество конкурентных запросов
maxStatements: 0 # Максимальной количество запросов, которые можно переподготовить
timeout: "20s"
protocol:
compression: "lz4" # Сжатие
maxFrameLength: 268435456 # Максимальная длина фрейма в байтах
version: "V4" # Версия протокола Cassandra
reconnectOnInit: false # Повторять попытку инициализации, если при первой попытке все ноды, указанные в contactpoints, не ответили
reconnectionPolicy: # Политика переподключения - базовая и максимальная задержка. По умолчанию, при неудачно попытке используется первое значение, затем при каждой следующей - удваивается, пока не достигнет максимального значения
baseDelay: "1s"
maxDelay: "60s"
request:
logWarnings: true
trace:
attempts: 5 # Количество попыток
consistency: ONE # Уровень консистентности
interval: "1ms" # Интервал между попытками
warnIfSetKeyspace: true # Логировать предупреждение о том, что в запросе выполняется установка keyspace
resolveContactPoints: false
sessionLeak:
threshold: 4
socket:
keepAlive: false
lingerInterval: 0
receiveBufferSize: 65535
reuseAddress: true # Позволять переиспользовать адрес
sendBufferSize: 65535
tcpNoDelay: true # Флаг для отключения Nagle алгоритма, по умолчанию true(выключен), т.к. драйвер имеет собственный message coalescing algorithm
sslEngineFactory:
cipherSuites:
- TLS_RSA_WITH_AES_128_CBC_SHA
- TLS_RSA_WITH_AES_256_CBC_SHA
hostnameValidation: true # Валидация имени хоста
keystorePassword: "password" # Пароль от хранилища ключей
keystorePath: "/path/to/client.keystore" # Путь к хранилищу ключей
truststorePassword: "password" # Пароль от доверенного хранилища
truststorePath: "/path/to/client.truststore" # Путь к доверенному хранилищу
timestampGenerator: # Генератор, добавляющий timestamp к каждому запросу. По умолчанию используется AtomicTimestampGenerator
driftWarning:
interval: "10s" # Интервал логирования предупреждений, есди таймстэмпы продолжают "убегать" вперёд.
threshold: "1s" # Указывает, насколько далеко в будущее могут "убегать" таймстэмпы при высокой нагрузке
forceJavaClock: false # Принудительно использовать Java system clock
auth:
login: "username"
password: "password"
basic:
cloud:
secureConnectBundle: "/location/of/secure/connect/bundle"
contactPoints:
- "127.0.0.1:9042"
- "127.0.0.2:9042"
dc: "datacenter1"
loadBalancingPolicy:
slowReplicaAvoidance: true
request:
consistency: LOCAL_ONE
defaultIdempotence: false
pageSize: 5000
serialConsistency: LOCAL_SERIAL
timeout: "5s"
sessionKeyspace: "test-db"
sessionName: "some-session-name"
profiles: # Настройки, переопределяемые в профиле
someProfile:
advanced:
#advanced.request.trace.consistency
#advanced.request.trace.attempts
basic:
#basic.request.timeout
#basic.request.consistency
telemetry:
logging:
enabled: false
metrics:
enabled: true
slo: [ 1, 10, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 30000, 60000, 90000 ]
tracing:
enabled: true
Использование¶
Профиль¶
Можно переопределять общие настройки, частными настройками из профиля, предположим есть такая конфигурация профиля someProfile
:
Применить настройки из профиля someProfile
, достаточно сделать следующее:
Настройки, указанные в профиле, будут применяться к каждому запросу, конкретно в этом случае — будет установлен таймаут в 10с.
Конвертация¶
Возможно переопределять преобразование различных частей сущности и параметров запроса, для этого Kora предоставляет специальные интерфейсы.
Результат¶
Если требуется преобразовать результат в ручную, предлагается использовать CassandraResultSetMapper
:
final class ResultMapper implements CassandraResultSetMapper<UUID> {
@Override
public UUID apply(ResultSet rows) {
// код преобразования
}
}
@Repository
public interface EntityRepository extends CassandraRepository {
@Mapping(ResultMapper.class)
@Query("SELECT id FROM entities")
List<UUID> getIds();
}
Строка¶
Если требуется преобразовать строку в ручную, предлагается использовать CassandraRowMapper
:
final class RowMapper implements CassandraRowMapper<UUID> {
@Override
public UUID apply(Row row) {
return UUID.fromString(rs.getString(0));
}
}
@Repository
public interface EntityRepository extends CassandraRepository {
@Mapping(RowMapper.class)
@Query("SELECT id FROM entities")
List<UUID> findAll();
}
Колонка¶
Если требуется преобразовать значение колонки в ручную, предлагается использовать CassandraRowColumnMapper
:
public final class ColumnMapper implements CassandraRowColumnMapper<UUID> {
@Override
public UUID apply(GettableByName row, int index) {
return UUID.fromString(row.getString(index));
}
}
@Table("entities")
public record Entity(@Mapping(ColumnMapper.class) @Id UUID id, String name) { }
@Repository
public interface EntityRepository extends CassandraRepository {
@Query("SELECT id, name FROM entities")
List<Entity> findAll();
}
class ColumnMapper : CassandraRowColumnMapper<UUID> {
override fun apply(row: GettableByName, index: Int): UUID {
return UUID.fromString(row.getString(index))
}
}
@Table("entities")
data class Entity(
@Id @Mapping(ColumnMapper::class) val id: UUID,
val name: String
)
@Repository
interface EntityRepository : CassandraRepository {
@Query("SELECT id, name FROM entities")
fun findAll(): List<Entity>
}
Параметр¶
Если требуется преобразовать значение параметра запроса в ручную, предлагается использовать CassandraParameterColumnMapper
:
public final class ParameterMapper implements CassandraParameterColumnMapper<UUID> {
@Override
public void set(SettableByName<?> stmt, int index, @Nullable UUID value) {
if (value != null) {
stmt.setString(index, value.toString());
}
}
}
@Repository
public interface EntityRepository extends CassandraRepository {
@Query("SELECT id, name FROM entities WHERE id = :id")
List<Entity> findById(@Mapping(ParameterMapper.class) UUID id);
}
class ParameterMapper : CassandraParameterColumnMapper<UUID?> {
override fun set(stmt: SettableByName<*>, index: Int, value: UUID?) {
if (value != null) {
stmt.setString(index, value.toString())
}
}
}
@Repository
interface EntityRepository : CassandraRepository {
@Query("SELECT id, name FROM entities WHERE id = :id")
fun findById(@Mapping(ParameterMapper::class) id: UUID): List<Entity>
}
Асинхронно¶
Из-за особенностей вспомогательного класса для извлечения данных из AsyncResultSet
для асинхронных запросов (Mono или Suspend), можно использовать только CassandraReactiveResultSetMapper
:
final class AsyncResultMapper implements CassandraReactiveResultSetMapper<UUID, Flux<UUID>> {
@Override
public UUID apply(ResultSet rows) {
return Flux.from(rows).map(r -> UUID.fromString(r.getString(0)));
}
}
@Repository
public interface EntityRepository extends CassandraRepository {
@Mapping(AsyncResultMapper.class)
@Query("SELECT id FROM entities")
Flux<UUID> getIds();
}
class AsyncResultMapper : CassandraReactiveResultSetMapper<UUID, Flux<UUID>> {
override fun apply(rows: ReactiveResultSet): Flux<UUID> {
return Flux.from(rows).map { r -> UUID.fromString(r.getString(0)) }
}
}
@Repository
interface EntityRepository : CassandraRepository {
@Mapping(AsyncResultMapper::class)
@Query("SELECT id FROM entities")
fun getIds(): Flux<UUID>
}
UDT¶
Есть поддержка UDT
типов с помощью @UDT
аннотации:
Сигнатуры¶
Доступные сигнатуры для методов репозитория из коробки:
Под T
подразумевается тип возвращаемого значения, либо List<T>
, либо Void
.
T myMethod()
@Nullable T myMethod()
Optional<T> myMethod()
CompletionStage<T> myMethod()
CompletionStage (надо предоставитьExecutor
)Mono<T> myMethod()
Project Reactor (надо подключить зависимость)Flux<T> myMethod()
Project Reactor (надо подключить зависимость)
Под T
подразумевается тип возвращаемого значения, либо T?
, либо List<T>
, либо Unit
.
myMethod(): T
suspend myMethod(): T
Kotlin Coroutine (надо подключить зависимость какimplementation
)myMethod(): Flow<T>
Kotlin Coroutine (надо подключить зависимость какimplementation
)