🌀grpc-java Bidirectional Streaming RPC Example
This article shows how to use grpc-java to build a bidirectional streaming RPC example that simulates a simple online chat room.
Adding the dependencies
Create a new Maven project and, following the example on the official site, add the following dependencies to pom.xml:
```xml
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.43.1</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.43.1</version>
</dependency>
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.43.1</version>
</dependency>
<dependency> <!-- necessary for Java 9+ -->
    <groupId>org.apache.tomcat</groupId>
    <artifactId>annotations-api</artifactId>
    <version>6.0.53</version>
    <scope>provided</scope>
</dependency>
```
Then add the Maven plugin for protoc:
```xml
<build>
    <extensions>
        <extension>
            <groupId>kr.motd.maven</groupId>
            <artifactId>os-maven-plugin</artifactId>
            <version>1.5.0.Final</version>
        </extension>
    </extensions>
    <plugins>
        <plugin>
            <groupId>org.xolstice.maven.plugins</groupId>
            <artifactId>protobuf-maven-plugin</artifactId>
            <version>0.6.1</version>
            <configuration>
                <protocArtifact>com.google.protobuf:protoc:3.19.1:exe:${os.detected.classifier}</protocArtifact>
                <pluginId>grpc-java</pluginId>
                <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.43.1:exe:${os.detected.classifier}</pluginArtifact>
            </configuration>
            <executions>
                <execution>
                    <goals>
                        <goal>compile</goal>
                        <goal>compile-custom</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
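Next, we can define the RPC interface and message types in a .proto file. By default, the plugin looks for .proto files in src/main/proto and src/test/proto (create these directories if they do not exist).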
Writing the .proto file
The following bidirectional_stream.proto file defines a bidirectional streaming RPC service:
```protobuf
syntax = "proto3";

option java_multiple_files = true;
option java_package = "org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream";
option java_outer_classname = "BidirectionalStreamProto";

service BidirectionalStreamService {
    rpc BidirectionalStreamCommunication(stream BidirectionalStreamRequest) returns (stream BidirectionalStreamResponse) {}
}

message BidirectionalStreamRequest {
    string req_msg = 1;
}

message BidirectionalStreamResponse {
    string resp_msg = 1;
}
```
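This defines a bidirectional streaming RPC in which both the client and the server communicate through streams. The service BidirectionalStreamService exposes one RPC method, BidirectionalStreamCommunication, which takes a stream of BidirectionalStreamRequest messages and returns a stream of BidirectionalStreamResponse messages.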
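To make the call shape concrete before any code is generated: in grpc-java, a bidirectional streaming method receives the observer it writes responses to and returns the observer that receives requests. The sketch below models only that shape. `MiniObserver`, `RecordingObserver` and `EchoCallShape` are hypothetical names, and `MiniObserver` is a plain-JDK stand-in that mirrors the three callbacks of `io.grpc.stub.StreamObserver`; it is not gRPC itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in mirroring the three callbacks of io.grpc.stub.StreamObserver.
interface MiniObserver<T> {
    void onNext(T value);
    void onError(Throwable t);
    void onCompleted();
}

// Collects everything it receives so the exchange can be inspected afterwards.
class RecordingObserver implements MiniObserver<String> {
    final List<String> received = new ArrayList<>();
    boolean completed;

    @Override public void onNext(String value) { received.add(value); }
    @Override public void onError(Throwable t) { received.add("error: " + t.getMessage()); }
    @Override public void onCompleted() { completed = true; }
}

class EchoCallShape {
    // Same shape as a bidirectional streaming handler:
    // the response observer comes in, the request observer goes out.
    static MiniObserver<String> open(MiniObserver<String> responses) {
        return new MiniObserver<String>() {
            @Override public void onNext(String request) {
                // Each incoming request may trigger zero or more responses.
                responses.onNext("received: " + request);
            }
            @Override public void onError(Throwable t) {
                // The caller aborted the stream; nothing is sent back.
            }
            @Override public void onCompleted() {
                // The caller finished sending, so close the response side too.
                responses.onCompleted();
            }
        };
    }

    public static void main(String[] args) {
        RecordingObserver responses = new RecordingObserver();
        MiniObserver<String> requests = open(responses);
        requests.onNext("hello");
        requests.onNext("world");
        requests.onCompleted();
        System.out.println(responses.received + " completed=" + responses.completed);
    }
}
```

The generated gRPC base class follows the same pattern, with the generated request and response message types in place of `String`.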
Generating the gRPC and protobuf classes
```shell
mvn protobuf:compile
mvn protobuf:compile-custom
```
These goals can also be run quickly from IDEA's Maven tool window. The generated classes then appear under the target directory.
Writing the bidirectional streaming gRPC server
First, implement the bidirectional streaming interface of the BidirectionalStreamService defined in the .proto file. BidirectionalStreamServiceImpl.java looks like this:
```java
package org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.server;

import io.grpc.stub.StreamObserver;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamRequest;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamResponse;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamServiceGrpc;

public class BidirectionalStreamServiceImpl extends BidirectionalStreamServiceGrpc.BidirectionalStreamServiceImplBase {
    // Response stream of the most recently connected client.
    // volatile because the server's console thread reads it.
    private volatile StreamObserver<BidirectionalStreamResponse> responseStreamObserver;

    @Override
    public StreamObserver<BidirectionalStreamRequest> bidirectionalStreamCommunication(StreamObserver<BidirectionalStreamResponse> responseObserver) {
        this.responseStreamObserver = responseObserver;
        // The returned observer receives every message the client sends.
        return new StreamObserver<BidirectionalStreamRequest>() {
            @Override
            public void onNext(BidirectionalStreamRequest bidirectionalStreamRequest) {
                String reqMsg = bidirectionalStreamRequest.getReqMsg();
                System.out.println("[服务端]-[收到客户端消息]:" + reqMsg);
                // Acknowledge each client message on the response stream.
                responseObserver.onNext(
                        BidirectionalStreamResponse.newBuilder()
                                .setRespMsg("hello client, I'm Java grpc server, your message '" + reqMsg + "' has been received.")
                                .build()
                );
            }

            @Override
            public void onError(Throwable throwable) {
                // Print the stack trace of a failed or cancelled stream.
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                // The client finished sending; close the response stream as well.
                responseObserver.onCompleted();
            }
        };
    }

    public StreamObserver<BidirectionalStreamResponse> getResponseStreamObserver() {
        return responseStreamObserver;
    }
}
```
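The response stream is stored in a field so that the server can later use it to push messages to the client. The server launcher class, BidirectionalStreamServer.java, looks like this: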
```java
package org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.server;

import io.grpc.Server;
import io.grpc.ServerBuilder;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamResponse;

import java.io.IOException;
import java.util.Scanner;

public class BidirectionalStreamServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        int port = 6666;
        BidirectionalStreamServiceImpl streamService = new BidirectionalStreamServiceImpl();
        Server server = ServerBuilder.forPort(port)
                .addService(streamService)
                .build()
                .start();
        System.out.println("Bidirectional Stream GRPC Server started, listening on " + port);
        // Console thread: every line typed here is pushed to the connected client.
        new Thread(() -> {
            Scanner scanner = new Scanner(System.in);
            while (true) {
                String line = scanner.nextLine();
                if ("EOF".equals(line)) {
                    break;
                }
                try {
                    // The observer is still null until a client opens the stream,
                    // so this call throws and the catch block reports it.
                    streamService.getResponseStreamObserver()
                            .onNext(
                                    BidirectionalStreamResponse.newBuilder()
                                            .setRespMsg(line)
                                            .build()
                            );
                } catch (Exception e) {
                    System.out.println("[error]-没有客户端连接");
                    e.printStackTrace();
                }
            }
        }).start();
        // Block the main thread until the server is shut down.
        server.awaitTermination();
    }
}
```
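Run the main method to start the server. Lines typed into the server console are sent to the client. Because no client has connected yet at this point, the server prints: [error]-没有客户端连接 ("no client connected").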
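Because the service keeps a single `responseStreamObserver` field, each new call overwrites it, so only the most recently connected client can be reached from the server console. For a chat room with several clients, one option is to keep every open stream in a concurrent set and write to all of them. The sketch below shows that idea with plain JDK types. `MessageSink` and `ChatRoomRegistry` are hypothetical names, and `MessageSink` merely stands in for `StreamObserver<BidirectionalStreamResponse>`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for StreamObserver<BidirectionalStreamResponse>.
interface MessageSink {
    void send(String message);
}

class ChatRoomRegistry {
    // Concurrent set: streams are added and removed on gRPC threads
    // while the console thread iterates over them.
    private final Set<MessageSink> sinks = ConcurrentHashMap.newKeySet();

    // Call when a client opens the stream.
    void join(MessageSink sink) {
        sinks.add(sink);
    }

    // Call from the request observer's onCompleted and onError.
    void leave(MessageSink sink) {
        sinks.remove(sink);
    }

    // Sends the message to every registered sink, drops sinks that fail,
    // and returns the number of successful deliveries.
    int broadcast(String message) {
        int delivered = 0;
        for (MessageSink sink : sinks) {
            try {
                synchronized (sink) { // one writer at a time per stream
                    sink.send(message);
                }
                delivered++;
            } catch (RuntimeException e) {
                sinks.remove(sink); // the stream is no longer usable
            }
        }
        return delivered;
    }
}
```

In the real service, `join` would be called where the field is assigned today, and `broadcast` would replace the single `onNext` call in the console thread. The per-sink `synchronized` block reflects that a `StreamObserver` is not thread-safe, so concurrent writers to the same stream need to be serialized.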
Writing the gRPC client
The client, BidirectionalStreamClient.java, looks like this:
```java
package org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.client;

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamRequest;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamResponse;
import org.sunchaser.sparrow.javaee.grpc.core.bidirectionalstream.BidirectionalStreamServiceGrpc;

import java.util.Scanner;
import java.util.concurrent.TimeUnit;

public class BidirectionalStreamClient {
    public static void main(String[] args) throws InterruptedException {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6666)
                .usePlaintext()
                .build();
        // Streaming calls need the asynchronous stub. The observer passed in receives
        // server messages; the observer returned is used to send client messages.
        StreamObserver<BidirectionalStreamRequest> requestStreamObserver = BidirectionalStreamServiceGrpc.newStub(channel)
                .bidirectionalStreamCommunication(new StreamObserver<BidirectionalStreamResponse>() {
                    @Override
                    public void onNext(BidirectionalStreamResponse bidirectionalStreamResponse) {
                        System.out.println("[客户端]-[收到服务端消息]:" + bidirectionalStreamResponse.getRespMsg());
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        // Surface stream failures, e.g. when the server is unreachable.
                        throwable.printStackTrace();
                    }

                    @Override
                    public void onCompleted() {
                    }
                });
        Scanner scanner = new Scanner(System.in);
        while (true) {
            String line = scanner.nextLine();
            if ("EOF".equals(line)) {
                // Tell the server that the client has finished sending.
                requestStreamObserver.onCompleted();
                break;
            }
            try {
                requestStreamObserver.onNext(
                        BidirectionalStreamRequest.newBuilder()
                                .setReqMsg(line)
                                .build()
                );
            } catch (Exception e) {
                System.out.println("[error]-客户端异常");
                e.printStackTrace();
            }
        }
        // Release the channel and give the in-flight call a moment to finish.
        channel.shutdown().awaitTermination(5, TimeUnit.SECONDS);
    }
}
```
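Here we implement the client side of the RPC method defined in the .proto file; the response observer's onNext prints each message received from the server. Run the main method to start the client, then type a message in its console to send it to the server. The server responds to each message. Switching to the server console and typing a message there sends it to the client, which prints it in turn. With that, bidirectional streaming RPC communication is complete.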
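A detail the walkthrough leaves open is when the client may safely exit. A common idiom in grpc-java client code is to release a `CountDownLatch` from the response observer's `onCompleted` and `onError`, and to have the main thread wait on it with a timeout after calling `onCompleted` on the request stream. The sketch below isolates that idiom with plain JDK types. `StreamFinishLatch` is a hypothetical helper name, and its two callback methods are assumed to be invoked from the corresponding `StreamObserver` callbacks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: records whether a stream has ended, and whether it failed.
class StreamFinishLatch {
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable failure;

    // Invoke from the response observer's onCompleted().
    void completed() {
        done.countDown();
    }

    // Invoke from the response observer's onError(Throwable).
    void failed(Throwable t) {
        failure = t;
        done.countDown();
    }

    // Waits up to timeoutMillis; returns true only if the stream ended without an error.
    boolean awaitSuccess(long timeoutMillis) {
        try {
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS) && failure == null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        StreamFinishLatch latch = new StreamFinishLatch();
        // Simulates the gRPC thread that would deliver onCompleted().
        new Thread(latch::completed).start();
        System.out.println("finished cleanly: " + latch.awaitSuccess(2000));
    }
}
```

In the chat client, the wait would go right after `requestStreamObserver.onCompleted()`, so the process does not exit before the server has closed its side of the stream.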