基於Netty的TCP服務架構

2022-10-08 12:05:20

19年寫的一個基礎的TCP服務架構,內建了一個簡單IOC容器,當時的目標是一方面能作為元件供第三方整合實現TCP通訊相關功能,另一方面作為提供一種服務架構正規化。所以框架核心點主要還是通過適度的封裝,隱藏底層的通訊細節,最終呼叫者接受到的是經過合包分包處理的位元組陣列,不涉及具體的協定解析,大家如果使用可以再基於業務進行適度的封裝。

好,廢話不多說,簡單介紹下整個架構和原始碼細節。

Jtcp-cmmon

Jtcp-cmmon主要放置一些基礎設定與工具類。 1、這裡注意的服務設定類與預設設定項 JtcpConfig、JtcpOptions,JtcpConfig 顧名思義就是設定類,而JtcpOptions則定義了預設值; 2、RouteEnum列舉中列出了幾種通用的網路通訊事件型別,作為註解中的欄位定義路由

    public enum RouteEnum {
        OnConnect, //連結
        OnDisconnect, //連結斷開
        OnRecevie, //資料接收
        OnSessionTimeOut, //超時
        OnException //異常
    }

Jtcp-transport

Jtcp-transport 基於Netty提供了TCP服務與報文解析功能,這裡我針對常規固定位元組起始的協定,通過遞迴方式對報文粘包、半包等進行了處理

     /**
     * state = 0 開始解析
     * state = 1 解析(遞迴處理粘包)
     * state = 2 半包
     */

    private void parseCompletePackets(ChannelHandlerContext ctx, byte[] bytesReady, List<Object> out,
            int magicByteBegin, int magicByteEnd)
 throws IOException {
        if (state == 0) { // 開始解析
            dataStream = new ByteArrayOutputStream();
            // 包資料開始狀態,查詢開始標識
            if (bytesReady[0] != magicByteBegin) {//第一包必須從協定報文頭開始
                return;
            }
            state = 1;
        }
        if (state > 0) {
            int pos = indexOfMagicByte(bytesReady, magicByteEnd);//尋找尾部標識index,跳過頭部標識位從1開始
            if(state == 2) {//半包狀態
                if(bytesReady[0] == magicByteEnd) {//半包狀態,但下段報文7E開頭,明顯是不正常的
                    dataStream.reset(); //只能清除目前已累積的所有資料
                }
            }
            if (pos != -1) {
                // 結束標識
                dataStream.write(bytesReady, 0, pos);
                
                byte[] ad = dataStream.toByteArray();
                // 讀取完整一個報文
                out.add(ad);
                // 重置為包開始處理狀態
                state = 0;
                // 將剩餘位元組寫入記憶體位元組流中
                if (pos != bytesReady.length) {
                    byte[] remainBytes = new byte[bytesReady.length - pos];
                    System.arraycopy(bytesReady, pos, remainBytes, 0, remainBytes.length);
                    parseCompletePackets(ctx, remainBytes, out, magicByteBegin, magicByteEnd);
                }
            } else {
                // 無結束標識,非完成報文,繼續後續處理
                state = 2; //報文體讀取狀態,直接將當前資料寫記憶體位元組流中
                // 在下一次資料過來時處理結束標識
                dataStream.write(bytesReady, 0, bytesReady.length);
            }
        }
    }

Jtcp-core

自定義實現一個IOC容器,可對訊息處理handler進行管理,並通過註解的方式制定訊息轉發機制 首先遍歷main函數下所有class類,並快取所有指定註解@JtcpComponet的class類物件並注入sproutBeanFactory範例工廠

    /**
     * 快取所有指定註解的class<?>類物件
     * @param packageName
     * @return
     * @throws Exception
     */

    public static Map<String, Class<?>> getBean(String packageName) throws Exception {

        if (componetMap == null) {
            Set<Class<?>> clsList = getClasses(packageName);

            if (clsList == null || clsList.isEmpty()) {
                return componetMap;
            }

            componetMap = new HashMap<>(16);
            for (Class<?> cls : clsList) {

                Annotation annotation = cls.getAnnotation(JtcpComponet.class);
                if (annotation == null) {
                    continue;
                }

                JtcpComponet sproutComponet = (JtcpComponet) annotation;
                componetMap.put(sproutComponet.value() == null ? cls.getName() : sproutComponet.value(), cls);

            }
        }
        return componetMap;
    }

實現方法路由,通過@JtcpRoute並結合上面定義連結、斷開、訊息接收、超時、異常等事件列舉型別,把觸發的網路通訊事件轉發至指定的業務方法中處理

    /**
     * 根據註解呼叫方法
     * @param method
     * @param annotation
     * @param args
     * @throws Exception
     */

    public void invoke(RouteEnum routeEnum, Object[] args) throws Exception {
        Method method = RouterScanner.getInstance().routeMethod(routeEnum);
        if (method == null) {
            return;
        }
        Object bean = applicationContext.getBean(method.getDeclaringClass().getName());
        if (args == null) {
            method.invoke(bean);
        } else {
            method.invoke(bean, args);
        }
    }

channelRead接收資料並轉發

        /**
         * 接收訊息事件
         */

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object source) {
            try {
                byte[] dataBytes = (byte[]) source;
                JtcpContext sproutContext = new JtcpContext(ctx, dataBytes);
                RouteMethod.getInstance().invoke(RouteEnum.OnRecevie, new Object[] { sproutContext });
            } catch (Exception ex) {
            }
        }

Jtcp-example

範例程式碼

    public static void main(String[] args) throws Exception {
        JtcpBootstrap bootstrap = new JtcpBootstrap();
        bootstrap.config().setHost("127.0.0.1");
        bootstrap.config().setPort(8030);
        bootstrap.start();
    }

    @JtcpComponet
    public class DemoHandler{

        @JtcpRoute(RouteEnum.OnRecevie)
        public void res(JtcpContext jtcpContext) {
            jtcpContext.context.writeAndFlush(jtcpContext.getRecvBytes());
            //System.err.println(BytesUtils.toHexString(context.getRecvBytes()));
        }

        @JtcpRoute(RouteEnum.OnConnect)
        public void onConnect(JtcpContext context ) {
            System.err.println("連線成功");
        }
    }

好的以上就是框架程式碼的基本構造,涉及到了Netty的應用、粘包半包處理,範例快取與方法路由等內容,整體並不複雜,這裡只是提供了一種伺服器端編碼的思路,供初學者參考。

github地址:https://github.com/dafanjoy/jtcp