【RSocket】使用 RSocket (一)——建立連線

2023-03-06 06:00:30

0. RSocket 簡介

採用二進位制對等資料傳輸,主要應用於分散式架構之中,是一種基於Reactive Stream規範標準實現的新的通訊協定。

參考阿里雲開發者社群的介紹

相關檔案和資料:

RSocket By Example

rsocket-java 原生庫例子

Spring RSocket 支援檔案

在這裡我們在使用者端使用 rsocket-java 原生庫,在伺服器端使用 spring-boot-starter-rsocket。

1. 伺服器端

1.1 SETUP階段 - 處理使用者端發起的連線請求

點選檢視原始碼

新建一個 RSocketController 類來處理 RSocket 相關的請求。

@Controller
public class RSocketController {

    private static Logger logger = LoggerFactory.getLogger(RSocketController.class);

    // 對到來的連線做一些處理
    @ConnectMapping("connect.setup")
    public Mono<Void> setup(String data, RSocketRequester rSocketRequester) {
        logger.info("[connect.setup]Client connection: {}\n", data);
        return Mono.empty();
    }
}

RSocket 的 metadata 中可以包含路由(Routing)資訊,這和 一般 WEB 框架通過解析 URL 將請求導向不同的處理常式是一樣的。在連線建立時,使用者端會傳送一個 SETUP Payload,@ConnectMapping 可以通過解析 SETUP Payload 的 metadata 中的路由資訊來使用不同的連線建立階段的處理常式。在這裡,只要 SETUP Payload 的 metadata 中的路由資訊是 connect.setup ,該函數就會處理建立連線後用戶端傳送的 SETUP Payload。

1.2 儲存使用者端的 Requester

RSocket 協定支援雙方主動呼叫對方的函數。如果伺服器端想要主動向使用者端傳送請求,他就可以在連線建立時儲存 RSocketRequester 物件以便伺服器端在需要時向用戶端發起請求。

首先在這裡我們假設使用者端建立連線時會將 UUID 放在 SETUP Payload 的 data 中。然後我們宣告一個類來儲存 RSocketRequester,程式碼如下:

public class ConnectedClient {
    public RSocketRequester requester;
    public Date connectedTime;

    ConnectedClient(RSocketRequester requester) {
        this.requester = requester;
        this.connectedTime = new Date();
    }
}

然後我們建立一個 Service 來管理使用者端的 RSocketRequester。在這裡使用 ConcurrentHashMap 來儲存 Requester,鍵是使用者端的 UUID,值是 ConnectedClient 物件。

@Service
public class ConnectedClientsManager {
    private static Logger logger = LoggerFactory.getLogger(ConnectedClientsManager.class);
    public final ConcurrentHashMap<String, ConnectedClient> clients;

    public ConnectedClientsManager() {
        this.clients = new ConcurrentHashMap<>();
    }

    public Set<String> getAllClientIdentifier() {
        return this.clients.keySet();
    }

    public RSocketRequester getClientRequester(String clientIdentifier) {
        return this.clients.get(clientIdentifier).requester;
    }

    public void putClientRequester(String clientIdentifier, RSocketRequester requester) {
        requester.rsocket()
                .onClose()
                .doFirst(() -> this.clients.put(clientIdentifier, new ConnectedClient(requester)))
                .doFinally(sig -> {
                    logger.info("Client closed, uuid is {}. signal is {}.", clientIdentifier, sig.toString());
                    this.clients.remove(clientIdentifier);
                }).subscribe();
    }

    public void removeClientRequester(String clientIdentifier) {
        this.clients.remove(clientIdentifier);
    }
}

然後我們就可以在 RSocketController 中引入 ConnectedClientsManager 了。

@Controller
public class RSocketController {

    private static Logger logger = LoggerFactory.getLogger(RSocketController.class);

    public static ConnectedClientsManager clientsManager;

    @Autowired
    private void initializeClientsManager() {
        clientsManager = new ConnectedClientsManager();
    }
...

最後我們編寫連線處理常式,將 Requester 儲存起來:

@ConnectMapping("connect.setup")
    public Mono<Void> setup(String data, RSocketRequester rSocketRequester) {
        logger.info("[connect.setup]Client connection: {}\n", data);
        clientsManager.putClientRequester(data, rSocketRequester);
        return Mono.empty();
    }

下面是 spring application 設定 application.yaml

spring:
  rsocket:
    server:
      port: 8099
      transport: tcp

2. 使用者端

點選檢視原始碼

  • 第一步:隨機生成標識使用者端身份的 UUID
public class ConnectionSetup {

    public static void main(String[] args) {
        final Logger logger = LoggerFactory.getLogger(RSocketClientRaw.class);
        UUID uuid = UUID.randomUUID();
......
  • 第二步:生成 SETUP Payload 使用的 routing 資訊
ByteBuf setupRouteMetadata = TaggingMetadataCodec.createTaggingContent(
                ByteBufAllocator.DEFAULT,
                Collections.singletonList("connect.setup"));
  • 第三步:使用 RSocketConnector 建立 RSocket:
    • 在這裡首先需要設定後設資料的 MIME 型別,方便伺服器端根據 MIME 型別確定 metadata 的內容
    • 然後生成 SETUP Payload,data 中存放 UUID 字串,metadata 中存放路由資訊
    • 設定重連策略
    • 最後指定 ClientTransport 和伺服器端建立連線
    • 使用 block() 在連線建立真正之前阻塞程序
RSocket socket = RSocketConnector.create()
                // 設定 metadata MIME Type,方便伺服器端根據 MIME 型別確定 metadata 內容
                .metadataMimeType(WellKnownMimeType.MESSAGE_RSOCKET_ROUTING.getString())
                // SETUP 階段的 Payload,data 裡面存放 UUID
                .setupPayload(ByteBufPayload.create(
                        ByteBufUtil.writeUtf8(ByteBufAllocator.DEFAULT, uuid.toString()),
                        setupRouteMetadata))
                // 設定重連策略
                .reconnect(Retry.backoff(2, Duration.ofMillis(500)))
                .connect(
                        TcpClientTransport.create(
                                TcpClient.create()
                                        .host("127.0.0.1")
                                        .port(8099)))
                .block();

然後可以使用 socket.onClose().block(); 保持連線。此時如果我們執行使用者端,然後再關閉使用者端的話,會在伺服器端看到輸出:

表明使用者端和伺服器端建立了連線之後又關閉了連線。