从零开始的 Java gRPC

深入 Java gRPC 实现:构建高性能微服务通信

让我们一起探索在 Java 环境中如何有效地利用 gRPC 技术。

gRPC(Google Remote Procedure Call): 这是一个由 Google 主导开发的开源远程过程调用框架,主要用于构建微服务之间的高速数据交换通道。 gRPC 的优势在于它能让使用不同编程语言开发的服务无缝协作。 其核心在于 Protobuf(Protocol Buffers)消息传递格式,这是一种高效且紧凑的数据序列化方式,专门用于结构化数据的传输。

在某些特定的应用场景中,gRPC API 展现出比传统 REST API 更卓越的性能。

接下来,我们将尝试构建一个 gRPC 服务器。 首先,我们需要定义 .proto 文件,这些文件将描述我们的服务接口和数据模型 (DTO)。 为了简化演示,我们将创建 `ProfileService` 和 `ProfileDescriptor` 这两个组件。

`ProfileService` 的定义如下:

syntax = "proto3";
package com.deft.grpc;
import "google/protobuf/empty.proto";
import "profile_descriptor.proto";
service ProfileService {
  rpc GetCurrentProfile (google.protobuf.Empty) returns (ProfileDescriptor) {}
  rpc clientStream (stream ProfileDescriptor) returns (google.protobuf.Empty) {}
  rpc serverStream (google.protobuf.Empty) returns (stream ProfileDescriptor) {}
  rpc biDirectionalStream (stream ProfileDescriptor) returns (stream 	ProfileDescriptor) {}
}

gRPC 提供了多种灵活的客户端-服务器通信模式。 我们来逐一解析这些模式:

  • 标准 RPC 调用: 这是最基本的请求/响应模式。
  • 客户端流式传输: 客户端向服务器发送数据流。
  • 服务器流式传输: 服务器向客户端发送数据流。
  • 双向流式传输: 客户端和服务器可以同时发送和接收数据流。

`ProfileService` 服务依赖于 `ProfileDescriptor`,其定义在导入部分中指定:

syntax = "proto3";
package com.deft.grpc;
message ProfileDescriptor {
  int64 profile_id = 1;
  string name = 2;
}
  • `int64` 在 Java 中对应 `Long` 类型,用于表示用户的 profile ID。
  • `string` 和 Java 中的 `String` 类型类似,用于表示字符串数据。

您可以使用 Gradle 或 Maven 来构建您的项目。 为了方便起见,我选择使用 Maven。 需要注意的是,对于 Gradle,生成的 .proto 文件结构以及构建文件的配置会有所不同。 要构建一个简单的 gRPC 服务器,我们只需要一个依赖项:

<dependency>
    <groupId>io.github.lognet</groupId>
    <artifactId>grpc-spring-boot-starter</artifactId>
    <version>4.5.4</version>
</dependency>

这个启动器为我们简化了许多配置工作。

我们的项目结构如下所示:

我们需要 `GrpcServerApplication` 来启动 Spring Boot 应用程序。 另外还需要 `GrpcProfileService`,它将实现 .proto 文件中定义的方法。 为了使用 `protoc` 工具并从 .proto 文件生成 Java 类,需要在 `pom.xml` 文件中添加 `protobuf-maven-plugin` 插件。 构建部分配置如下:

<build>
        <extensions>
            <extension>
                <groupId>kr.motd.maven</groupId>
                <artifactId>os-maven-plugin</artifactId>
                <version>1.6.2</version>
            </extension>
        </extensions>
        <plugins>
            <plugin>
                <groupId>org.xolstice.maven.plugins</groupId>
                <artifactId>protobuf-maven-plugin</artifactId>
                <version>0.6.1</version>
                <configuration>
                    <protoSourceRoot>${project.basedir}/src/main/proto</protoSourceRoot>
                    <outputDirectory>${basedir}/target/generated-sources/grpc-java</outputDirectory>
                    <protocArtifact>com.google.protobuf:protoc:3.12.0:exe:${os.detected.classifier}</protocArtifact>
                    <pluginId>grpc-java</pluginId>
                    <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.38.0:exe:${os.detected.classifier}</pluginArtifact>
                    <clearOutputDirectory>false</clearOutputDirectory>
                </configuration>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>compile-custom</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
  • `protoSourceRoot`:指定 .proto 文件所在的目录路径。
  • `outputDirectory`:设置生成代码的输出目录。
  • `clearOutputDirectory`:标志是否清除输出目录中的内容。

完成配置后,您可以构建项目。 之后,您需要访问输出目录,其中包含生成的文件。 现在我们可以逐步实现 `GrpcProfileService` 了。

类声明如下:

@GRpcService
public class GrpcProfileService extends ProfileServiceGrpc.ProfileServiceImplBase

使用 `@GRpcService` 注解将该类标识为一个 gRPC 服务 Bean。

因为我们继承了 `ProfileServiceGrpc.ProfileServiceImplBase`,所以我们可以重写其父类方法。 首先要重写的方法是 `getCurrentProfile`:

    @Override
    public void getCurrentProfile(Empty request, StreamObserver<ProfileDescriptorOuterClass.ProfileDescriptor> responseObserver) {
        System.out.println("getCurrentProfile");
        responseObserver.onNext(ProfileDescriptorOuterClass.ProfileDescriptor
                .newBuilder()
                .setProfileId(1)
                .setName("test")
                .build());
        responseObserver.onCompleted();
    }

为了响应客户端请求,需要在传入的 `StreamObserver` 上调用 `onNext` 方法来发送数据。 发送响应后,需要调用 `onCompleted` 方法向客户端发出服务已完成的信号。 当向 `getCurrentProfile` 服务发送请求时,响应数据将是:

{
  "profile_id": "1",
  "name": "test"
}

接下来,我们来分析一下服务器流。 在这种消息传递模式下,客户端向服务器发送一个请求,服务器随后以流的形式向客户端发送多个响应。 例如,我们可以让服务器循环发送五个响应。 发送完成后,服务器会向客户端发送一个表明流传输完成的消息。

重写的服务器流方法如下所示:

@Override
    public void serverStream(Empty request, StreamObserver<ProfileDescriptorOuterClass.ProfileDescriptor> responseObserver) {
        for (int i = 0; i < 5; i++) {
            responseObserver.onNext(ProfileDescriptorOuterClass.ProfileDescriptor
                    .newBuilder()
                    .setProfileId(i)
                    .build());
        }
        responseObserver.onCompleted();
    }

这样,客户端将接收到五条消息,每条消息的 `ProfileId` 依次递增。

{
  "profile_id": "0",
  "name": ""
}
{
  "profile_id": "1",
  "name": ""
}
…
{
  "profile_id": "4",
  "name": ""
}

客户端流的模式与服务器流非常相似,只是现在是客户端发送消息流,而服务器负责处理这些消息。 服务器可以选择立即处理接收到的消息,也可以选择等待接收完所有消息后再进行处理。

    @Override
    public StreamObserver<ProfileDescriptorOuterClass.ProfileDescriptor> clientStream(StreamObserver<Empty> responseObserver) {
        return new StreamObserver<>() {

            @Override
            public void onNext(ProfileDescriptorOuterClass.ProfileDescriptor profileDescriptor) {
                log.info("ProfileDescriptor from client. Profile id: {}", profileDescriptor.getProfileId());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

在客户端流中,你需要返回一个 `StreamObserver` 给客户端,服务器通过它来接收消息。 如果流中发生错误(比如连接意外中断),将会调用 `onError` 方法。

实现双向流需要将服务器流和客户端流的特性结合起来。

@Override
    public StreamObserver<ProfileDescriptorOuterClass.ProfileDescriptor> biDirectionalStream(
            StreamObserver<ProfileDescriptorOuterClass.ProfileDescriptor> responseObserver) {

        return new StreamObserver<>() {
            int pointCount = 0;
            @Override
            public void onNext(ProfileDescriptorOuterClass.ProfileDescriptor profileDescriptor) {
                log.info("biDirectionalStream, pointCount {}", pointCount);
                responseObserver.onNext(ProfileDescriptorOuterClass.ProfileDescriptor
                        .newBuilder()
                        .setProfileId(pointCount++)
                        .build());
            }

            @Override
            public void onError(Throwable throwable) {

            }

            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
    }

在这个示例中,服务器会响应客户端发送的每一条消息,并返回一个 `profileId` 递增的 `ProfileDescriptor` 对象。

结论

本文介绍了在客户端和服务器之间使用 gRPC 进行消息传递的基本方法,其中包括服务器流、客户端流以及双向流的实现。

文章由谢尔盖·戈利岑撰写