通過socket、多執行緒、動態代理、反射 實現RPC遠端方法呼叫

2021-03-10 12:00:09

一、概念梳理

1、Socket是什麼?

Socket是應用層與TCP/IP協定族通訊的中間軟體抽象層,它是一組介面。在設計模式中,Socket其實就是一個門面模式,它把複雜的TCP/IP協定族隱藏在Socket介面後面,對使用者來說,一組簡單的介面就是全部,讓Socket去組織資料,以符合指定的協定。

Socket技術詳解

2、動態代理是什麼?

目前java動態代理的實現分為兩種

1.基於JDK的動態代理

2.基於CGILB的動態代理

在業務中使用動態代理,一般是為了給需要實現的方法新增預處理或者新增後續操作,但是不干預實現類的正常業務,把一些基本業務和主要的業務邏輯分離。我們一般所熟知的Spring的AOP原理就是基於動態代理實現的。

【Java知識點詳解 2】動態代理

3、反射是什麼?

Java反射機制是在執行狀態中,對於任意一個類,都能夠知道這個類的所有屬性和方法;對於任意一個物件,都能夠呼叫它的任意方法和屬性;這種動態獲取資訊以及動態呼叫物件方法的功能稱為java語言的反射機制。

【Java知識點詳解 8】反射

4、RPC是什麼?

RPC是遠端過程呼叫的簡稱,廣泛應用在大規模分散式應用中,作用是有助於系統的垂直拆分,使系統更易拓展。Java中的RPC框架比較多,各有特色,廣泛使用的有RMI、Hessian、Dubbo等。RPC還有一個特點就是能夠跨語言。

RPC服務和HTTP服務對比

二、模擬RPC遠端方法呼叫

1、思路

  1. 使用者端通過socket請求伺服器端,並且通過字串形式,將需要的介面傳送給伺服器端(通過動態代理傳送介面名、方法名) 。
  2. 伺服器端將可以提供的介面註冊到服務中心(通過map儲存,key為介面的名字,value為介面的實現類)。
  3. 伺服器端接收到使用者端的請求後,通過請求的介面名在服務中心的map中尋找對應的介面實現類 找到之後,解析剛才使用者端發過來的介面名、方法名,解析完畢後,通過反射技術將該方法執行,執行完畢後,再講該方法的返回值返回給使用者端。

2、分析草圖

​三、程式碼實現

1、介面

package com.guor.rpc.server;

public interface HelloService {
    String sayHello(String name);
}

2、介面實現類 

package com.guor.rpc.server;

public class HelloServiceImpl implements HelloService{
    @Override
    public String sayHello(String name) {
        System.out.println("hello " + name);
        return "hello " + name;
    }
}

3、socket伺服器端(多執行緒、反射)

package com.guor.rpc.server;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 大致思路:
 * 1、角色1:使用者端、角色2:釋出服務的介面(伺服器端)、角色3:服務的註冊中心;
 * 2、伺服器端通過register方法將介面註冊到註冊中心,key為關鍵字,value為介面的具體實現;
 * 3、使用者端與伺服器端通過socket進行通訊,通過反射技術,使用者端傳送一個字串到註冊中心,
獲取註冊中心中map對應的值,即伺服器端介面的一切資訊;
 * 4、使用者端通過動態代理物件接收不同的介面型別
 */
public class ServerCenterImpl implements  ServerCenter{
    //伺服器端的所有可供使用者端存取的介面都註冊到map中
    private static Map<String, Class> serverRegister = new HashMap<String, Class>();
    private static int port;
    public ServerCenterImpl(int port){
        this.port = port;
    }

    //連線池,一個物件處理一個客戶請求(連線池中大小與CPU有關)
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    private static boolean isRunning = false;

    //開啟伺服器端服務
    @Override
    public void start() {
        try {
            ServerSocket serverSocket = new ServerSocket();
            serverSocket.bind(new InetSocketAddress(port));
            isRunning = true;
            while(true){
                System.out.println("伺服器端 start...");
                //接收使用者端請求,處理請求,並返回結果
                //使用者端沒傳送一次請求,則伺服器端從連線池中獲取一個執行緒
                Socket socket = serverSocket.accept();//等待使用者端連線
                executor.execute(new ServiceTask(socket));
            }
        }catch (Exception e){
            System.out.println("start exception." + e);
        }
    }

    @Override
    public void close() {
        isRunning = false;
        executor.shutdown();
    }

    //伺服器端通過register方法將介面註冊到註冊中心,key為關鍵字,value為介面的具體實現;
    @Override
    public void register(Class servie, Class serviceImpl) {
        serverRegister.put(servie.getName(), serviceImpl);
    }

    private static class ServiceTask implements Runnable{
        private Socket socket;

        public ServiceTask(){
        }

        public ServiceTask(Socket socket){
            this.socket = socket;
        }

        @Override
        public void run() {
            ObjectInputStream input = null;
            ObjectOutputStream output = null;
            try{
                //因為ObjectOutputStream對傳送資料的順序嚴格要求,因此需要參照傳送的順序逐個接收
                input = new ObjectInputStream(socket.getInputStream());
                //介面名
                String serviceName = input.readUTF();
                //方法名
                String methodName = input.readUTF();
                //方法引數型別
                Class[] parameterTypes = (Class[])input.readObject();
                //方法引數
                Object[] arguments = (Object[])input.readObject();
                //根據使用者端請求,到map中找到與之對應的具體介面
                Class serviceClass = serverRegister.get(serviceName);
                Method method = serviceClass.getMethod(methodName, parameterTypes);
                Object result = method.invoke(serviceClass.newInstance(), arguments);// = person.say("hello");
                //將方法執行完畢的返回值傳給使用者端
                output = new ObjectOutputStream(socket.getOutputStream());
                output.writeObject(result);
            }catch (Exception e){
                System.out.println("ServerCenterImpl" + e);
            }finally {
                if(output!=null){
                    try{
                        output.close();
                    }catch (Exception e){
                    }
                }
                if(input!=null){
                    try{
                        input.close();
                    }catch (Exception e){
                    }
                }
            }
        }
    }
}

4、socket使用者端(動態代理)

package com.guor.rpc.client;

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;

//通過socket + 動態代理 + 反射遠端呼叫介面中的方法, 連線池, 並行
public class Client {
    //獲取代表伺服器端介面的動態代理物件(HelloService)
    //serviceInterface:請求的介面名
    //add:待請求伺服器端的ip:port
    public static <T> T getRemoteProxyObj(Class serviceInterface, InetSocketAddress addr){
        System.out.println("Client start...");
        /*
            newProxyInstance(a, b, c);
            a:類載入器:需要代理哪個類,就要獲取哪個類的類載入器
            b:需要代理的物件,具備哪些方法 --> 介面
            java中單繼承、多實現
         */
        return (T) Proxy.newProxyInstance(serviceInterface.getClassLoader(), new Class<?>[]{serviceInterface}, new InvocationHandler() {
            /**
             *
             * @param proxy 代理的物件
             * @param method 哪個方法 sayHello()
             * @param args 參數列
             * @return
             * @throws Throwable
             */
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) {
                ObjectOutputStream output = null;
                ObjectInputStream input = null;
                try {
                    //使用者端向伺服器端傳送請求:請求某一個具體的介面
                    Socket socket = new Socket();
                    socket.connect(new InetSocketAddress(9999));
                    output = new ObjectOutputStream(socket.getOutputStream());
                    //介面名
                    output.writeUTF(serviceInterface.getName());
                    //方法名
                    output.writeUTF(method.getName());
                    //方法引數型別
                    output.writeObject(method.getParameterTypes());
                    //方法引數
                    output.writeObject(args);
                    //接收伺服器端處理後的返回值
                    input = new ObjectInputStream(socket.getInputStream());
                    System.out.println("接收伺服器端處理後的返回值"+input.readObject());
                    return input.read();
                }catch (Exception e) {
                    System.out.println("invoke exception"+e);
                    return null;
                }finally {
                    if(input!=null){
                        try{
                            input.close();
                        }catch (Exception e){
                        }
                    }
                    if(output!=null) {
                        try {
                            output.close();
                        } catch (Exception e) {
                        }
                    }
                }
            }
        });
    }
}

5、測試類

package com.guor.rpc.test;

import com.guor.rpc.server.HelloService;
import com.guor.rpc.server.HelloServiceImpl;
import com.guor.rpc.server.ServerCenter;
import com.guor.rpc.server.ServerCenterImpl;

public class RPCServerTest {
    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                //服務中心
                ServerCenter serverCenter = new ServerCenterImpl(9999);
                //將Hello介面和實現類註冊到服務中心
                serverCenter.register(HelloService.class, HelloServiceImpl.class);
                serverCenter.start();
            }
        }).start();
    }
}
package com.guor.rpc.test;

import com.guor.rpc.client.Client;
import com.guor.rpc.server.HelloService;

import java.net.InetSocketAddress;

/**
 * 1、使用者端通過socket請求伺服器端,並且通過字串形式,將需要的介面傳送給伺服器端(通過動態代理傳送介面名、方法名)
 * 2、伺服器端將可以提供的介面註冊到服務中心(通過map儲存,key為介面的名字,value為介面的實現類)
 * 3、伺服器端接收到使用者端的請求後,通過請求的介面名在服務中心的map中尋找對應的介面實現類
 * 找到之後,解析剛才使用者端發過來的介面名、方法名,解析完畢後,通過反射技術將該方法執行,執行完畢後,再講該方法的返回值返回給使用者端
 */
public class RPCClientTest {
    public static void main(String[] args) throws Exception {
        HelloService service = Client.getRemoteProxyObj(Class.forName("com.guor.rpc.server.HelloService"), new InetSocketAddress("127.0.0.1", 9999));
        service.sayHello("素小暖");
    }
}

6、控制檯輸出

 

往期精彩內容:

Java知識體系總結(2021版)

Java多執行緒基礎知識總結(絕對經典)

超詳細的springBoot學習筆記

常見資料結構與演演算法整理總結

Java設計模式:23種設計模式全面解析(超級詳細)

Java面試題總結(附答案)