本文探討的是在tomcat伺服器端介面程式設計中, 非同步servlet場景下( 參考我另外一個文章),用rxjava來改造介面為全流程非同步方式
好處不用說
但是缺點也沒法逃避
解決這些缺點,在其他語言上有
實現上有的是語法層面,有的是語法糖(編譯成狀態機),拋開機制不同,他們都是為了解決了一個關鍵問題:
那麼java咋辦,作為同時jvm語言的kotlin的Coroutine(協程)可以幫到我們!
回到剛開頭說的探討場景,可能有人會覺得奇怪,如果用kotlin的話,有kotlin方式的伺服器端非同步程式設計框架啊,比如ktor。或者spring webflux + kotlin suspend等 沒錯,建議都採用這種方式最好! 那在源頭上就是非上面的,我們又如何利用kotlin的協程,是今天主要討論的話題!
這裡舉例下分銷訂單介面, 不同的分銷商都得call一次,call完後還要根據結果來做別的操作(A和B)。 假設有5個分銷商 因為每個分銷商之間沒有依賴,所以優化方式自然想到用rxjava來改造!
要想在tomcat容器裡實現全流程非同步, 那肯定是用非同步servlet的方式,如上圖所示,tomcat的nio執行緒呼叫業務介面返回ListenableFuture, 會呼叫addListener設定一個callback,在callback裡面進行非同步上下文的提交
//非同步servlet標準式操作
final AsyncContext asyncContext = request.startAsync();
final ListenableFuture<?> responseFuture = distributorsOrder();//業務方法
responseFuture.addListener(() -> {
try {
// 略
} catch (Throwable ex) {
_logger.error("Execute async context error!", t);
} finally {
asyncContext.complete();
}
}, executorService);
用rxjava的實現方式(示意虛擬碼)
private Single<Optional<List<String>>> createByAsync(Detail orderItem) {
List<Single<Optional<List<String>>>> singleOptList = new ArrayList<>();
for (List<Distributor> distributor : distributorList) {
Single<Optional<List<String>>> orderId = distributor
.createOrderAsync(orderItem);
singleOptList.add(orderId);
}
return Single.zip(singleOptList, objects -> {
//回撥處理略
return Optional.of(result);
});
}
Single<Optional<List<String>>> createDistributorOrderSingle = createByAsync(orderItem);
createDistributorOrderSingle.flatMap( (Function<Optional<List<String>>, SingleSource<List<ResultEntity>>>) objects -> {
Single<Optional<List<ActionAResult>>> actionASingle = getActionABySoaAsync(objects);
Single<Optional<List<ActionBResult>>> actionASingle = getActionBBySoaAsync(objects);
return Single.zip(actionASingle, actionASingle, (actionATypes, actionBTypes) -> {
// 回撥處理略
return resultEntity;
});
});
可能你第一次寫完,儘管看起來很複雜,但是一看95線明顯降低,是不是覺得還有點成就感呢, 後面業務變得複雜,繼續疊加callback, 排查報錯,一堆函數式鏈路,是不是覺得很難受。 好吧,這個專案重構代價太大了,那麼後面你在寫一個新業務的時候,你會還想要這麼寫嗎? 有沒有別的剛好的方式呢?
一般我們都微服務化,基本上呼叫都是通過微服務架構方式呼叫,微服務架構層一般會提供代理類來封裝。 那麼我們就可以通過包裝代理類來實現kotlin的協程呼叫方式(靈感來自retrofit)
在設計這個功能的時候,我首先會想,暴露出來的使用方式怎麼樣是友好的,包括寫單元測試。 那就是面向介面封裝
interface SoaClientInterface {
suspend fun soaMethod1(request: GetMethod1RequestType): GetMethod1ResponseType
}
@RunWith(SpringRunner::class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.NONE)
class SoaClientTest {
@SoaClass
private lateinit var soaClients: SoaClientInterface
@Test
fun test() = runBlocking {
val resaponse = soaClients.soaMethod1(request)
}
}
如上,我要呼叫的微服務方法 soaMethod1 (suspend方法) 我把他定義到一個interface裡面,然後我在使用的時候只需要打上一個註解@SoaClass 在使用的時候就直接用就可以了。
這樣一來, soaMethod1 原本是返回ListenableFuture 被我包裝成一個代理類,代理類返回的是Coroutine 藉助suspend語法糖,內部會幫我們自動切換上下文。
是我自定義的spring BeanPostProcessor 處理標識, 在spring容器的流程中,會發掘打了這個註解的field並注入我自定義的介面實現類!
我的介面實現類的目的是為了包裝ListenableFuture為suspend的Coroutine方式呼叫
這裡用jdk的proxy功能建立代理類,當呼叫代理類的任何方法,都會走到這裡
public <T> T create(final Class<T> service, ISoaFactory soaFactory) throws Exception {
validateServiceInterface(service, soaFactory);
return (T) Proxy.newProxyInstance(service.getClassLoader(), new Class<?>[]{service}, new InvocationHandler() {
private final Object[] emptyArgs = new Object[0];
@Override
public @Nullable Object invoke(Object proxy, Method method, @Nullable Object[] args) throws Throwable {
// If the method is a method from Object then defer to normal invocation.
if (method.getDeclaringClass() == Object.class) {
return method.invoke(this, args);
}
args = args != null ? args : emptyArgs;
return method.isDefault() ?
invokeDefaultMethod(method, service, proxy, args) :
loadServiceMethod(method, soaFactory).invoke(args);
}
});
}
代理介面定義的每個方法都會解析成一個SoaServiceMethod<?>,快取起來下次呼叫
SoaServiceMethod<?> loadServiceMethod(Method method, ISoaFactory soaFactory) throws Exception {
SoaServiceMethod<?> result = serviceMethodCache.get(method);
if (result != null) {
return result;
}
synchronized (serviceMethodCache) {
result = serviceMethodCache.get(method);
if (result == null) {
result = SoaServiceMethod.parseAnnotations(method, soaFactory);
serviceMethodCache.put(method, result);
}
}
return result;
}
每個方法需要去解析且拿到以下資訊
SoaRequestFactory build() {
int parameterCount = parameterAnnotationsArray.length;
if (parameterCount > 2 || parameterCount < 1) {
throw new IllegalArgumentException("Method request parameterCount invalid"
+ "\n for method "
+ method.getDeclaringClass().getSimpleName()
+ "."
+ method.getName());
}
try {
if (TypeUtils.getRawType(parameterTypes[parameterTypes.length - 1]) == Continuation.class) {
isKotlinSuspendFunction = true;
}
} catch (NoClassDefFoundError ignored) {
// Ignored
}
if (!isKotlinSuspendFunction && parameterCount > 1) {
throw new IllegalArgumentException("Method request parameterCount invalid"
+ "\n for method "
+ method.getDeclaringClass().getSimpleName()
+ "."
+ method.getName());
}
Type returnType = method.getGenericReturnType();
if (hasUnresolvableType(returnType)) {
throw new IllegalArgumentException(String.format("Method return type must not include a type variable or wildcard: %s", returnType)
+ "\n for method "
+ method.getDeclaringClass().getSimpleName()
+ "."
+ method.getName());
}
if (returnType == void.class) {
throw new IllegalArgumentException("Service methods cannot return void."
+ "\n for method "
+ method.getDeclaringClass().getSimpleName()
+ "."
+ method.getName());
}
// 返回型別
Type adapterType;
if (isKotlinSuspendFunction) {
adapterType =
TypeUtils.getParameterLowerBound(
0, (ParameterizedType) parameterTypes[parameterTypes.length - 1]);
if (TypeUtils.getRawType(adapterType) == AsyncResult.class && adapterType instanceof ParameterizedType) {
adapterType = TypeUtils.getParameterUpperBound(0, (ParameterizedType) adapterType);
continuationWantsResponse = true;
}
continuationIsUnit = isUnit(adapterType);
} else {
adapterType = returnType;
}
this.requestType = method.getParameterTypes()[0];
this.responseType = (Class<?>) adapterType;
this.methodName = method.getName();
return new SoaRequestFactory(this);
}
如果是kotlin的suspend方式 那麼需要在java裡面直接呼叫kotlin寫的擴充套件方法
@Override
Object invoke(Object[] args) {
Continuation<ResponseT> continuation = (Continuation<ResponseT>) args[args.length - 1];
try {
return SoaExtendKotlinKt.await(soaClient, args[0], continuation);
} catch (Exception e) {
return SoaExtendKotlinKt.suspendAndThrow(e, continuation);
}
}
這裡是最核心的實現方式 ListenableFuture -> suspend func
suspend fun <T : Any, K : Any> SoaClient<T, K>.await(request: T): K? {
return suspendCancellableCoroutine { continuation ->
continuation.invokeOnCancellation {
this.cancel()
}
Futures.addCallback(
this.handleAsync(request),
CatAsync.wrap(object : FutureCallback<K> {
override fun onSuccess(result: K?) {
continuation.resume(result)
}
override fun onFailure(t: Throwable) {
continuation.resumeWithException(t)
}
}), ThreadPool.INSTANCE
)
}
}
只要思路定下來,技術細節實現就很簡單了。 那麼這麼一包裝,用的時候的好處怎麼體現出來呢?我們把上面用rxjava的實現的虛擬碼換成kotlin方式的虛擬碼
interface SoaClientInterface {
suspend fun createOrderAsync(request: CreateOrderRequestType): CreateOrderResponseType
}
@SoaClass
private lateinit var soaClients: SoaClientInterface
suspend func createDistributorsOrder(request:createRequestType)=coroutineScope{
val channel = Channel<List<User>>()
for (distributor in distributorList) {
launch {
// 並行呼叫
val users = soaClients.createOrderAsync(CreateOrderRequestType().also{
it.orderItem = request.orderItem
it.distributorId = distributor.id
})
.also { log(repo, it) }
.bodyList()
channel.send(users)
}
}
repeat(distributorList.size) {
val rt = channel.receive()
//處理其他 suspend
}
}
採用了協程Coroutine的方式解決了非同步回撥,如果有報錯也非常清楚(歸功於kotlin的Coroutine的功能強大) 其中最難的是依賴對方提供的方法返回的是ListenableFuture 如何包裝成 suspend func 來達到整體的suspend一路到底的全鏈路非同步方式~!