監聽商品變更MQ訊息,查詢商品最新的資訊,呼叫BulkProcessor批次更新ES叢集中的商品欄位資訊;
由於商品資料非常多,所以將商品資料儲存到ES叢集上,整個ES叢集共劃分了256個分片,並根據商品的三級類目ID進行分片路由。
比如一個SKU的商品名稱發生變化,我們就會收到這個SKU的變更MQ訊息,然後再去查詢商品介面,將商品的最新名稱查詢回來,再根據這個SKU的三級分類ID進行路由,找到對應的ES叢集分片,然後更新商品名稱欄位資訊。
由於商品變更MQ訊息量巨大,為了提升更新ES的效能,防止出現MQ訊息積壓問題,所以本系統使用了BulkProcessor進行批次非同步更新。
ES使用者端版本如下:
<dependency>
<artifactId>elasticsearch-rest-client</artifactId>
<groupId>org.elasticsearch.client</groupId>
<version>6.5.3</version>
</dependency>
BulkProcessor設定虛擬碼如下:
//在這裡呼叫build()方法構造bulkProcessor,在底層實際上是用了bulk的非同步操作
this.fullDataBulkProcessor = BulkProcessor.builder((request, bulkListener) ->
fullDataEsClient.getClient().bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener)
// 1000條資料請求執行一次bulk
.setBulkActions(1000)
// 5mb的資料重新整理一次bulk
.setBulkSize(new ByteSizeValue(5L, ByteSizeUnit.MB))
// 並行請求數量, 0不併行, 1並行允許執行
.setConcurrentRequests(1)
// 固定1s必須重新整理一次
.setFlushInterval(TimeValue.timeValueSeconds(1L))
// 重試5次,間隔1s
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 5))
.build();
618大促開始後,由於商品變更MQ訊息非常頻繁,MQ訊息每天的訊息量更是達到了日常的數倍,而且好多商品還變更了三級類目ID;
系統在更新這些三級類目ID發生變化的SKU商品資訊時,根據修改後的三級類目ID路由後的分片更新商品資訊時發生了錯誤,並且重試了5次,依然沒有成功;
因為在新路由的分片上沒有這個商品的索引資訊,這些更新請求永遠也不會執行成功,系統的紀錄檔檔案中也記錄了大量的異常重試紀錄檔。
商品變更MQ訊息也開始出現了積壓報警,MQ訊息的消費速度明顯趕不上生產速度。
觀察MQ訊息消費者的UMP監控資料,發現消費效能很平穩,沒有明顯波動,但是呼叫次數會在系統消費MQ一段時間後出現斷崖式下降,由原來的每分鐘幾萬呼叫量逐漸下降到個位數。
在重啟應用後,系統又開始消費,UMP監控呼叫次數恢復到正常水平,但是系統執行一段時間後,還是會出現消費暫停問題,彷彿所有消費執行緒都被暫停了一樣。
首先找一臺暫停消費MQ訊息的容器,檢視應用程序ID,使用jstack命令dump應用程序的整個執行緒堆疊資訊,將匯出的執行緒堆疊資訊打包上傳到 https://fastthread.io/ 進行執行緒狀態分析。分析報告如下:
通過分析報告發現有124個處於BLOCKED狀態的執行緒,然後可以點選檢視各執行緒的詳細堆疊資訊,堆疊資訊如下:
連續檢視多個執行緒的詳細堆疊資訊,MQ消費執行緒都是在waiting to lock <0x00000005eb781b10> (a org.elasticsearch.action.bulk.BulkProcessor),然後根據0x00000005eb781b10去搜尋發現,這個物件鎖正在被另外一個執行緒佔用,佔用執行緒堆疊資訊如下:
這個執行緒狀態此時正處於WAITING狀態,通過執行緒名稱發現,該執行緒應該是ES使用者端內部執行緒。正是該執行緒搶佔了業務執行緒的鎖,然後又在等待其他條件觸發該執行緒執行,所以導致了所有的MQ消費業務執行緒一直無法獲取BulkProcessor內部的鎖,導致出現了消費暫停問題。
但是這個執行緒elasticsearch[scheduler][T#1]為啥不能執行? 它是什麼時候啟動的? 又有什麼作用?
就需要我們對BulkProcessor進行深入分析,由於BulkProcessor是通過builder模組進行建立的,所以深入builder原始碼,瞭解一下BulkProcessor的建立過程。
public static Builder builder(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, Listener listener) {
Objects.requireNonNull(consumer, "consumer");
Objects.requireNonNull(listener, "listener");
final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = Scheduler.initScheduler(Settings.EMPTY);
return new Builder(consumer, listener,
(delay, executor, command) -> scheduledThreadPoolExecutor.schedule(command, delay.millis(), TimeUnit.MILLISECONDS),
() -> Scheduler.terminate(scheduledThreadPoolExecutor, 10, TimeUnit.SECONDS));
}
內部建立了一個時間排程執行執行緒池,執行緒命名規則和上述持有鎖的執行緒名稱相似,具體程式碼如下:
static ScheduledThreadPoolExecutor initScheduler(Settings settings) {
ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1,
EsExecutors.daemonThreadFactory(settings, "scheduler"), new EsAbortPolicy());
scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
scheduler.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
scheduler.setRemoveOnCancelPolicy(true);
return scheduler;
}
最後在build方法內部執行了BulkProcessor的內部有參構造方法,在構造方法內部啟動了一個週期性執行的flushing任務,程式碼如下
BulkProcessor(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy, Listener listener,
int concurrentRequests, int bulkActions, ByteSizeValue bulkSize, @Nullable TimeValue flushInterval,
Scheduler scheduler, Runnable onClose) {
this.bulkActions = bulkActions;
this.bulkSize = bulkSize.getBytes();
this.bulkRequest = new BulkRequest();
this.scheduler = scheduler;
this.bulkRequestHandler = new BulkRequestHandler(consumer, backoffPolicy, listener, scheduler, concurrentRequests);
// Start period flushing task after everything is setup
this.cancellableFlushTask = startFlushTask(flushInterval, scheduler);
this.onClose = onClose;
}
private Scheduler.Cancellable startFlushTask(TimeValue flushInterval, Scheduler scheduler) {
if (flushInterval == null) {
return new Scheduler.Cancellable() {
@Override
public void cancel() {}
@Override
public boolean isCancelled() {
return true;
}
};
}
final Runnable flushRunnable = scheduler.preserveContext(new Flush());
return scheduler.scheduleWithFixedDelay(flushRunnable, flushInterval, ThreadPool.Names.GENERIC);
}
class Flush implements Runnable {
@Override
public void run() {
synchronized (BulkProcessor.this) {
if (closed) {
return;
}
if (bulkRequest.numberOfActions() == 0) {
return;
}
execute();
}
}
}
通過原始碼發現,該flush任務就是在建立BulkProcessor物件時設定的固定時間flush邏輯,當setFlushInterval方法引數生效,就會啟動一個後臺定時flush任務。flush間隔,由setFlushInterval方法引數定義。該flush任務在執行期間,也會搶佔BulkProcessor物件鎖,搶到鎖後,才會執行execute方法。具體的方法呼叫關係原始碼如下:
/**
* Adds the data from the bytes to be processed by the bulk processor
*/
public synchronized BulkProcessor add(BytesReference data, @Nullable String defaultIndex, @Nullable String defaultType,
@Nullable String defaultPipeline, @Nullable Object payload, XContentType xContentType) throws Exception {
bulkRequest.add(data, defaultIndex, defaultType, null, null, null, defaultPipeline, payload, true, xContentType);
executeIfNeeded();
return this;
}
private void executeIfNeeded() {
ensureOpen();
if (!isOverTheLimit()) {
return;
}
execute();
}
// (currently) needs to be executed under a lock
private void execute() {
final BulkRequest bulkRequest = this.bulkRequest;
final long executionId = executionIdGen.incrementAndGet();
this.bulkRequest = new BulkRequest();
this.bulkRequestHandler.execute(bulkRequest, executionId);
}
而上述程式碼中的add方法,則是由MQ消費業務執行緒去呼叫,在該方法上同樣有一個synchronized關鍵字,所以消費MQ業務執行緒會和flush任務執行執行緒直接會存在鎖競爭關係。具體MQ消費業務執行緒呼叫虛擬碼如下:
@Override
public void upsertCommonSku(CommonSkuEntity commonSkuEntity) {
String source = JsonUtil.toString(commonSkuEntity);
UpdateRequest updateRequest = new UpdateRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
updateRequest.doc(source, XContentType.JSON);
IndexRequest indexRequest = new IndexRequest(Constants.INDEX_NAME_SPU, Constants.INDEX_TYPE, commonSkuEntity.getSkuId().toString());
indexRequest.source(source, XContentType.JSON);
updateRequest.upsert(indexRequest);
updateRequest.routing(commonSkuEntity.getCat3().toString());
fullbulkProcessor.add(updateRequest);
}
通過以上對執行緒堆疊分析,發現所有的業務執行緒都在等待elasticsearch[scheduler][T#1]執行緒釋放BulkProcessor物件鎖,但是該執行緒確一直沒有釋放該物件鎖,從而出現了業務執行緒的死鎖問題。
結合應用紀錄檔檔案中出現的大量異常重試紀錄檔,可能與BulkProcessor的異常重試策略有關,然後進一步瞭解BulkProcessor的異常重試程式碼邏輯。由於業務執行緒中提交BulkRequest請求都統一提交到了BulkRequestHandler物件中的execute方法內部進行處理,程式碼如下:
public final class BulkRequestHandler {
private final Logger logger;
private final BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer;
private final BulkProcessor.Listener listener;
private final Semaphore semaphore;
private final Retry retry;
private final int concurrentRequests;
BulkRequestHandler(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BackoffPolicy backoffPolicy,
BulkProcessor.Listener listener, Scheduler scheduler, int concurrentRequests) {
assert concurrentRequests >= 0;
this.logger = Loggers.getLogger(getClass());
this.consumer = consumer;
this.listener = listener;
this.concurrentRequests = concurrentRequests;
this.retry = new Retry(backoffPolicy, scheduler);
this.semaphore = new Semaphore(concurrentRequests > 0 ? concurrentRequests : 1);
}
public void execute(BulkRequest bulkRequest, long executionId) {
Runnable toRelease = () -> {};
boolean bulkRequestSetupSuccessful = false;
try {
listener.beforeBulk(executionId, bulkRequest);
semaphore.acquire();
toRelease = semaphore::release;
CountDownLatch latch = new CountDownLatch(1);
retry.withBackoff(consumer, bulkRequest, new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse response) {
try {
listener.afterBulk(executionId, bulkRequest, response);
} finally {
semaphore.release();
latch.countDown();
}
}
@Override
public void onFailure(Exception e) {
try {
listener.afterBulk(executionId, bulkRequest, e);
} finally {
semaphore.release();
latch.countDown();
}
}
});
bulkRequestSetupSuccessful = true;
if (concurrentRequests == 0) {
latch.await();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
logger.info(() -> new ParameterizedMessage("Bulk request {} has been cancelled.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("Failed to execute bulk request {}.", executionId), e);
listener.afterBulk(executionId, bulkRequest, e);
} finally {
if (bulkRequestSetupSuccessful == false) { // if we fail on client.bulk() release the semaphore
toRelease.run();
}
}
}
boolean awaitClose(long timeout, TimeUnit unit) throws InterruptedException {
if (semaphore.tryAcquire(this.concurrentRequests, timeout, unit)) {
semaphore.release(this.concurrentRequests);
return true;
}
return false;
}
}
BulkRequestHandler通過構造方法初始化了一個Retry任務物件,該物件中也傳入了一個Scheduler,且該物件和flush任務中傳入的是同一個執行緒池,該執行緒池內部只維護了一個固定執行緒。而execute方法首先會先根據Semaphore來控制並行執行數量,該並行數量在構建BulkProcessor時通過引數指定,通過上述設定發現該值設定為1。所以每次只允許一個執行緒執行該方法。即MQ消費業務執行緒和flush任務執行緒,同一時間只能有一個執行緒可以執行。然後下面在瞭解一下重試任務是如何執行的,具體看如下程式碼:
public void withBackoff(BiConsumer<BulkRequest, ActionListener<BulkResponse>> consumer, BulkRequest bulkRequest,
ActionListener<BulkResponse> listener) {
RetryHandler r = new RetryHandler(backoffPolicy, consumer, listener, scheduler);
r.execute(bulkRequest);
}
RetryHandler內部會執行提交bulkRequest請求,同時也會監聽bulkRequest執行異常狀態,然後執行任務重試邏輯,重試程式碼如下:
private void retry(BulkRequest bulkRequestForRetry) {
assert backoff.hasNext();
TimeValue next = backoff.next();
logger.trace("Retry of bulk request scheduled in {} ms.", next.millis());
Runnable command = scheduler.preserveContext(() -> this.execute(bulkRequestForRetry));
scheduledRequestFuture = scheduler.schedule(next, ThreadPool.Names.SAME, command);
}
RetryHandler將執行失敗的bulk請求重新交給了內部scheduler執行緒池去執行,通過以上程式碼瞭解,該執行緒池內部只維護了一個固定執行緒,同時該執行緒池可能還會被另一個flush任務去佔用執行。所以如果重試邏輯正在執行的時候,此時執行緒池內的唯一執行緒正在執行flush任務,則會阻塞重試邏輯執行,重試邏輯不能執行完成,則不會釋放Semaphore,但是由於並行數量設定的是1,所以flush任務執行緒需要等待其他執行緒釋放一個Semaphore許可後才能繼續執行。所以此處形成了迴圈等待,導致Semaphore和BulkProcessor物件鎖都無法釋放,從而使得所有的MQ消費業務執行緒都阻塞在獲取BulkProcessor鎖之前。
同時,在GitHub的ES使用者端原始碼使用者端上也能搜尋到類似問題,例如: https://github.com/elastic/elasticsearch/issues/47599 ,所以更加印證了之前的猜想,就是因為bulk的不斷重試從而引發了BulkProcessor內部的死鎖問題。
既然前邊已經瞭解到了問題產生的原因,所以就有了如下幾種解決方案:
1.升級ES使用者端版本到7.6正式版,後續版本通過將異常重試任務執行緒池和flush任務執行緒池進行了物理隔離,從而避免了執行緒池的競爭,但是需要考慮版本相容性。
2.由於該死鎖問題是由大量異常重試邏輯引起的,可以在不影響業務邏輯的情況取消重試邏輯,該方案可以不需要升級使用者端版本,但是需要評估業務影響,執行失敗的請求可以通過其他其他方式進行業務重試。
如有疏漏不妥之處,歡迎指正!
作者:京東零售 曹志飛
來源:京東雲開發者社群