最近在看工作使用到的diamond設定中心原理,發現大多數設定中心在推和拉模型上做的選擇出奇的一致選擇了基於長輪詢的拉模型
基於拉模型的使用者端輪詢的方案
使用者端通過輪詢方式發現伺服器端的設定變更事件。輪詢的頻率決定了動態設定獲取的實時性。
另外,從設定中心的應用場景上來看,是一種寫少讀多的系統,使用者端大多數輪詢請求都是沒有意義的,因此這種方案不夠高效。
基於推模型的使用者端長輪詢的方案
基於Http長輪詢模型,實現了讓使用者端在沒有發生動態設定變更的時候減少輪詢。這樣減少了無意義的輪詢請求量,提高了輪詢的效率;也降低了系統負載,提升了整個系統的資源利用率。
長輪詢
本質上是原始輪詢技術的一種更有效的形式。
它的出現是為了解決:向伺服器傳送重複請求會浪費資源,因為必須為每個新傳入的請求建立連線,必須解析請求的 HTTP 頭部,必須執行對新資料的查詢,並且必須生成和交付響應(通常不提供新資料)然後必須關閉連線並清除所有資源。
設定未修改(設定中心沒有修改設定,使用者端快取的設定和設定中心一致,所以是白忙活)
長輪詢是一種伺服器選擇儘可能長的時間保持和使用者端連線開啟的技術
,僅在資料變得可用或達到超時闕值後才提供響應
,而不是在給到使用者端的新資料可用之前,讓每個使用者端多次發起重複的請求
簡而言之,就是伺服器端並不是立馬寫回響應,而是hold住一段時間,如果這段時間有資料需要寫回(例如設定的修改,新設定需要寫回)再寫回,然後瀏覽器再傳送一個新請求,從而實現及時性
,節省網路開銷
的作用。
package com.cuzzz.springbootlearn.longpull;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
@RestController
@RequestMapping("long-pull")
public class MyController implements InitializingBean {
/**
* 處理任務的執行緒
*/
private ThreadPoolExecutor processExecutor;
/**
* 等待喚醒的鎖
*/
private static final ReentrantLock lock = new ReentrantLock();
/**
* 當請求獲取設定的時候,在此condition上等待一定時間
* 當修改設定的時候通過這個condition 通知其他獲取設定的執行緒
*/
private static final Condition condition = lock.newCondition();
@GetMapping
public void get(HttpServletRequest request, HttpServletResponse response) throws ExecutionException, InterruptedException {
//組轉成任務
Task<String> task = new Task<String>(request, response,
() -> "拿設定" + System.currentTimeMillis());
//提交到執行緒池
Future<?> submit = processExecutor.submit(task);
//tomcat執行緒阻塞於此
submit.get();
}
/**
* 模擬修改設定
*
* 喚醒其他獲取設定的執行緒
*/
@PostMapping
public String post(HttpServletRequest request, HttpServletResponse response) {
lock.lock();
try {
condition.signalAll();
}finally {
lock.unlock();
}
return "OK";
}
static class Task<T> implements Runnable {
private HttpServletResponse response;
/**
* 等待時長
*/
private final long timeout;
private Callable<T> task;
public Task(HttpServletRequest request, HttpServletResponse response, Callable<T> task) {
this.response = response;
String time = request.getHeader("time-out");
if (time == null){
//預設等待10秒
this.timeout = 10;
}else {
this.timeout = Long.parseLong(time);
}
this.task = task;
}
@Override
public void run() {
lock.lock();
try {
//超市等待
boolean await = condition.await(timeout, TimeUnit.SECONDS);
//超時
if (!await) {
throw new TimeoutException();
}
//獲取設定
T call = task.call();
//寫回
ServletOutputStream outputStream = response.getOutputStream();
outputStream.write(("沒超時拿當前設定:" + call).getBytes(StandardCharsets.UTF_8));
} catch (TimeoutException | InterruptedException exception) {
//超時或者執行緒被中斷
try {
ServletOutputStream outputStream = response.getOutputStream();
T call = task.call();
outputStream.write(("超時or中斷拿設定:" + call).getBytes(StandardCharsets.UTF_8));
} catch (Exception ex) {
throw new RuntimeException(ex);
}
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
@Override
public void afterPropertiesSet() {
int cpuNums = Runtime.getRuntime().availableProcessors();
processExecutor
= new ThreadPoolExecutor(cpuNums, cpuNums * 2, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100), new ThreadPoolExecutor.CallerRunsPolicy());
}
}
使用get方法反問的請求回被提交到執行緒池進行await等待,使用post方法的請求回喚醒這些執行緒。
但是這個寫法有點脫褲子放屁
為什麼會出現這種情況,直接提交到執行緒池非同步執行不可以麼,加入我們刪除上面submit.get
方法會發現其實什麼結果都不會,這是因為非同步提交到執行緒池後,tomcat已經結束了這次請求,並沒有維護這個連線,所以沒有辦法寫回結果。
如果不刪除這一行,tomcat執行緒阻塞住我們可以寫回結果,但是其實沒有達到設定使用長輪詢的初衷——"解放tomcat執行緒,讓設定中心伺服器端可以處理更多請求"。
所以我們現在陷入一個尷尬的境地,怎麼解決暱?看下去
package com.cuzzz.springbootlearn.longpull;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
@RestController
@RequestMapping("long-pull3")
public class MyController2 {
private static final ScheduledExecutorService procesExecutor
= Executors.newSingleThreadScheduledExecutor();
/**
* 記錄設定改變的map
*/
private static final ConcurrentHashMap<String, String> configCache
= new ConcurrentHashMap<>();
/**
* 記錄長輪詢的任務
*/
private static final ConcurrentLinkedDeque<AsyncTask> interestQueue
= new ConcurrentLinkedDeque<>();
static {
//每2秒看一下釋放設定變更,或者任務超時
procesExecutor.scheduleWithFixedDelay(() -> {
List<AsyncTask>needRemove = new ArrayList<>();
for (AsyncTask asyncTask : interestQueue) {
if (asyncTask.timeout()) {
asyncTask.run();
needRemove.add(asyncTask);
continue;
}
if (configCache.containsKey(asyncTask.configId)) {
needRemove.add(asyncTask);
asyncTask.run();
}
}
interestQueue.removeAll(needRemove);
}, 1, 2, TimeUnit.SECONDS);
}
static class AsyncTask implements Runnable {
private final AsyncContext asyncContext;
private final long timeout;
private static long startTime;
private String configId;
AsyncTask(AsyncContext asyncContext) {
this.asyncContext = asyncContext;
HttpServletRequest request = (HttpServletRequest) asyncContext.getRequest();
String timeStr = request.getHeader("time-out");
if (timeStr == null) {
timeout = 10;
} else {
timeout = Long.parseLong(timeStr);
}
//關注的設定key,應該getParameter的,無所謂
this.configId = request.getHeader("config-id");
if (this.configId == null) {
this.configId = "default";
}
//開始時間
startTime = System.currentTimeMillis();
}
//是否超時
public boolean timeout() {
return (System.currentTimeMillis() - startTime) / 1000 > timeout;
}
@Override
public void run() {
String result = "開始於" + System.currentTimeMillis() + "--";
try {
if (timeout()) {
result = "超時: " + result;
} else {
result += configCache.get(this.configId);
}
result += "--結束於:" + System.currentTimeMillis();
ServletResponse response = asyncContext.getResponse();
response.getOutputStream().write(result.getBytes(StandardCharsets.UTF_8));
//後續將交給tomcat執行緒池處理,將給使用者端響應
asyncContext.complete();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
@GetMapping
public void get(HttpServletRequest request, HttpServletResponse response) {
//列印處理的tomcate執行緒id
System.out.println("執行緒id" + Thread.currentThread().getId());
//新增一個獲取設定的非同步任務
interestQueue.add(new AsyncTask(asyncContext));
//開啟非同步
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
//監聽器列印最後回撥的tomcat執行緒id
asyncContext.addListener(new AsyncListener() {
@Override
public void onComplete(AsyncEvent event) throws IOException {
System.out.println("執行緒id" + Thread.currentThread().getId());
}
//...剩餘其他方法
});
//立馬就會釋放tomcat執行緒池資源
System.out.println("tomcat主執行緒釋放");
}
@PostMapping
public void post(HttpServletRequest request) {
String c = String.valueOf(request.getParameter("config-id"));
if (c.equals("null")){
c = "default";
}
String v = String.valueOf(request.getParameter("value"));
configCache.put(c, v);
}
}
上面演示利用AsyncContext
tomcat是如何實現長輪詢
這種方式的優勢在於:解放了tomcat執行緒,其實tomcat的執行緒只是執行了get方法中的程式碼,然後立馬可以去其他請求,真正獲取設定更改
的是我們的單執行緒定時2秒去輪詢。
Connector是使用者端連線到Tomcat容器的服務點,它提供協定服務來將引擎與使用者端各種協定隔離開來
在Connector元件中建立了Http11NioProtocol元件,Http11NioProtocol預設持有NioEndpoin,NioEndpoint中持有Acceptor和Poller,並且啟動的時候會啟動一個執行緒執行Acceptor
Acceptor伺服器端監聽使用者端的連線,會啟動執行緒一直執行
每接收一個使用者端連線就輪詢一個Poller元件,新增到Poller元件的事件佇列中。,每接收一個使用者端連線就輪詢一個Poller元件,新增到Poller元件的事件佇列中。
Poller元件持有多路複用器selector,poller元件不停從自身的事件佇列中將事件取出註冊到自身的多路複用器上,同時多路複用器會不停的輪詢檢查是否有通道準備就緒,準備就緒的通道就可以扔給tomcat執行緒池處理了。
tomcat執行緒池處理請求
這裡會根據協定建立不同的Processor處理,這裡建立的是Http11Processor,Http11Processor會使用CoyoteAdapter去解析報文隨後交給Container去處理請求
CoyoteAdapter解析報文隨後交給Container去處理請求
Container會將Filter和Servlet組裝成FilterChain依次呼叫
FilterChain會依次呼叫Filter#doFilter,然後呼叫Servlet#service方法
至此會呼叫到Servlete#service方法,SpringMVC中的Dispatcher會反射呼叫我們controller的方法
AsycContext內部持有一個AsyncStateMachine來管理非同步請求的狀態(有點狀態模式的意思)
狀態機的初始狀態是AsyncState.DISPATCHED,通過setStarted將狀態機的狀態更新成STARTING
Connector啟動的時候觸發ProtocolHandler的start方法,如下
其中startAsyncTimeout方法會遍歷waitingProcessors中每一個Processor的timeoutAsync方法,這裡的Processor就是Http11Processor
那麼waitProcessors中的Http11Processor是誰塞進去的暱?
tomcat執行緒在執行完我們的Servlet程式碼後,Http11NioProtocol會判斷請求狀態,如果為Long那麼會塞到waitProcessors集合中。
如果發現請求超時,那麼會呼叫Http11Processor#doTimeoutAsycn
然後由封裝的socket通道socketWrapper以TIMEOUT的事件型別重新提交到tomcat執行緒池中。
可以看到其實和超時一樣,只不過超時是由定時任務執行緒輪詢來判斷,而AsyncContext#complete則是我們業務執行緒觸發processSocketEvent將後續處理提交到tomcat執行緒池中。
本文學習了長輪詢和tomcat長輪詢的原理,可以看到這種方式的優點
當然這種方式也是有缺點的
hold住請求也是會消耗資源的,如果1w個請求同時到來,我們都需要hold住(封裝成任務塞到佇列)這寫任務也是會佔用記憶體的,而短輪詢則會立馬返回,從而時間資源的釋放
請求先後順序無法保證,比如輪詢第五個使用者端的請求的時候,出現了設定的變更,這時候第五個請求會被提交到tomcat執行緒池中,從而早於前面四個請求得到響應,這對於需要嚴格有序的業務場景是有影響的
多臺範例監聽設定中心範例,出現不一致的情況
比如設定中心四臺範例監聽設定變更,前三臺可能響應了得到V1的設定,但是輪詢到第四臺範例的請求的時候又發生了變更可能就得到了v2的設定,這時候這四臺設定不一致了。需要保證這種一致性需要我們採取其他的策略,比如設定中心伺服器端主動udp推,或者加上版本號保證這四臺設定一致。