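This article shows how to build a bidirectional streaming RPC example with grpc-java that simulates a simple online chat room.

Adding dependencies

Create a new Maven project and, following the example on the official site, add the following dependencies to pom.xml: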

<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.43.1</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.43.1</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.43.1</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
    <groupId>org.apache.tomcat</groupId>
    <artifactId>annotations-api</artifactId>
    <version>6.0.53</version>
    <scope>provided</scope>
</dependency>

Then add the protoc Maven plugin:

<build>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.5.0.Final</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:3.19.1:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.43.1:exe:${os.detected.classifier}</pluginArtifact>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

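Next, we can define the RPC interface and data types by writing a .proto file. By default, .proto files must be placed in the src/main/proto and src/test/proto directories (create these folders if they do not exist).

Writing the .proto file

Create a file named bidirectional_stream.proto that defines a bidirectional streaming RPC service. Its contents are as follows: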

syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream";
option java_outer_classname = "BidirectionalStreamProto";

service BidirectionalStreamService {
    rpc BidirectionalStreamCommunication(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse) {}
}

message BidirectionalStreamRequest {
    string req_msg = 1;
}

message BidirectionalStreamResponse {
    string resp_msg = 1;
}

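This defines a bidirectional streaming RPC in which both the client and the server communicate over streams. The service exposes a single RPC method, BidirectionalStreamCommunication, which takes a stream of BidirectionalStreamRequest messages and returns a stream of BidirectionalStreamResponse messages.

Generating the gRPC and protobuf classes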
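To make the shape of such a call concrete, here is a minimal sketch in plain Java that needs no gRPC dependency. The Observer interface is a hypothetical stand-in that only mirrors the three callbacks of io.grpc.stub.StreamObserver, and echoCall and run are illustrative names, not generated code. The point it shows: the server-side method receives the outbound (response) stream as an argument and returns the inbound (request) stream.

```java
import java.util.ArrayList;
import java.util.List;

public class BidiContractSketch {
    /** Hypothetical stand-in for io.grpc.stub.StreamObserver: three callbacks per stream. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Server side of a bidirectional call: takes the response stream, returns the request stream. */
    static Observer<String> echoCall(Observer<String> responses) {
        return new Observer<String>() {
            @Override
            public void onNext(String request) {
                // Answer every incoming message on the response stream.
                responses.onNext("received: " + request);
            }

            @Override
            public void onError(Throwable t) {
                // The request stream broke; nothing more to send in this sketch.
            }

            @Override
            public void onCompleted() {
                // The client finished sending, so finish the response stream too.
                responses.onCompleted();
            }
        };
    }

    /** Drives echoCall with the given requests and records everything the "client" observes. */
    static List<String> run(String... requests) {
        List<String> seen = new ArrayList<>();
        Observer<String> requestStream = echoCall(new Observer<String>() {
            @Override
            public void onNext(String value) { seen.add(value); }

            @Override
            public void onError(Throwable t) { seen.add("error"); }

            @Override
            public void onCompleted() { seen.add("completed"); }
        });
        for (String request : requests) {
            requestStream.onNext(request);
        }
        requestStream.onCompleted();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run("hi", "bye")); // [received: hi, received: bye, completed]
    }
}
```

In a real gRPC call the two streams are independent and the callbacks arrive on gRPC threads; this sketch runs everything synchronously on one thread purely to show the contract.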

mvn protobuf:compile
mvn protobuf:compile-custom

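[Image: protoc]

Running these commands from IDEA's Maven plugin is the quickest option. The generated classes then appear under the target directory, as shown in the figure below.

[Image: target]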

Writing the bidirectional streaming gRPC server

First, implement the BidirectionalStreamService defined in the .proto file by writing BidirectionalStreamServiceImpl.java as follows:

package org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.server;

import io.grpc.stub.StreamObserver;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamRequest;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamResponse;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamServiceGrpc;

/**
 * Server-side logic for bidirectional streaming.
 * @author sunchaser admin@lilu.org.cn
 * @since 2022/1/17
 */
public class BidirectionalStreamServiceImpl extends BidirectionalStreamServiceGrpc.BidirectionalStreamServiceImplBase {
    // Response stream of the most recently connected client. It is written on a gRPC
    // thread and read from the console thread, hence volatile.
    private volatile StreamObserver<BidirectionalStreamResponse> responseStreamObserver;

    @Override
    public StreamObserver<BidirectionalStreamRequest> bidirectionalStreamCommunication(StreamObserver<BidirectionalStreamResponse> responseObserver) {
        this.responseStreamObserver = responseObserver;
        return new StreamObserver<BidirectionalStreamRequest>() {
            @Override
            public void onNext(BidirectionalStreamRequest bidirectionalStreamRequest) {
                String reqMsg = bidirectionalStreamRequest.getReqMsg();
                System.out.println("[server]-[message from client]: " + reqMsg);
                // Acknowledge every client message on the response stream.
                responseObserver.onNext(
                        BidirectionalStreamResponse.newBuilder()
                                .setRespMsg("hello client, I'm Java grpc server, your message '" + reqMsg + "' has been received.")
                                .build()
                );
            }

            @Override
            public void onError(Throwable throwable) {
                // Log the failure of the client's request stream.
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                // The client has finished sending, so close the response stream as well.
                responseObserver.onCompleted();
            }
        };
    }

    public StreamObserver<BidirectionalStreamResponse> getResponseStreamObserver() {
        return responseStreamObserver;
    }
}

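The response stream is stored in a member variable so that the server can later use it to send messages to the client. Next, write the server launcher class BidirectionalStreamServer.java as follows: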

package org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamResponse;

import java.io.IOException;
import java.util.Scanner;

/**
 * Bidirectional streaming server.
 * @author sunchaser admin@lilu.org.cn
 * @since 2022/1/17
 */
public class BidirectionalStreamServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        int port = 6666;
        BidirectionalStreamServiceImpl streamService = new BidirectionalStreamServiceImpl();
        Server server = ServerBuilder.forPort(port)
                .addService(streamService)
                .build()
                .start();
        System.out.println("Bidirectional Stream GRPC Server started, listening on " + port);
        // Read lines from the console and push them to the connected client.
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("EOF".equals(line)) {
                    break;
                }
                StreamObserver<BidirectionalStreamResponse> responseObserver = streamService.getResponseStreamObserver();
                if (responseObserver == null) {
                    // No client has opened a stream yet.
                    System.out.println("[error]-no client connected");
                    continue;
                }
                try {
                    responseObserver.onNext(
                            BidirectionalStreamResponse.newBuilder()
                                    .setRespMsg(line)
                                    .build()
                    );
                } catch (Exception e) {
                    // For example, the client's stream has already been closed or cancelled.
                    System.out.println("[error]-failed to send to client");
                    e.printStackTrace();
                }
            }
        }).start();
        server.awaitTermination();
    }
}

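Run the main method to start the server. Messages typed into its console are sent to the client:

[Image: server-error]

Because no client has connected to the server yet, it prints its "[error]" message reporting that no client is connected.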
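The service implementation above keeps a single response stream in one field, so only the most recently connected client can be reached. For something closer to a chat room, one option is a small registry of all connected clients. The sketch below is plain Java under stated assumptions: ChatRoomRegistrySketch, join, leave and broadcast are hypothetical names, and each client is modelled as a Consumer<String> rather than a real StreamObserver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

public class ChatRoomRegistrySketch {
    // Each connected client is represented by a sink that accepts outgoing messages.
    // A concurrent set lets clients join and leave while a broadcast is in progress.
    private final Set<Consumer<String>> clients = ConcurrentHashMap.newKeySet();

    void join(Consumer<String> client) {
        clients.add(client);
    }

    void leave(Consumer<String> client) {
        clients.remove(client);
    }

    /** Sends the message to every connected client and returns how many were reached. */
    int broadcast(String message) {
        int delivered = 0;
        for (Consumer<String> client : clients) {
            try {
                client.accept(message);
                delivered++;
            } catch (RuntimeException e) {
                // Drop a client whose stream can no longer accept messages.
                clients.remove(client);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        ChatRoomRegistrySketch room = new ChatRoomRegistrySketch();
        System.out.println(room.broadcast("anyone there?")); // 0, nobody has joined yet

        List<String> inbox = new ArrayList<>();
        Consumer<String> alice = inbox::add;
        room.join(alice);
        System.out.println(room.broadcast("hello")); // 1
        System.out.println(inbox);                   // [hello]
        room.leave(alice);
    }
}
```

In the gRPC service, join would plausibly be called when bidirectionalStreamCommunication is invoked (passing something like responseObserver::onNext), and leave from the request observer's onError and onCompleted. A return value of 0 from broadcast replaces the "no client connected" error case. Since StreamObserver is not thread-safe, calls to onNext on the same observer from different threads would additionally need to be synchronized.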

Writing the gRPC client

Write the client, BidirectionalStreamClient.java, as follows:

package org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamRequest;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamResponse;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamServiceGrpc;

import java.util.Scanner;
import java.util.concurrent.TimeUnit;

/**
 * Bidirectional streaming client.
 * @author sunchaser admin@lilu.org.cn
 * @since 2022/1/17
 */
public class BidirectionalStreamClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6666)
                .usePlaintext()
                .build();
        // The async stub takes the response observer and returns the request stream.
        StreamObserver<BidirectionalStreamRequest> requestStreamObserver = BidirectionalStreamServiceGrpc.newStub(channel)
                .bidirectionalStreamCommunication(new StreamObserver<BidirectionalStreamResponse>() {
                    @Override
                    public void onNext(BidirectionalStreamResponse bidirectionalStreamResponse) {
                        System.out.println("[client]-[message from server]: " + bidirectionalStreamResponse.getRespMsg());
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        // Report failures such as the server being unreachable.
                        throwable.printStackTrace();
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("[client]-[server closed the stream]");
                    }
                });
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String line = scanner.nextLine();
            if ("EOF".equals(line)) {
                // Tell the server that the client has finished sending.
                requestStreamObserver.onCompleted();
                break;
            }
            try {
                requestStreamObserver.onNext(
                        BidirectionalStreamRequest.newBuilder()
                                .setReqMsg(line)
                                .build()
                );
            } catch (Exception e) {
                System.out.println("[error]-client exception");
                e.printStackTrace();
            }
        }
        // Release the channel's resources before exiting.
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }
}

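This implements the client side of the streams for the RPC method defined in the .proto file: the response observer's onNext method prints each message sent by the server. Run the main method to start the client; messages typed into its console are sent to the server:

[Image: client-send-to-server]

As shown, the server responds. Now switch to the server console and send a message to the client:

[Image: server-send-to-client]

Then switch back to the client:

[Image: client-received]

The client has received the server's message as well, which completes the bidirectional streaming RPC exchange.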
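One detail the walkthrough leaves open is how the client's main thread learns that the server has finished its side of the stream, since onCompleted and onError are invoked on a gRPC thread. A common approach is to have those callbacks count down a CountDownLatch that the main thread awaits with a timeout. The sketch below illustrates only that waiting pattern in plain Java: CompletionLatchSketch and awaitCompletion are hypothetical names, and a background thread stands in for the gRPC callback thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class CompletionLatchSketch {
    /**
     * Simulates a stream that finishes after delayMillis and waits for it for at most
     * timeoutMillis. Returns true if completion was observed in time.
     */
    static boolean awaitCompletion(long delayMillis, long timeoutMillis) {
        CountDownLatch finished = new CountDownLatch(1);

        // Stand-in for the gRPC thread that would invoke onCompleted() or onError().
        Thread callbackThread = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            finished.countDown(); // what onCompleted() and onError() would both do
        });
        callbackThread.setDaemon(true);
        callbackThread.start();

        try {
            // The main thread blocks here instead of exiting right after half-closing.
            return finished.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitCompletion(50, 2000)); // true: finished within the timeout
        System.out.println(awaitCompletion(2000, 50)); // false: timed out first
    }
}
```

In the client, the latch would be created before the call, counted down in the response observer's onCompleted and onError, and awaited after requestStreamObserver.onCompleted().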