Source Code: https://github.com/joexu01/rsocket-demo
讓我們來簡單複習一下 RSocket 的四種通訊模式:
即發即忘 - FireAndForget:立即傳送一個請求,無需為這個請求傳送響應報文。適用於監控埋點,紀錄檔上報等,這種場景下無需回執,丟失幾個請求無傷大雅
請求響應 - RequestResponse:請求方傳送一條請求訊息,響應方收到請求後並返回一條響應訊息。傳統的HTTP是典型的Request-Response
流式響應 - RequestStream:請求方傳送一個請求報文,響應方發回N個響應報文。傳統的MQ是典型的RequestStream
雙向通道 - Channel:建立一個通道上下文,雙方可以互相傳送訊息。IM是個典型的RequestChannel通訊場景
*本篇文章的使用者端範例檔案在 rsocket-client-raw/src/main/java/org/example/FourCommunicationScheme.java
我們使用 decodeRoute
和 encodeRoute
函數來解碼和編碼路由資訊。
static String decodeRoute(ByteBuf metadata) {
final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);
return routingMetadata.iterator().next();
}
static ByteBuf encodeRoute(String route) {
return TaggingMetadataCodec.createTaggingContent(
ByteBufAllocator.DEFAULT,
Collections.singletonList(route));
}
伺服器端處理常式
在這裡我們編寫一個簡單的 Handler,它的 Route 是 test.echo
,它接收一個請求並返回請求 Payload 的 data 中的字串。
@MessageMapping("test.echo")
public Mono<String> simplyEcho(String data) throws InterruptedException {
Thread.sleep(1500);
logger.info("[test.echo]Received echo string from client: {}", data);
return Mono.just(String.format("[test.echo]I received your string: %s. Thank you.", data));
}
注意,這裡的引數也可以是 Mono<String>
,然後對 Mono 進行操作並返回。事實上,如果嚴格按照響應式程式設計的策略,這裡應該直接對 Mono
進行操作。
使用者端傳送請求
ByteBuf routeMetadata = encodeRoute("test.echo");
Payload echoPayload = ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "This is a message from client using rsocket-java library."),
routeMetadata);
Mono<Payload>
。然後我們對這個 Mono 設定一些操作(具體操作請看程式碼註釋):Mono<Payload> requestResponse = socket.requestResponse(echoPayload);
requestResponse
// 當 subscribe() 操作開始執行時列印一下紀錄檔
.doOnSubscribe(subscription -> logger.info("Test1 subscribed to {}", subscription.toString()))
// 當攜帶的請求成功後要做的事情
.doOnSuccess(payload -> {
logger.info("Test1 - Successfully returned: {}", payload.getDataUtf8());
payload.release();
})
.doOnError(throwable -> logger.info("Test1 doOnError: {}", throwable.toString()))
// 可以使用 timeout 丟棄等待超時的 Mono
//.timeout(Duration.ofSeconds(1))
// 可以使用 doOnTerminate 在請求結束後做一些工作
// .doOnTerminate(() -> {})
// 但是一定要設定 doOnError
//.doOnError(TimeoutException.class, e -> logger.info("Test1 doOnError: {}", e.toString()))
// .onErrorReturn(TimeoutException.class, DefaultPayload.create("Payload: Test1 - timeout"))
// 可以使用 log() 來觀察資料的狀態
//.log()
// 使用者端在執行 subscribe() 操作時才會開始從伺服器端接收資料流
// 在響應式程式設計中使用 subscribe 操作符是訂閱一個資料流並處理髮布的資料、錯誤和完成訊號的核心方式之一
.subscribe();
請求發出後主執行緒不會阻塞,所以我們需要使用 socket.onClose().block();
保持連線。
然後我們嘗試執行伺服器端和使用者端,看看一看使用者端的輸出:
[main] INFO org.example.RSocketClientRaw - My UUID is 0718ef3b-9ee0-42f1-9003-700a8aa9a98d
[main] INFO org.example.RSocketClientRaw - Test1 subscribed to RequestResponseRequesterMono
[reactor-tcp-epoll-2] INFO org.example.RSocketClientRaw - Test1 - Successfully returned: [test.echo]I received your string: This is a message from client using rsocket-java library.. Thank you.
伺服器端紀錄檔:
2023-03-12 21:47:29.291 INFO 32099 --- [or-http-epoll-2] o.example.controller.RSocketController : [connect.setup]Client connection: 0718ef3b-9ee0-42f1-9003-700a8aa9a98d
2023-03-12 21:47:32.304 INFO 32099 --- [or-http-epoll-2] o.example.controller.RSocketController : [test.echo]Received echo string from client: This is a message from client using rsocket-java library.
使用者端成功地發出請求並收到來自伺服器端的回覆。
伺服器端
@MessageMapping("upload.log")
public void fireAndForgetHandler(@Headers Map<String, Object> header, RSocketRequester requester, String data) {
header.forEach((k, v) -> System.out.printf("[upload.log]header key: %s, val: %s\n", k, v));
System.out.printf("[upload.log]UploadEventLogs: Received log string from client: %s\n", data);
}
伺服器端接受一個請求,不返回任何結果(Fire'n'Forget),只在伺服器端列印 Header 的內容。
使用者端
// 測試 FnF
routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList("upload.log"));
socket.fireAndForget(
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "This is a log from client using rsocket-java library."),
routeMetadata))
.doOnSubscribe(subscription -> logger.info("Test2 - Fire And Forget onSubscribe: {}", subscription.toString()))
.subscribe();
使用者端輸出:
[main] INFO org.example.RSocketClientRaw - Test2 - Fire And Forget onSubscribe: FireAndForgetRequesterMono
伺服器端輸出:
2023-03-10 15:10:25.675 INFO 5318 --- [or-http-epoll-4] o.example.controller.RSocketController : [test.echo]Received echo string from client: This is a message from client using rsocket-java library.
[upload.log]header key: dataBufferFactory, val: NettyDataBufferFactory (PooledByteBufAllocator(directByDefault: true))
[upload.log]header key: rsocketRequester, val: org.springframework.messaging.rsocket.DefaultRSocketRequester@607cc59
[upload.log]header key: lookupDestination, val: upload.log
[upload.log]header key: contentType, val: application/binary
[upload.log]header key: rsocketFrameType, val: REQUEST_FNF
[upload.log]UploadEventLogs: Received log string from client: This is a log from client using rsocket-java library.
伺服器端
伺服器端接收一個 Mono<String>
然後返回給使用者端包含 10 個 String
的 Flux
。
事實上,嚴格按照響應式程式設計的策略,這裡應該直接對 Mono
進行操作,可以使用 flatMapMany()
把生成的資料流通過非同步方式處理,擴充套件出新的資料流。下面是擴充套件新資料流的簡單範例:
Mono.just(3)
.flatMapMany(i -> Flux.range(0, i))
.subscribe(System.out::println);
在這裡為了演示方便就先列印 Mono
然後新生成一個 Flux
。
@MessageMapping("handler.request.stream")
public Flux<String> responseStreaming(Mono<String> request) {
request
.doOnNext(s -> logger.info("[handler.request.stream]: {}", s))
// 可以使用 then() 結束操作鏈
.then()
.subscribe();
return Flux
.range(1, 10)
.map(idx -> String.format("Resp from Server: %s, Thank you!", idx));
}
使用者端
請看程式碼註釋來理解對資料流 Flux 的各種操作:
// 測試 RequestStream
routeMetadata = encodeRoute("handler.request.stream");
Flux<Payload> requestStream = socket.requestStream(
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, "TEST3 - Request&Stream"),
routeMetadata));
requestStream
// 當然可以使用 map 對每個 Payload 進行操作,這會改變資料
// .map(payload -> System.out.printf("%s\n", payload.getDataUtf8()))
.doOnSubscribe(subscription -> logger.info("Test3 subscribed to {}", subscription.toString()))
// 使用 doOnNext 不會對流的資料進行改變
// doOnNext()是一個 Reactor 式流操作符,它允許編寫者註冊一個在每次出現新元素時執行的回撥函數
.doOnNext(nextPayload -> System.out.println("Test3 Received payload: " + nextPayload.getDataUtf8()))
// 當需要從流中選擇一些特定的元素時,可以使用 Flux.take(long n) 操作符
// 該操作符將建立一個新的 Flux,該 Flux 包含原始 Flux 的前 n 個元素
// take 操作符發出了指定數量的元素之後,就不再接收任何元素,並且將取消其上游釋出者的訂閱
// 在這裡伺服器端使用 Flux.range 來限定 Flux 流中的元素個數
// 如果伺服器端使用 Flux.interval 生成一個無限長度的流,使用者端使用 take 接收限定個數的元素
// 便會取消釋出者的訂閱
.take(5)
.subscribe();
使用者端輸出結果:
[main] INFO org.example.RSocketClientRaw - My UUID is 28afc749-75e1-4289-8607-14810103de6c
[main] INFO org.example.RSocketClientRaw - Test3 subscribed to RequestStreamRequesterFlux
Test3 Received payload: Resp from Server: 1, Thank you!
Test3 Received payload: Resp from Server: 2, Thank you!
Test3 Received payload: Resp from Server: 3, Thank you!
Test3 Received payload: Resp from Server: 4, Thank you!
Test3 Received payload: Resp from Server: 5, Thank you!
伺服器端接收到了請求:
2023-03-12 22:01:33.520 INFO 32099 --- [or-http-epoll-3] o.example.controller.RSocketController : [handler.request.stream]: TEST3 - Request&Stream
伺服器端
伺服器端接收來自使用者端的整數位符串,將它們乘以2以後傳送回去。我們不妨把處理使用者端請求流的函數封裝為一個 Spring Service:
@Service
public class MathService {
public Flux<String> doubleInteger(Flux<String> request) {
return request
.map(s -> {
System.out.println("received " + s);
int i = Integer.parseInt(s);
return String.valueOf(i * 2);
});
}
}
編寫處理常式:
@Autowired
private MathService mathService;
@MessageMapping("handler.request.channel")
public Flux<String> responseChannel(Flux<String> payloads) {
return this.mathService.doubleInteger(payloads);
}
使用者端
Flux<Payload> payloadFlux = Flux.range(-5, 10)
.delayElements(Duration.ofMillis(500))
.map(obj ->
{
ByteBuf metadata = encodeRoute("handler.request.channel");
return ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString()), metadata);
});
Flux<Payload> channelResp = socket.requestChannel(payloadFlux);
channelResp
.doOnSubscribe(subscription -> logger.info("Test4 subscribed to {}", subscription.toString()))
.doOnError(throwable -> logger.info(throwable.toString()))
.doOnNext(nextPayload -> System.out.println("Test4 Received payload: " + nextPayload.getDataUtf8()))
.subscribe();
使用者端輸出:
[main] INFO org.example.RSocketClientRaw - My UUID is 96ff8fe7-416c-4607-9518-463114725a7a
[main] INFO org.example.RSocketClientRaw - Test4 subscribed to RequestChannelRequesterFlux
Test4 Received payload: -10
Test4 Received payload: -8
Test4 Received payload: -6
Test4 Received payload: -4
Test4 Received payload: -2
Test4 Received payload: 0
Test4 Received payload: 2
Test4 Received payload: 4
Test4 Received payload: 6
Test4 Received payload: 8
伺服器端輸出:
2023-03-12 22:07:05.542 INFO 33083 --- [or-http-epoll-2] o.example.controller.RSocketController : [connect.setup]Client connection: 96ff8fe7-416c-4607-9518-463114725a7a
received -5
received -4
received -3
received -2
received -1
received 0
received 1
received 2
received 3
received 4
下一篇文章會展示伺服器端如何主動呼叫使用者端的函數。如有錯誤歡迎在評論區批評指正!