Продвинутый gRPC-клиент с Kora¶
В этом руководстве рассматриваются продвинутые приемы построения gRPC-клиента в Kora. Вы разберете серверные потоки, клиентские потоки, двунаправленные потоки, авторизацию на основе метаданных и клиентские перехватчики для сгенерированных заглушек. Вы также увидите, как асинхронные наблюдатели, сигналы завершения и ошибки жизненного цикла потока отличают потоковые клиенты от обычного кода в стиле «запрос-ответ».
Если в процессе захочется сверить результат, используйте готовое рабочее приложение: Kora Java gRPC Client Advanced App.
Если в процессе захочется сверить результат, используйте готовое рабочее приложение: Kora Kotlin gRPC Client Advanced App.
Что вы создадите¶
Вы расширите приложение gRPC-клиента и добавите:
- потоковый клиентский сервис для
UserStreamingService - серверные потоковые вызовы для
GetAllUsers - клиентские потоковые вызовы для
CreateUsers - двунаправленные потоковые вызовы для
UpdateUsers - HTTP-маршруты запуска, которые позволяют удобно выполнять каждый потоковый сценарий локально
- клиентский перехватчик журналирования
- клиентский перехватчик авторизации, который передает ключ API через метаданные gRPC
- быстрые внутрипроцессные тесты, которые проверяют потоковое поведение без сервера, запущенного вручную
Что вам понадобится¶
- JDK 17 или новее
- Gradle 7+
- текстовый редактор или среда разработки
- запущенный продвинутый gRPC-сервер для проверок во время выполнения
Требования¶
Обязательно: пройдите руководство по продвинутому gRPC-серверу
Это руководство предполагает, что вы уже прошли gRPC-клиент с Kora и Продвинутый gRPC-сервер с Kora, а также понимаете сгенерированные унарные заглушки, контракты protobuf и то, как Kora внедряет зависимости gRPC-клиента.
Если вы еще не прошли руководство по продвинутому gRPC-серверу, сначала сделайте это, потому что здесь используется тот же потоковый контракт, а основное внимание уделяется потоковым вызовам на стороне клиента.
Обзор¶
Как и руководство по продвинутому серверу, руководство по продвинутому клиенту построено вокруг разделения ответственности.
Мы не перегружаем исходный унарный клиент всеми продвинутыми возможностями.
Вместо этого:
- базовый клиент остается сосредоточен на унарном CRUD через
UserService - продвинутый клиент сосредоточен на потоках через
UserStreamingService
Такой подход делает оба руководства проще для изучения и соответствует устройству сопутствующих приложений.
На стороне клиента продвинутые возможности gRPC сильнее влияют на поток управления, чем на внедрение зависимостей. Kora по-прежнему предоставляет настроенный gRPC-клиент и компоненты приложения. Сгенерированные заглушки по-прежнему выполняют RPC-вызовы. Меняется то, как код сервиса управляет временем жизни вызовов, потоками запросов, наблюдателями ответов, метаданными и ошибками.
В этом руководстве эти задачи остаются явными:
- потоковые сервисы оборачивают сгенерированные асинхронные заглушки, а не раскрывают их напрямую
- клиентские перехватчики добавляют сквозное поведение к исходящим вызовам
- авторизация через метаданные настраивается рядом с границей клиента
- HTTP-точки запуска являются только локальным способом проверить потоковый клиент
У продвинутого клиента также другая модель отказов по сравнению с унарным клиентом. В унарном вызове отказ обычно означает, что один запрос завершился ошибкой до получения одного ответа. В потоковом вызове ошибка может произойти после того, как часть сообщений уже была отправлена или получена. Поэтому сервис-обертка должна рассматривать завершение потока как часть проектирования API, а не как деталь, о которой вспоминают в конце.
Именно поэтому руководству вводит явные клиентские методы для каждой формы потоковой передачи:
- серверный поток ориентирован на чтение: один вызов, много полученных ответов
- клиентский поток ориентирован на запись: много отправленных запросов, один итоговый ответ
- двунаправленный поток похож на диалог: отправка и получение происходят независимо
Сгенерированная асинхронная заглушка дает большие возможности, но обычно это не та граница, которую стоит распространять по остальной части приложения. Она раскрывает механику обратных вызовов: наблюдателей и сигналы завершения. Сервисная обертка Kora превращает эту механику в более компактный API, который можно вызывать из контроллеров, заданий или других сервисов, не разнося gRPC-код с обратными вызовами по всему приложению.
Метаданные и перехватчики относятся к той же границе. Они полезны для авторизации, трассировки, идентификаторов запросов и журналирования, но подключать их нужно рядом со сгенерированным клиентом. Так бизнес-код остается сосредоточен на выполняемой операции, а не на том, как каждый RPC-вызов оформляется при передаче по сети.
Как потоки меняют клиент¶
Унарные gRPC-вызовы выглядят приятно просто:
- создать один запрос
- вызвать один метод
- получить один ответ
Потоковая передача меняет эту модель мышления.
Серверный поток¶
При серверном потоке клиент отправляет один запрос и получает много ответов.
Значит, клиентский код должен учитывать:
- перебор потока сообщений
- частичный прогресс
- момент завершения потока
Клиентский поток меняет создание данных¶
При клиентском потоке клиент больше не отправляет один полностью готовый объект запроса.
Вместо этого он постепенно отправляет несколько сообщений в вызов и только позже получает итоговый ответ.
Двунаправленный поток¶
При двунаправленном потоке клиент и сервер могут продолжать обмениваться сообщениями в рамках одного RPC.
Значит, клиент должен обрабатывать:
- асинхронную отправку запросов
- асинхронную обработку ответов
- жизненный цикл потока и его завершение
API в Protobuf¶
Продвинутый клиент повторно использует ровно тот же ориентированный на потоки .proto-контракт из руководства по продвинутому серверу.
Protobuf-контракт
syntax = "proto3";
package ru.tinkoff.kora.guide.grpcserver.advanced;
option java_multiple_files = true;
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";
service UserService {
rpc CreateUser(CreateUserRequest) returns (UserResponse) {}
rpc GetUser(GetUserRequest) returns (UserResponse) {}
rpc GetUsers(GetUsersRequest) returns (GetUsersResponse) {}
rpc UpdateUser(UpdateUserRequestUnary) returns (UserResponse) {}
rpc DeleteUser(DeleteUserRequest) returns (google.protobuf.Empty) {}
}
service UserStreamingService {
rpc GetAllUsers(google.protobuf.Empty) returns (stream UserResponse) {}
rpc CreateUsers(stream CreateUserRequest) returns (CreateUsersResponse) {}
rpc UpdateUsers(stream UpdateUserRequest) returns (stream UserResponse) {}
}
message CreateUserRequest {
string name = 1;
string email = 2;
}
message GetUserRequest {
string user_id = 1;
}
message GetUsersRequest {
int32 page = 1;
int32 size = 2;
string sort = 3;
}
message GetUsersResponse {
repeated UserResponse users = 1;
}
message UpdateUserRequestUnary {
string user_id = 1;
string name = 2;
string email = 3;
}
message DeleteUserRequest {
string user_id = 1;
}
message UpdateUserRequest {
string user_id = 1;
string name = 2;
string email = 3;
}
message CreateUsersResponse {
int32 created_count = 1;
repeated string user_ids = 2;
}
message UserResponse {
string id = 1;
string name = 2;
string email = 3;
google.protobuf.Timestamp created_at = 4;
}
Снова обратите внимание на важное решение в моделировании:
- унарный
UserServiceвсе еще присутствует в контракте - работа продвинутого клиента сосредоточена на отдельном
UserStreamingService
Зависимости¶
Модуль продвинутого клиента использует тот же основной клиентский набор, что и базовый клиент.
Обновите build.gradle:
plugins {
id "application"
id "com.google.protobuf" version "0.9.4"
}
dependencies {
compileOnly "javax.annotation:javax.annotation-api:1.3.2"
annotationProcessor "ru.tinkoff.kora:annotation-processors"
implementation "ru.tinkoff.kora:config-hocon"
implementation "ru.tinkoff.kora:grpc-client"
implementation "ru.tinkoff.kora:http-server-undertow"
implementation "ru.tinkoff.kora:json-module"
implementation "ru.tinkoff.kora:logging-logback"
implementation "io.grpc:grpc-protobuf:1.74.0"
testRuntimeOnly platform("org.junit:junit-bom:$junitVersion")
testRuntimeOnly "org.junit.platform:junit-platform-launcher"
testImplementation platform("org.junit:junit-bom:$junitVersion")
testImplementation "io.grpc:grpc-inprocess:1.74.0"
testImplementation "org.junit.jupiter:junit-jupiter"
}
Обновите build.gradle.kts:
import com.google.protobuf.gradle.id
plugins {
id("org.jetbrains.kotlin.jvm")
id("com.google.devtools.ksp")
id("application")
id("com.google.protobuf") version "0.9.4"
}
dependencies {
compileOnly("javax.annotation:javax.annotation-api:1.3.2")
ksp("ru.tinkoff.kora:symbol-processors")
implementation("ru.tinkoff.kora:config-hocon")
implementation("ru.tinkoff.kora:grpc-client")
implementation("ru.tinkoff.kora:http-server-undertow")
implementation("ru.tinkoff.kora:json-module")
implementation("ru.tinkoff.kora:logging-logback")
implementation("io.grpc:grpc-protobuf:1.74.0")
testRuntimeOnly(platform("org.junit:junit-bom:${property("junitVersion")}"))
testRuntimeOnly("org.junit.platform:junit-platform-launcher")
testImplementation(platform("org.junit:junit-bom:${property("junitVersion")}"))
testImplementation("io.grpc:grpc-inprocess:1.74.0")
testImplementation("org.junit.jupiter:junit-jupiter")
}
Генерация кода¶
Настройка Gradle для protobuf остается по смыслу такой же, как в руководстве по базовому клиенту:
Добавьте в build.gradle:
protobuf {
protoc { artifact = "com.google.protobuf:protoc:3.25.3" }
plugins {
grpc { artifact = "io.grpc:protoc-gen-grpc-java:1.74.0" }
}
generateProtoTasks {
all()*.plugins { grpc {} }
}
}
sourceSets {
main {
java {
srcDirs "build/generated/source/proto/main/grpc"
srcDirs "build/generated/source/proto/main/java"
}
}
}
Добавьте в build.gradle.kts:
protobuf {
protoc { artifact = "com.google.protobuf:protoc:3.25.3" }
plugins {
id("grpc") { artifact = "io.grpc:protoc-gen-grpc-java:1.74.0" }
}
generateProtoTasks {
all().forEach { task ->
task.plugins { id("grpc") }
}
}
}
sourceSets {
main {
java {
srcDirs("build/generated/source/proto/main/grpc", "build/generated/source/proto/main/java")
}
}
}
Меняется не сама генерация кода, а типы сгенерированных заглушек, которые мы используем:
- блокирующие заглушки для чтения серверных потоков
- асинхронные заглушки для клиентских и двунаправленных потоков
Конфигурация¶
Продвинутый сервер защищает потоковый сервис с помощью ключа API в метаданных gRPC, поэтому продвинутый клиент должен знать две вещи:
- где находится сервер
- какой ключ API нужно отправлять
Полное описание настроек смотрите в разделах HTTP-сервер, Конфигурация, gRPC-клиент и Журналирование SLF4J.
httpServer {
publicApiHttpPort = 8081 //(1)!
privateApiHttpPort = 8086 //(2)!
telemetry.logging.enabled = true //(3)!
}
auth.apiKey.value = "test-api-key" //(4)!
auth.apiKey.value = ${?GRPC_STREAMING_API_KEY} //(5)!
grpcClient {
UserStreamingService {
url = "http://localhost:8092" //(6)!
url = ${?GRPC_STREAMING_SERVER_URL} //(7)!
telemetry.logging.enabled = true //(8)!
}
}
logging {
levels {
"ROOT": "INFO" //(9)!
"ru.tinkoff.kora": "INFO" //(10)!
"ru.tinkoff.kora.guide.grpcclient.advanced": "INFO" //(11)!
}
}
- Общедоступный HTTP-порт по умолчанию, который используют конечные точки приложения.
- Служебный HTTP-порт по умолчанию для проб, метрик и конечных точек управления.
- Включает возможность для этого раздела конфигурации.
- Настроенное значение, которое использует компонент руководства.
- Настроенное значение, которое использует компонент руководства. Необязательное переопределение через
GRPC_STREAMING_API_KEY. - Базовый URL, который использует настроенный клиент.
- Базовый URL, который использует настроенный клиент. Необязательное переопределение через
GRPC_STREAMING_SERVER_URL. - Включает возможность для этого раздела конфигурации.
- Уровень журналирования для
ROOT. - Уровень журналирования для
ru.tinkoff.kora. - Уровень журналирования для
ru.tinkoff.kora.guide.grpcclient.advanced.
httpServer:
publicApiHttpPort: 8081 #(1)!
privateApiHttpPort: 8086 #(2)!
telemetry:
logging:
enabled: true #(3)!
auth:
apiKey:
value: ${?GRPC_STREAMING_API_KEY:"test-api-key"} #(4)!
grpcClient:
UserStreamingService:
url: ${?GRPC_STREAMING_SERVER_URL:"http://localhost:8092"} #(5)!
telemetry:
logging:
enabled: true #(6)!
logging:
levels:
ROOT: "INFO" #(7)!
"ru.tinkoff.kora": "INFO" #(8)!
"ru.tinkoff.kora.guide.grpcclient.advanced": "INFO" #(9)!
- Общедоступный HTTP-порт по умолчанию, который используют конечные точки приложения.
- Служебный HTTP-порт по умолчанию для проб, метрик и конечных точек управления.
- Включает возможность для этого раздела конфигурации.
- Настроенное значение, которое использует компонент руководства. Использует показанное значение по умолчанию и позволяет
GRPC_STREAMING_API_KEYпереопределить его. - Базовый URL, который использует настроенный клиент. Использует показанное значение по умолчанию и позволяет
GRPC_STREAMING_SERVER_URLпереопределить его. - Включает возможность для этого раздела конфигурации.
- Уровень журналирования для
ROOT. - Уровень журналирования для
ru.tinkoff.kora. - Уровень журналирования для
ru.tinkoff.kora.guide.grpcclient.advanced.
Как и в руководстве по базовому клиенту, локальный URL использует http://..., чтобы gRPC-клиент работал в режиме без шифрования для этой демонстрационной настройки.
Клиентский перехватчик¶
Подробнее о клиентских gRPC-перехватчиках и metadata смотрите в разделе gRPC Client: перехватчики.
Клиентские перехватчики являются клиентским аналогом промежуточного слоя транспорта. Они полезны для задач, которые должны выполняться для исходящих вызовов в одном месте:
- журналирование
- авторизация
- предельные сроки
- трассировка
Продвинутый клиент добавляет простой перехватчик журналирования:
package ru.tinkoff.kora.guide.grpcclient.advanced.grpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.MethodDescriptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import ru.tinkoff.kora.common.Component;
import ru.tinkoff.kora.common.Tag;
import ru.tinkoff.kora.guide.grpcserver.advanced.UserStreamingServiceGrpc;
@Tag(UserStreamingServiceGrpc.class)
@Component
public final class LoggingInterceptor implements ClientInterceptor {
private static final Logger logger = LoggerFactory.getLogger(LoggingInterceptor.class);
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
logger.info("Calling gRPC method {}", method.getFullMethodName());
return next.newCall(method, callOptions);
}
}
package ru.tinkoff.kora.guide.grpcclient.advanced.grpc
import io.grpc.*
import org.slf4j.LoggerFactory
import ru.tinkoff.kora.common.Component
import ru.tinkoff.kora.common.Tag
import ru.tinkoff.kora.guide.grpcserver.advanced.UserStreamingServiceGrpc
@Tag(UserStreamingServiceGrpc::class)
@Component
class LoggingInterceptor : ClientInterceptor {
private val logger = LoggerFactory.getLogger(LoggingInterceptor::class.java)
override fun <ReqT : Any?, RespT : Any?> interceptCall(
method: MethodDescriptor<ReqT, RespT>,
callOptions: CallOptions,
next: Channel
): ClientCall<ReqT, RespT> {
logger.info("Calling gRPC method {}", method.fullMethodName)
return next.newCall(method, callOptions)
}
}
@Tag(UserStreamingServiceGrpc.class) важен: он ограничивает действие этого перехватчика клиентом продвинутого потокового сервиса.
Перехватчик авторизации¶
Теперь сделаем так, чтобы клиент автоматически отправлял ключ API, который ожидает продвинутый сервер.
package ru.tinkoff.kora.guide.grpcclient.advanced.grpc;
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingClientCall;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import ru.tinkoff.kora.common.Component;
import ru.tinkoff.kora.common.Tag;
import ru.tinkoff.kora.guide.grpcserver.advanced.UserStreamingServiceGrpc;
@Tag(UserStreamingServiceGrpc.class)
@Component
public final class UserStreamingAuthInterceptor implements ClientInterceptor {
private static final Metadata.Key<String> AUTHORIZATION_HEADER =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER);
private final UserStreamingAuthConfig authConfig;
public UserStreamingAuthInterceptor(UserStreamingAuthConfig authConfig) {
this.authConfig = authConfig;
}
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next) {
return new ForwardingClientCall.SimpleForwardingClientCall<>(next.newCall(method, callOptions)) {
@Override
public void start(Listener<RespT> responseListener, Metadata headers) {
headers.put(AUTHORIZATION_HEADER, authConfig.value());
super.start(responseListener, headers);
}
};
}
}
package ru.tinkoff.kora.guide.grpcclient.advanced.grpc
import io.grpc.*
import ru.tinkoff.kora.common.Component
import ru.tinkoff.kora.common.Tag
import ru.tinkoff.kora.guide.grpcserver.advanced.UserStreamingServiceGrpc
@Tag(UserStreamingServiceGrpc::class)
@Component
class UserStreamingAuthInterceptor(
private val authConfig: UserStreamingAuthConfig
) : ClientInterceptor {
override fun <ReqT : Any?, RespT : Any?> interceptCall(
method: MethodDescriptor<ReqT, RespT>,
callOptions: CallOptions,
next: Channel
): ClientCall<ReqT, RespT> {
return object :
ForwardingClientCall.SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
override fun start(responseListener: Listener<RespT>, headers: Metadata) {
headers.put(AUTHORIZATION_HEADER, authConfig.value())
super.start(responseListener, headers)
}
}
}
companion object {
private val AUTHORIZATION_HEADER: Metadata.Key<String> =
Metadata.Key.of("authorization", Metadata.ASCII_STRING_MARSHALLER)
}
}
Это gRPC-аналог автоматического добавления заголовков авторизации в продвинутом HTTP-клиенте.
Потоковый клиентский¶
Теперь можно обернуть сгенерированные потоковые заглушки в небольшой клиентский сервис.
package ru.tinkoff.kora.guide.grpcclient.advanced.service;
import com.google.protobuf.Empty;
import io.grpc.stub.StreamObserver;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import ru.tinkoff.kora.common.Component;
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserRequest;
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserResponse;
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserUpdateRequest;
import ru.tinkoff.kora.guide.grpcserver.advanced.CreateUserRequest;
import ru.tinkoff.kora.guide.grpcserver.advanced.CreateUsersResponse;
import ru.tinkoff.kora.guide.grpcserver.advanced.UpdateUserRequest;
import ru.tinkoff.kora.guide.grpcserver.advanced.UserStreamingServiceGrpc;
@Component
public final class UserStreamingClientService {
private final UserStreamingServiceGrpc.UserStreamingServiceBlockingStub blockingStub;
private final UserStreamingServiceGrpc.UserStreamingServiceStub asyncStub;
public UserStreamingClientService(
UserStreamingServiceGrpc.UserStreamingServiceBlockingStub blockingStub,
UserStreamingServiceGrpc.UserStreamingServiceStub asyncStub) {
this.blockingStub = blockingStub;
this.asyncStub = asyncStub;
}
public CreateUsersResult createUsers(List<UserRequest> requests) {
var future = new CompletableFuture<CreateUsersResult>();
var responseObserver = new StreamObserver<CreateUsersResponse>() {
@Override
public void onNext(CreateUsersResponse value) {
future.complete(new CreateUsersResult(value.getCreatedCount(), List.copyOf(value.getUserIdsList())));
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}
@Override
public void onCompleted() {
}
};
var requestObserver = this.asyncStub.createUsers(responseObserver);
try {
for (var request : requests) {
requestObserver.onNext(CreateUserRequest.newBuilder()
.setName(request.name())
.setEmail(request.email())
.build());
}
requestObserver.onCompleted();
return future.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
requestObserver.onError(e);
throw new RuntimeException("Failed to create users over gRPC streaming", e);
}
}
public List<UserResponse> getAllUsers() {
var users = new ArrayList<UserResponse>();
var iterator = this.blockingStub.getAllUsers(Empty.getDefaultInstance());
iterator.forEachRemaining(user -> users.add(toDto(user)));
return users;
}
public List<UserResponse> updateUsers(List<UserUpdateRequest> updates) {
var future = new CompletableFuture<List<UserResponse>>();
var responses = new CopyOnWriteArrayList<UserResponse>();
var responseObserver = new StreamObserver<ru.tinkoff.kora.guide.grpcserver.advanced.UserResponse>() {
@Override
public void onNext(ru.tinkoff.kora.guide.grpcserver.advanced.UserResponse value) {
responses.add(toDto(value));
}
@Override
public void onError(Throwable t) {
future.completeExceptionally(t);
}
@Override
public void onCompleted() {
future.complete(List.copyOf(responses));
}
};
var requestObserver = this.asyncStub.updateUsers(responseObserver);
try {
for (var update : updates) {
requestObserver.onNext(UpdateUserRequest.newBuilder()
.setUserId(update.userId())
.setName(update.name())
.setEmail(update.email())
.build());
}
requestObserver.onCompleted();
return future.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
requestObserver.onError(e);
throw new RuntimeException("Failed to update users over gRPC streaming", e);
}
}
private UserResponse toDto(ru.tinkoff.kora.guide.grpcserver.advanced.UserResponse response) {
return new UserResponse(
response.getId(),
response.getName(),
response.getEmail(),
LocalDateTime.ofEpochSecond(
response.getCreatedAt().getSeconds(),
response.getCreatedAt().getNanos(),
ZoneOffset.UTC));
}
public record CreateUsersResult(int createdCount, List<String> userIds) {}
}
package ru.tinkoff.kora.guide.grpcclient.advanced.service
import com.google.protobuf.Empty
import io.grpc.stub.StreamObserver
import ru.tinkoff.kora.common.Component
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserRequest
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserResponse
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserUpdateRequest
import ru.tinkoff.kora.guide.grpcserver.advanced.CreateUserRequest
import ru.tinkoff.kora.guide.grpcserver.advanced.CreateUsersResponse
import ru.tinkoff.kora.guide.grpcserver.advanced.UpdateUserRequest
import ru.tinkoff.kora.guide.grpcserver.advanced.UserStreamingServiceGrpc
import java.time.LocalDateTime
import java.time.ZoneOffset
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit
@Component
class UserStreamingClientService(
private val blockingStub: UserStreamingServiceGrpc.UserStreamingServiceBlockingStub,
private val asyncStub: UserStreamingServiceGrpc.UserStreamingServiceStub
) {
fun createUsers(requests: List<UserRequest>): CreateUsersResult {
val future = CompletableFuture<CreateUsersResult>()
val responseObserver = object : StreamObserver<CreateUsersResponse> {
override fun onNext(value: CreateUsersResponse) {
future.complete(CreateUsersResult(value.createdCount, value.userIdsList.toList()))
}
override fun onError(t: Throwable) {
future.completeExceptionally(t)
}
override fun onCompleted() = Unit
}
val requestObserver = asyncStub.createUsers(responseObserver)
try {
requests.forEach { request ->
requestObserver.onNext(
CreateUserRequest.newBuilder()
.setName(request.name)
.setEmail(request.email)
.build()
)
}
requestObserver.onCompleted()
return future.get(5, TimeUnit.SECONDS)
} catch (e: Exception) {
requestObserver.onError(e)
throw RuntimeException("Failed to create users over gRPC streaming", e)
}
}
fun getAllUsers(): List<UserResponse> {
val users = mutableListOf<UserResponse>()
val iterator = blockingStub.getAllUsers(Empty.getDefaultInstance())
iterator.forEachRemaining { user -> users += toDto(user) }
return users
}
fun updateUsers(updates: List<UserUpdateRequest>): List<UserResponse> {
val future = CompletableFuture<List<UserResponse>>()
val responses = CopyOnWriteArrayList<UserResponse>()
val responseObserver = object : StreamObserver<ru.tinkoff.kora.guide.grpcserver.advanced.UserResponse> {
override fun onNext(value: ru.tinkoff.kora.guide.grpcserver.advanced.UserResponse) {
responses += toDto(value)
}
override fun onError(t: Throwable) {
future.completeExceptionally(t)
}
override fun onCompleted() {
future.complete(responses.toList())
}
}
val requestObserver = asyncStub.updateUsers(responseObserver)
try {
updates.forEach { update ->
requestObserver.onNext(
UpdateUserRequest.newBuilder()
.setUserId(update.userId)
.setName(update.name)
.setEmail(update.email)
.build()
)
}
requestObserver.onCompleted()
return future.get(5, TimeUnit.SECONDS)
} catch (e: Exception) {
requestObserver.onError(e)
throw RuntimeException("Failed to update users over gRPC streaming", e)
}
}
private fun toDto(response: ru.tinkoff.kora.guide.grpcserver.advanced.UserResponse): UserResponse {
return UserResponse(
response.id,
response.name,
response.email,
LocalDateTime.ofEpochSecond(response.createdAt.seconds, response.createdAt.nanos, ZoneOffset.UTC)
)
}
data class CreateUsersResult(val createdCount: Int, val userIds: List<String>)
}
Этот один сервис теперь показывает все три формы потоковой передачи со стороны клиента.
Контроллер проверки¶
Сопутствующее приложение содержит небольшую вспомогательную конечную точку, которая запускает потоковый клиент.
package ru.tinkoff.kora.guide.grpcclient.advanced.controller;
import java.util.List;
import ru.tinkoff.kora.common.Component;
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserRequest;
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserUpdateRequest;
import ru.tinkoff.kora.guide.grpcclient.advanced.service.UserStreamingClientService;
import ru.tinkoff.kora.http.common.HttpMethod;
import ru.tinkoff.kora.http.common.annotation.HttpRoute;
import ru.tinkoff.kora.http.server.common.annotation.HttpController;
import ru.tinkoff.kora.json.common.annotation.Json;
@Component
@HttpController
public final class ClientTestController {
private final UserStreamingClientService userStreamingClientService;
public ClientTestController(UserStreamingClientService userStreamingClientService) {
this.userStreamingClientService = userStreamingClientService;
}
@HttpRoute(method = HttpMethod.POST, path = "/client/test-all-streaming-endpoints")
@Json
public TestResults testAllStreamingEndpoints() {
try {
var created = this.userStreamingClientService.createUsers(List.of(
new UserRequest("Alice Streaming", "alice-streaming@example.com"),
new UserRequest("Bob Streaming", "bob-streaming@example.com")));
boolean usersCreated = created.createdCount() == 2;
var streamed = this.userStreamingClientService.getAllUsers();
boolean usersStreamed = created.userIds().stream()
.allMatch(userId -> streamed.stream().anyMatch(user -> user.id().equals(userId)));
var updated = this.userStreamingClientService.updateUsers(List.of(
new UserUpdateRequest(created.userIds().get(0), "Updated Alice Streaming", "updated-alice@example.com"),
new UserUpdateRequest(created.userIds().get(1), "Updated Bob Streaming", "updated-bob@example.com")));
boolean usersUpdated = updated.stream().anyMatch(user -> "Updated Alice Streaming".equals(user.name()))
&& updated.stream().anyMatch(user -> "Updated Bob Streaming".equals(user.name()));
boolean allTestsPassed = usersCreated && usersStreamed && usersUpdated;
return new TestResults(usersCreated, usersStreamed, usersUpdated, allTestsPassed, null);
} catch (Exception exception) {
return new TestResults(false, false, false, false, exception.getMessage());
}
}
@Json
public record TestResults(
boolean usersCreated,
boolean usersStreamed,
boolean usersUpdated,
boolean allTestsPassed,
String error) {}
}
package ru.tinkoff.kora.guide.grpcclient.advanced.controller
import ru.tinkoff.kora.common.Component
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserRequest
import ru.tinkoff.kora.guide.grpcclient.advanced.dto.UserUpdateRequest
import ru.tinkoff.kora.guide.grpcclient.advanced.service.UserStreamingClientService
import ru.tinkoff.kora.http.common.HttpMethod
import ru.tinkoff.kora.http.common.annotation.HttpRoute
import ru.tinkoff.kora.http.server.common.annotation.HttpController
import ru.tinkoff.kora.json.common.annotation.Json
@Component
@HttpController
class ClientTestController(
private val userStreamingClientService: UserStreamingClientService
) {
@HttpRoute(method = HttpMethod.POST, path = "/client/test-all-streaming-endpoints")
@Json
fun testAllStreamingEndpoints(): TestResults {
return try {
val created = userStreamingClientService.createUsers(
listOf(
UserRequest("Alice Streaming", "alice-streaming@example.com"),
UserRequest("Bob Streaming", "bob-streaming@example.com")
)
)
val usersCreated = created.createdCount == 2
val streamed = userStreamingClientService.getAllUsers()
val usersStreamed = created.userIds.all { userId -> streamed.any { user -> user.id == userId } }
val updated = userStreamingClientService.updateUsers(
listOf(
UserUpdateRequest(created.userIds[0], "Updated Alice Streaming", "updated-alice@example.com"),
UserUpdateRequest(created.userIds[1], "Updated Bob Streaming", "updated-bob@example.com")
)
)
val usersUpdated = updated.any { it.name == "Updated Alice Streaming" } &&
updated.any { it.name == "Updated Bob Streaming" }
val allTestsPassed = usersCreated && usersStreamed && usersUpdated
TestResults(usersCreated, usersStreamed, usersUpdated, allTestsPassed, null)
} catch (exception: Exception) {
TestResults(false, false, false, false, exception.message)
}
}
@Json
data class TestResults(
val usersCreated: Boolean,
val usersStreamed: Boolean,
val usersUpdated: Boolean,
val allTestsPassed: Boolean,
val error: String?
)
}
И снова HTTP-конечная точка является только локальной обвязкой. Настоящая тема руководства находится ниже нее: потоковый gRPC-клиент.
Запуск приложения¶
Сначала запустите продвинутый сервер:
Затем запустите продвинутый клиент:
Теперь выполните вызов:
Эта одна вспомогательная конечная точка внутри проверяет:
- клиентский поток
- серверный поток
- двунаправленный поток
Запуск тестов¶
Тесты продвинутого клиента также используют подход с внутрипроцессным gRPC.
Здесь это особенно полезно, потому что позволяет тестам имитировать:
- успешные потоковые взаимодействия
- отклоненные вызовы без ключа API
- поведение перехватчиков
Запустите их командой:
Лучшие практики¶
- Держите продвинутую потоковую работу в отдельном клиентском сервисе, а не раздувайте унарный клиент.
- Ограничивайте область действия перехватчиков тегами, когда они должны влиять только на один сгенерированный сервис.
- Используйте клиентские перехватчики для авторизации на основе метаданных, а не повторяйте логику заголовков в каждом месте вызова.
- Держите обработку жизненного цикла потока рядом с транспортной границей.
- Предпочитайте внутрипроцессные gRPC-серверы для быстрых тестов потоков на стороне клиента.
- Аннотируйте рукописные DTO с помощью
@Jsonтолько тогда, когда они пересекают HTTP/JSON-границу; сгенерированным protobuf-сообщениям JSON-аннотации не нужны.
Итоги¶
В этом руководстве вы создали потоковый gRPC-клиент, который отражает руководство по продвинутому серверу.
Важная идея состояла не только в том, «как вызывать потоковые RPC», но и в том, «как чисто организовать клиент»:
- разделить унарные и потоковые задачи
- добавить авторизацию и журналирование через перехватчики
- обернуть сгенерированные заглушки в сфокусированный сервисный слой
Ключевые понятия¶
- чем продвинутые gRPC-клиенты отличаются от унарных gRPC-клиентов
- как блокирующие и асинхронные заглушки используются для разных потоковых шаблонов
- как клиентские перехватчики добавляют журналирование и авторизацию через метаданные
- как потреблять серверные, клиентские и двунаправленные потоковые методы
- как тестировать продвинутое поведение gRPC-клиента с
InProcessServer
Устранение неполадок¶
Потоковый вызов никогда не завершается:
Проверьте, что поток запросов завершается на стороне клиента, а тестовая или серверная реализация отправляет сигналы завершения.
Вызовы отклоняются как не прошедшие авторизацию:
Проверьте, что клиент и сервер используют одно и то же значение ключа API, а перехватчик авторизации помечен тегом для сгенерированного потокового клиента.
Внутрипроцессные тесты проходят, а вызовы во время выполнения завершаются ошибкой:
Сравните рабочий application.conf с настройкой внутрипроцессного теста, особенно узел gRPC, порт и теги перехватчиков.
Что дальше?¶
- Шаблоны отказоустойчивости, чтобы защищать потоковые и унарные RPC-вызовы от медленных или недоступных сервисов.
- Наблюдаемость, чтобы трассировать вызовы gRPC-клиента, жизненные циклы потоков и поведение перехватчиков.
- Обмен сообщениями с Kafka, чтобы сравнить интеграцию в стиле RPC с асинхронной событийной интеграцией.
- Продвинутый HTTP-клиент, чтобы сравнить границы продвинутого gRPC-клиента и продвинутого HTTP-клиента.
Помощь¶
Если что-то не работает:
- сравните с Kora Java gRPC Client Advanced App и Kora Kotlin gRPC Client Advanced App
- проверьте документацию по gRPC-клиенту
- проверьте, что продвинутый сервер из Продвинутого gRPC-сервера запущен на порту
8092 - убедитесь, что клиент и сервер используют одно и то же значение ключа API