RPC是遠端過程呼叫(Remote Procedure Call)的縮寫形式。SAP系統RPC呼叫的原理其實很簡單,有一些類似於三層構架的C/S系統,第三方的客戶程式通過介面呼叫SAP內部的標準或自定義函數,獲得函數返回的資料進行處理後顯示或列印。
隨著微服務、分散式的大熱,開發者慢慢趨向於將一個大的服務拆分成多個獨立的小的服務。
服務經過拆分後,服務與服務之間的通訊就變得至關重要。
RPC說白了就是節點A去呼叫節點B的服務,站在Java的角度看,就是像呼叫本地函數一樣呼叫遠端函數。
要想實現RPC,首先需要解決以下幾個問題:
Socket 網路IO。
Java將物件序列化為位元組陣列通過網路IO傳輸。
JDK動態代理生成代理物件。
在代理物件中發起Socket請求遠端伺服器。
首先看下目錄結構:
消費者發起一個呼叫請求,服務者必須知道你要調哪個服務,引數是什麼,這些需要封裝好。
@Data
public class RpcMessage implements Serializable {
private static final long serialVersionUID = 1L;
private String interfaceName;//呼叫的Service介面名
private String methodName;//呼叫的方法名
private Class<?>[] argsType;//引數型別列表
private Object[] args;//引數
}
分別是服務的提供者和消費者。
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Service//引入Spring Service,自動注入IOC容器
// 服務提供者
public @interface MyRpcService {
}
@Target({ElementType.FIELD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
// 服務消費者
public @interface MyRpcReference {
}
public interface UserService {
// 根據UserId查詢使用者
R<UserResp> findById(Long userId);
}
加上自定義註解@MyRpcService
,後續需要掃描這些實現類,並暴露服務。
@MyRpcService
public class UserServiceImpl implements UserService{
@Override
public R<UserResp> findById(Long userId) {
UserResp userResp = new UserResp();
userResp.setId(userId);
userResp.setName("張三");
userResp.setPwd("root@abc");
return R.ok(userResp);
}
}
應用程式啟動後,從Spring的IOC容器中,找到加了@MyRpcService
註解的服務,並暴露出去。
/**
* @author: pch
* @description: 程式啟動,暴露Service服務
* @date: 2020/10/13
**/
@Component
public class ProviderListener implements ApplicationListener<ApplicationStartedEvent> {
@Override
public void onApplicationEvent(ApplicationStartedEvent applicationStartedEvent) {
ConfigurableApplicationContext context = applicationStartedEvent.getApplicationContext();
for (Object bean : context.getBeansWithAnnotation(MyRpcService.class).values()) {
ProviderHolder.addService(bean);
}
try {
ProviderHolder.start();
} catch (Exception e) {
e.printStackTrace();
}
System.err.println("provider...啟動");
}
}
暴露服務,處理消費者請求的核心程式碼
/**
* @author: pch
* @description: 服務持有者
* @date: 2020/10/13
**/
public class ProviderHolder {
// 快取所有的服務提供者
private static final Map<String, Provider> SERVICES = new ConcurrentHashMap<>();
// 起一個執行緒池,處理消費者的請求
private static final ExecutorService EXECUTOR_SERVICE = Executors.newCachedThreadPool();
// 新增服務
public static void addService(Object bean) {
Class<?> beanClass = bean.getClass();
String interfaceName = beanClass.getInterfaces()[0].getName();
SERVICES.put(interfaceName, new Provider(bean));
}
/**
* 啟動服務
* @throws Exception
*/
public static void start() throws Exception {
if (SERVICES.isEmpty()) {
return;
}
// 開啟ServerSocket,埠3333,監聽消費者發起的請求。
ServerSocket serverSocket = new ServerSocket(3333);
while (true) {
// 當有請求到達,提交一個任務到執行緒池
Socket socket = serverSocket.accept();
EXECUTOR_SERVICE.submit(() -> {
try {
// 從網路IO中讀取消費者傳送的引數
Object o = new ObjectInputStream(socket.getInputStream()).readObject();
if (o instanceof RpcMessage) {
RpcMessage message = (RpcMessage) o;
// 找到消費者要呼叫的服務
Provider provider = SERVICES.get(message.getInterfaceName());
if (provider == null) {
return;
}
// 利用反射呼叫服務
Object result = provider.invoke(message.getMethodName(), message.getArgsType(), message.getArgs());
OutputStream outputStream = socket.getOutputStream();
// 將返回結果序列化為位元組陣列並通過Socket寫回
outputStream.write(ObjectUtil.serialize(result));
outputStream.flush();
}
} catch (Exception e) {
e.printStackTrace();
}
});
}
}
}
/**
* @author: pch
* @description: 基於JDK動態代理生成代理物件,發起RPC呼叫
* @date: 2020/10/13
**/
public class RpcProxy implements InvocationHandler {
private Object origin = new Object();
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if (Object.class.equals(method.getDeclaringClass())) {
return method.invoke(origin, args);
}
// 開啟一個Socket
Socket socket = new Socket("127.0.0.1", 3333);
// 封裝請求協定
RpcMessage message = new RpcMessage();
message.setInterfaceName(method.getDeclaringClass().getName());
message.setMethodName(method.getName());
message.setArgsType(method.getParameterTypes());
message.setArgs(args);
// 將請求引數序列化成位元組陣列通過網路IO寫回
OutputStream outputStream = socket.getOutputStream();
outputStream.write(ObjectUtil.serialize(message));
outputStream.flush();
// 阻塞,等待伺服器端處理完畢返回結果
Object o = new ObjectInputStream(socket.getInputStream()).readObject();
// 返回給呼叫者
return o;
}
}
/**
* @author: pch
* @description: 注入加了@MyRpcReference註解的屬性
* @date: 2020/10/13
**/
@Component
public class RpcBeanPostProcessor implements BeanPostProcessor {
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
Class<?> beanClass = bean.getClass();
Field[] fields = ClassUtil.getDeclaredFields(beanClass);
for (Field field : fields) {
if (field.getAnnotation(MyRpcReference.class) == null) {
continue;
}
Object proxy = Proxy.newProxyInstance(beanClass.getClassLoader(), new Class[]{field.getType()}, new RpcProxy());
field.setAccessible(true);
try {
field.set(bean, proxy);
} catch (IllegalAccessException e) {
e.printStackTrace();
}
}
return bean;
}
}
核心程式碼寫好了,那就可以開始測試功能是否符合預期了。
1、啟動服務提供者
2、啟動消費者,並行起一個請求
基於篇幅原因,本文只是實現了RPC最基本最簡單的功能,主要是理解RPC的思想。
當然,還有很多可以優化的點: