我們可以在初始化 Rsocket 範例的時候指定使用者端可以被呼叫的方法,使用 acceptor()
指定可被呼叫的方法和方法使用的通訊模型型別:
RequestResponse
時:.acceptor(SocketAcceptor.forRequestResponse(payload -> {}))
FireAndForget
時.acceptor(SocketAcceptor.forFireAndForget(payload -> {}))
RequestStream
時.acceptor(SocketAcceptor.forRequestStream(payload -> {}))
RequestStream
時.acceptor(SocketAcceptor.forRequestChannel(
payloads ->
Flux.from(payloads)...));
接下來編寫使用者端方法的處理邏輯,以 RequestResponse
為例
public static void main(String[] args) {
final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);
// 隨機生成 UUID 標識使用者端
UUID uuid = UUID.randomUUID();
logger.info("My UUID is {}", uuid);
// 生成 SETUP 階段(建立連線時) Payload 使用的 route 資訊
ByteBuf setupRouteMetadata = encodeRoute("connect.setup");
RSocket socket = RSocketConnector.create()
// 設定 metadata MIME Type,方便伺服器端根據 MIME 型別確定 metadata 內容
.metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
// SETUP 階段的 Payload,data 裡面存放 UUID
.setupPayload(ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),
setupRouteMetadata))
// 編寫 Request&Response Acceptor
.acceptor(SocketAcceptor.forRequestResponse(
payload -> {
String route = decodeRoute(payload.sliceMetadata());
logger.info("[Client Acceptor] Received RequestResponse[route={}]", route);
String metadataUtf8 = payload.getMetadataUtf8();
String dataUtf8 = payload.getDataUtf8();
logger.info("[Client Acceptor] This Req&Resp contains data: {}, metadata: {}", dataUtf8, metadataUtf8);
payload.release();
if ("request.status.callback".equals(route)) {
return Mono.just(ByteBufPayload.create("Thanks for handling my task!"));
} else if ("request.server.call".equals(route)) {
return Mono.just(ByteBufPayload.create("You called my handler actively from server!"));
}
byte[] respBytes = String
.format("Client received your message, but no handler matched. Your meta is %s and data is %s",
metadataUtf8, dataUtf8).getBytes();
return Mono.just(DefaultPayload.create(respBytes));
}
))
// 設定重連策略
.reconnect(Retry.backoff(2, Duration.ofMillis(500)))
.connect(
TcpClientTransport.create(
TcpClient.create()
.host("127.0.0.1")
.port(8099)))
.block();
在這裡我們設定使用者端能夠接收 RequestResponse
型別的伺服器端請求,仔細觀察可以看到,伺服器端傳送的請求也是可以攜帶包含路由資訊的 metadata
的,在使用者端,我們也可以根據 Payload 中的路由資訊將請求分發到不同方法中處理。
為了方便演示,如果伺服器端呼叫時指定的路由資訊是 request.status.callback
,那麼伺服器端就是在完成一個由使用者端發起的,非同步執行的任務後呼叫使用者端的回撥函數返回任務執行結果。
如果伺服器端呼叫時指定的路由資訊是 request.server.call
,那麼伺服器端就是在主動呼叫使用者端以獲取一些狀態資訊。
當然,使用上面的程式碼設定使用者端可被呼叫的 RSocket 方法有一個侷限性,那就是我們只能設定 RequestResponse
FireAndForget
RequestStream
Channel
這四種通訊模式的其中一種。也就是說,用這種方法,伺服器端無法同時向伺服器端發出 RequestResponse
FireAndForget
RequestStream
Channel
請求。本文會在第四部分展示如何讓使用者端支援同時響應這四種通訊模式。
如果使用者端提交一個耗時任務,伺服器端可以接受這個任務然後立刻返回響應:「任務提交成功」,然後執行任務。當任務執行完,伺服器端再使用回撥函數將結果返回給使用者端。
我們不妨將執行任務的模組封裝成一個 Spring Service:
@Service
public class RequestProcessor {
private static final Logger logger = LoggerFactory.getLogger(RequestProcessor.class);
public void processRequests(RSocketRequester rSocketRequester, UUID uuid) {
logger.info("[RequestProcessor.processRequests]I'm handling this!");
ByteBuf routeMetadata = TaggingMetadataCodec.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList("request.status.callback"));
Mono.just("Your request " + uuid + " is completed")
.delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(10, 15)))
.flatMap(
m -> rSocketRequester.rsocketClient()
.requestResponse(
Mono.just(ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT,
String.format("[TASK %s]This is a task result from server using spring.", uuid)),
routeMetadata
)))
.doOnSuccess(p -> logger.info("[RequestProcessor.processRequests]Received from client: {}", p.getDataUtf8()))
)
.subscribe();
}
}
這個 Service 中的方法接收一個 RSocketRequester
和一個 任務的 UUID,當任務完成時,這個方法會生成一個 Payload 存放任務結果,指定 metadata 中的路由資訊為 request.status.callback
。這樣使用者端在收到這個 RequestResponse 時就能知道這是一個已經提交任務的回撥。在這裡我們使用 delayElement
模擬處理任務時耗時的操作。
值得注意的是,RSocketRequester
引數的來源,我們在編寫伺服器端接收任務提交的方法時可以將其作為引數,這是 Spring RSocket 的固定用法,這樣就可以拿到伺服器端-使用者端連線的 RSocketRequester 範例,然後就可以在 Service 中通過 RSocketRequester 範例呼叫使用者端的回撥函數:
@MessageMapping("handler.task")
public Mono<String> task(String request, RSocketRequester rSocketRequester) {
logger.info("[handler.request]Client request: {}", request);
UUID uuid = UUID.randomUUID();
this.requestProcessor.processRequests(rSocketRequester, uuid);
return Mono.just(uuid.toString());
}
我們在【RSocket】使用 RSocket (一)——建立連線一文中已經在連線建立的時刻將使用者端-伺服器端連線的 RSocketRequester
範例儲存在一個 ConcurrentHashMap
中了。我們可以通過一些機制,比如定時任務,或者使用 REST API 向伺服器端下命令的方式,讓伺服器端主動呼叫已經建立連線的使用者端的 RSocket 方法。
在這個範例裡,我們編寫兩個 REST API,一個 API 返回所有已連線到伺服器端的使用者端資訊,包括使用者端 UUID、連線建立的時間等:
@ResponseBody
@GetMapping("/client/list")
public List<ConnectedClientDto> clientsInfo() {
List<ConnectedClientDto> info = new ArrayList<>();
RSocketController.clientsManager.clients.forEach((key, value) -> {
info.add(new ConnectedClientDto(key, value.connectedTime));
});
return info;
}
另一個 API 用於觸發伺服器端向用戶端傳送請求:
@GetMapping("/client/call")
public ServerResponse callFromServer(String clientRoute, String clientUUID) {
RSocketRequester requester = RSocketController.clientsManager.getClientRequester(clientUUID);
if (requester == null) {
return new ServerResponse("failed: client rSocket has closed.");
}
ByteBuf routeMetadata = TaggingMetadataCodec
.createTaggingContent(ByteBufAllocator.DEFAULT, Collections.singletonList(clientRoute));
Mono.just("Server is calling you.")
// .delayElement(Duration.ofSeconds(ThreadLocalRandom.current().nextInt(5, 10)))
.flatMap(m -> requester.rsocketClient().requestResponse(
Mono.just(
ByteBufPayload.create(
ByteBufUtil.writeUtf8(
ByteBufAllocator.DEFAULT,
"This is a message from server using spring-stack."),
routeMetadata)))
.doOnSubscribe(subscription -> logger.info("subscribed."))
.doOnError(throwable -> logger.error("Error when calling client: {}", throwable.toString()))
.doOnSuccess(p -> logger.info("[test.connect.requester]Received from client: {}.", p.getDataUtf8()))
)
.subscribe();
return new ServerResponse(String.format("request from server has sent to the client %s.", clientUUID));
}
我們首先啟動伺服器端再啟動使用者端,然後測試上述兩個 API:
啟動兩個使用者端和伺服器端後檢視連線資訊
向其中一個使用者端傳送一個請求
可以從使用者端的輸出看到使用者端接收到了這次請求
前面我們提到如果使用 .acceptor(SocketAcceptor.for...)
來新增使用者端可以被呼叫的方法時,只能指定四種通訊模式中的一種。
這時候,我們可以實現 io.rsocket.SocketAcceptor
介面,重寫 accept
方法,accept
方法的返回值是 Mono<RSocket>
,我們可以實現 RSocket
介面並重寫其中 fireAndForget
requestResponse
requestStream
requestChannel
四個方法來達到讓使用者端同時接收四種通訊模式的目的。
首先實現 RSocket
介面,並重寫其中的方法:
// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/service/ClientService.java
public class ClientService implements RSocket {
Logger logger = LoggerFactory.getLogger(ClientService.class);
static String decodeRoute(ByteBuf metadata) {
final RoutingMetadata routingMetadata = new RoutingMetadata(metadata);
return routingMetadata.iterator().next();
}
@Override
public Mono<Void> fireAndForget(Payload payload) {
logger.info("Receiving: " + payload.getDataUtf8());
return Mono.empty();
}
@Override
public Mono<Payload> requestResponse(Payload payload) {
logger.info("Receiving: " + payload.getDataUtf8());
return Mono.just(DefaultPayload.create("Client received your RequestResponse"));
}
@Override
public Flux<Payload> requestStream(Payload payload) {
return Flux.range(-5, 10)
.delayElements(Duration.ofMillis(500))
.map(obj ->
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
}
@Override
public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
return Flux.range(-5, 10)
.delayElements(Duration.ofMillis(500))
.map(obj ->
ByteBufPayload.create(
ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, obj.toString())));
}
}
這只是一個範例,如果業務需要也可以解析 Payload 中的 metadata 來實現路由。
接下來我們實現 RSocketAcceptor
介面:
// https://github.com/joexu01/rsocket-demo/blob/master/rsocket-client-raw/src/main/java/org/example/SocketAcceptorImpl.java
public class SocketAcceptorImpl implements SocketAcceptor {
@Override
public Mono<RSocket> accept(ConnectionSetupPayload connectionSetupPayload, RSocket rSocket) {
return Mono.just(new ClientService());
}
}
然後我們在初始化使用者端的時候這樣設定 Acceptor 即可:
RSocket socket = RSocketConnector.create().acceptor(new SocketAcceptorImpl())