使用Spring Integration接收TCP與UDP請求

2023-10-21 12:00:30

1. 簡介

Spring Integration 是一個開源的專案,它是 Spring 生態系統的一部分,旨在簡化企業整合(Enterprise Integration)的開發。它提供了一種構建訊息驅動的、鬆散耦合的、可延伸的企業應用整合解決方案的方式。Spring Integration 基於 Spring Framework 構建,使開發者能夠更容易地將不同的系統、應用程式和服務整合到一個協調的整體中。

Spring Integration 主要有以下作用

  1. 訊息驅動的整合:Spring Integration 基於訊息傳遞的模式,允許系統和應用程式通過訊息進行通訊。這種模式可以用於非同步整合,以確保系統能夠鬆散耦合,以及在高負載和大規模情況下具有良好的效能。
  2. 模組化和可延伸:Spring Integration 提供了一組模組,每個模組都用於處理特定型別的整合需求。這些模組可以按需組合和擴充套件,使開發者能夠根據應用程式的需要選擇合適的模組,並自定義它們。
  3. 整合各種傳輸協定和資料格式:Spring Integration 支援各種傳輸協定(例如,HTTP、JMS、FTP、SMTP等)和資料格式(例如,JSON、XML、CSV等),以便實現不同系統之間的資料傳輸和轉換。
  4. 企業模式的整合:Spring Integration 提供了一些企業整合模式的實現,例如訊息路由、訊息轉換、訊息過濾、訊息聚合等,以幫助解決不同場景下的整合挑戰。
  5. 與 Spring 生態系統的整合:Spring Integration 與 Spring Framework 和 Spring Boot 緊密整合,開發者可以輕鬆整合已有的 Spring 應用程式,同時利用 Spring 的依賴注入和 AOP(面向切面程式設計)等功能。

2. 程式碼實戰

本文主要介紹 Spring Integration 接收TCP與UDP請求的範例。在專案中,我們偶爾需要接收其他服務的TCP與UDP請求,此時使用Netty可能會過度設計,想要一個輕量級nio的TCP、UDP伺服器端的話,我們可以選擇 Spring Integration。

環境:

  1. JDK21
  2. SpringBoot 3.1.4
  3. Spring Integration 6.1.3

2.1 匯入依賴

<!-- 父工程,主要用作版本管控 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>3.1.4</version>
    <relativePath />
</parent>

<!-- springboot-web -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- spring-integration -->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-ip</artifactId>
</dependency>

注意:如果你的SpringBoot版本是2.x版本,那麼你需要使用JDK21以下的版本,因為JDK中的包名有所更改。

2.2 建立TCP伺服器端

新建設定類TcpServerConfig,其中tcp.server.port需要到application.yml或者application.properties中進行設定。或者你也可以直接填寫埠。

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.ip.tcp.TcpReceivingChannelAdapter;
import org.springframework.integration.ip.tcp.connection.AbstractServerConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpNioServerConnectionFactory;

@Slf4j
@Configuration
public class TcpServerConfig {

    @Value("${tcp.server.port}")
    private int PORT;

    /**
     * 建立連線工廠
     * @return
     */
    @Bean
    public AbstractServerConnectionFactory serverConnectionFactory() {
        TcpNioServerConnectionFactory tcpNioServerConnectionFactory = new TcpNioServerConnectionFactory(PORT);
        tcpNioServerConnectionFactory.setUsingDirectBuffers(true);
        return tcpNioServerConnectionFactory;
    }

    /**
     * 建立訊息通道
     * @return
     */
    @Bean
    public DirectChannel tcpReceiveChannel() {
        return new DirectChannel();
    }

    /**
     * 建立tcp接收通道介面卡
     * @return
     */
    @Bean
    public TcpReceivingChannelAdapter inboundAdapter() {
        TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
        adapter.setConnectionFactory(serverConnectionFactory());
        adapter.setOutputChannelName("tcpReceiveChannel");
        return adapter;
    }

    /**
     * 處理請求器
     * @param message
     */
    @ServiceActivator(inputChannel = "tcpReceiveChannel")
    public void messageReceiver(byte[] message) {
        // 處理接收到的TCP訊息
        log.info("處理TCP請求");
    }
}

注意:在傳送tcp報文的時候,tcp報文需要以\r\n結尾,否則無法正常接收報文。

2.3 建立UDP伺服器端

新建設定類UdpServerConfig,其中udp.server.port需要到application.yml或者application.properties中進行設定。或者你也可以直接填寫埠。

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.ip.dsl.Udp;
import org.springframework.messaging.Message;

@Slf4j
@Configuration
public class UdpServerConfig {

    @Value("${udp.server.port}")
    private int PORT;

    /**
     * 建立UDP伺服器接收通道介面卡
     * @return
     */
    @Bean
    public IntegrationFlow udpIn() {
        return IntegrationFlow.from(Udp.inboundAdapter(PORT))
                .channel("udpReceiveChannel")
                .get();
    }


    /**
     * 建立訊息接收通道
     * @return
     */
    @Bean
    public DirectChannel udpReceiveChannel() {
        return new DirectChannel();
    }

    /**
     * 處理接收到的UDP訊息
     * @param message
     */
    @ServiceActivator(inputChannel = "udpReceiveChannel")
    public void udpHandleMessage(Message<byte[]> message) {
        // 處理接收到的UDP訊息
        byte[] payload = message.getPayload();
        log.info("處理UDP請求");
    }
}

3. 總結

對比Netty,Spring Integration比較輕量級,也更容易整合到 SpringBoot 中,但是效能肯定不如Netty。這裡也只是給接收TCP、UDP請求設計方面多一個選擇。