攤牌了,我要手寫一個RPC

2020-10-19 12:00:13

前言

RPC是遠端過程呼叫(Remote Procedure Call)的縮寫形式。SAP系統RPC呼叫的原理其實很簡單,有一些類似於三層構架的C/S系統,第三方的客戶程式通過介面呼叫SAP內部的標準或自定義函數,獲得函數返回的資料進行處理後顯示或列印。

隨著微服務、分散式的大熱,開發者慢慢趨向於將一個大的服務拆分成多個獨立的小的服務。
服務經過拆分後,服務與服務之間的通訊就變得至關重要。

RPC說白了就是節點A去呼叫節點B的服務,站在Java的角度看,就是像呼叫本地函數一樣呼叫遠端函數


需要解決的問題

要想實現RPC,首先需要解決以下幾個問題:

  1. 服務之間如何通訊?
    Socket 網路IO。
  2. 請求引數、返回結果如何傳輸?
    Java將物件序列化為位元組陣列通過網路IO傳輸。
  3. 介面沒有實現類,該如何呼叫?
    JDK動態代理生成代理物件。
  4. 如何發起遠端呼叫?
    在代理物件中發起Socket請求遠端伺服器。

手寫RPC實戰

首先看下目錄結構:
在這裡插入圖片描述

1、定義通訊協定

消費者發起一個呼叫請求,服務者必須知道你要調哪個服務,引數是什麼,這些需要封裝好。

@Data
public class RpcMessage implements Serializable {
	private static final long serialVersionUID = 1L;

	private String interfaceName;//呼叫的Service介面名
	private String methodName;//呼叫的方法名
	private Class<?>[] argsType;//引數型別列表
	private Object[] args;//引數
}

2、自定義註解

分別是服務的提供者和消費者。

@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service//引入Spring Service,自動注入IOC容器
// 服務提供者
public @interface MyRpcService {

}


@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 服務消費者
public @interface MyRpcReference {

}

3、定義介面

public interface UserService {

	// 根據UserId查詢使用者
	R<UserResp> findById(Long userId);
}

4、實現介面

加上自定義註解@MyRpcService,後續需要掃描這些實現類,並暴露服務。

@MyRpcService
public class UserServiceImpl implements UserService{

	@Override
	public R<UserResp> findById(Long userId) {
		UserResp userResp = new UserResp();
		userResp.setId(userId);
		userResp.setName("張三");
		userResp.setPwd("root@abc");
		return R.ok(userResp);
	}
}

5、暴露服務並監聽處理請求

應用程式啟動後,從Spring的IOC容器中,找到加了@MyRpcService註解的服務,並暴露出去。

/**
 * @author: pch
 * @description: 程式啟動,暴露Service服務
 * @date: 2020/10/13
 **/
@Component
public class ProviderListener implements ApplicationListener<ApplicationStartedEvent> {

	@Override
	public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
		ConfigurableApplicationContext context = applicationStartedEvent.getApplicationContext();
		for (Object bean : context.getBeansWithAnnotation(MyRpcService.class).values()) {
			ProviderHolder.addService(bean);
		}
		try {
			ProviderHolder.start();
		} catch (Exception e) {
			e.printStackTrace();
		}
		System.err.println("provider...啟動");
	}
}

暴露服務,處理消費者請求的核心程式碼

/**
 * @author: pch
 * @description: 服務持有者
 * @date: 2020/10/13
 **/
public class ProviderHolder {
	// 快取所有的服務提供者
	private static final Map<String, Provider> SERVICES = new ConcurrentHashMap<>();
	// 起一個執行緒池,處理消費者的請求
	private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();

	// 新增服務
	public static void addService(Object bean) {
		Class<?> beanClass = bean.getClass();
		String interfaceName = beanClass.getInterfaces()[0].getName();
		SERVICES.put(interfaceName, new Provider(bean));
	}

	/**
	 * 啟動服務
	 * @throws Exception
	 */
	public static void start() throws Exception {
		if (SERVICES.isEmpty()) {
			return;
		}
		// 開啟ServerSocket,埠3333,監聽消費者發起的請求。
		ServerSocket serverSocket = new ServerSocket(3333);
		while (true) {
			// 當有請求到達,提交一個任務到執行緒池
			Socket socket = serverSocket.accept();
			EXECUTOR_SERVICE.submit(() -> {
				try {
					// 從網路IO中讀取消費者傳送的引數
					Object o = new ObjectInputStream(socket.getInputStream()).readObject();
					if (o instanceof RpcMessage) {
						RpcMessage message = (RpcMessage) o;
						// 找到消費者要呼叫的服務
						Provider provider = SERVICES.get(message.getInterfaceName());
						if (provider == null) {
							return;
						}
						// 利用反射呼叫服務
						Object result = provider.invoke(message.getMethodName(), message.getArgsType(), message.getArgs());
						OutputStream outputStream = socket.getOutputStream();
						// 將返回結果序列化為位元組陣列並通過Socket寫回
						outputStream.write(ObjectUtil.serialize(result));
						outputStream.flush();
					}
				} catch (Exception e) {
					e.printStackTrace();
				}
			});
		}
	}
}

6、生成RPC動態代理物件

/**
 * @author: pch
 * @description: 基於JDK動態代理生成代理物件,發起RPC呼叫
 * @date: 2020/10/13
 **/
public class RpcProxy implements InvocationHandler {
	private Object origin = new Object();

	@Override
	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
		if (Object.class.equals(method.getDeclaringClass())) {
			return method.invoke(origin, args);
		}
		// 開啟一個Socket
		Socket socket = new Socket("127.0.0.1", 3333);
		// 封裝請求協定
		RpcMessage message = new RpcMessage();
		message.setInterfaceName(method.getDeclaringClass().getName());
		message.setMethodName(method.getName());
		message.setArgsType(method.getParameterTypes());
		message.setArgs(args);
		// 將請求引數序列化成位元組陣列通過網路IO寫回
		OutputStream outputStream = socket.getOutputStream();
		outputStream.write(ObjectUtil.serialize(message));
		outputStream.flush();
		// 阻塞,等待伺服器端處理完畢返回結果
		Object o = new ObjectInputStream(socket.getInputStream()).readObject();
		// 返回給呼叫者
		return o;
	}
}

7、消費者注入RPC動態代理物件

/**
 * @author: pch
 * @description: 注入加了@MyRpcReference註解的屬性
 * @date: 2020/10/13
 **/
@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
		Class<?> beanClass = bean.getClass();
		Field[] fields = ClassUtil.getDeclaredFields(beanClass);
		for (Field field : fields) {
			if (field.getAnnotation(MyRpcReference.class) == null) {
				continue;
			}
			Object proxy = Proxy.newProxyInstance(beanClass.getClassLoader(), new Class[]{field.getType()}, new RpcProxy());
			field.setAccessible(true);
			try {
				field.set(bean, proxy);
			} catch (IllegalAccessException e) {
				e.printStackTrace();
			}
		}
		return bean;
	}
}

功能測試

核心程式碼寫好了,那就可以開始測試功能是否符合預期了。

1、啟動服務提供者
在這裡插入圖片描述
2、啟動消費者,並行起一個請求
在這裡插入圖片描述

尾巴

基於篇幅原因,本文只是實現了RPC最基本最簡單的功能,主要是理解RPC的思想。
當然,還有很多可以優化的點:

  1. Service暴露的所有方法快取起來,每次呼叫再反射查詢開銷還是很大的。
  2. 使用Netty提升網路IO的通訊效能。
  3. 連線池的引入。
  4. 註冊中心的加入。
  5. 寫回的資料沒有包裝協定。
  6. 資料格式的擴充套件,請求頭的加入。