大家千萬不要被文章的標題給迷惑了,他兩在本篇文章是沒有關係的, 今天給大家講講最近2個有意思的issue,分享一下我學到的
有個issue說遇到了一個這樣的問題,
這個朋友使用我開源的job排程框架 https://github.com/yuzd/Hangfire.HttpJob
儲存用的是mysql,採用的實現是 https://github.com/arnoldasgudas/Hangfire.MySqlStorage
set表的id是自增主鍵,正常理解 都是慢慢自增上去的,但是發現是大幅度跳躍式的自增, 真相是什麼?
首先針對這個問題,首先我們搞清楚在hangfire中和storage相關的部分如下
Hangfire.Httpjob其實只是依賴了storage api那一層,也沒有能力去直接寫sql去執行, 只能用api去操作hangfire的那幾張表(比如set表)
那麼問題肯定不是在擴充套件層,而是得去看看mysqlstorage的實現原始碼,針對set表的處理邏輯
public override void AddToSet(string key, string value, double score)
{
Logger.TraceFormat("AddToSet key={0} value={1}", key, value);
AcquireSetLock();
QueueCommand(x => x.Execute(
$"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
"VALUES (@Key, @Value, @Score) " +
"ON DUPLICATE KEY UPDATE `Score` = @Score",
new { key, value, score }));
}
這裡是用了ON DUPLICATE KEY UPDATE 的語句
這個語法是在mysql 4.1(2005)引入的,意思是 insert的時候遇到主鍵已存在 就執行後面 的update
但是就是這個功能 會造成自增主鍵成跳躍式增長,增長跨度和SQL的執行次數成正比
根據朋友提供的截圖
雖說是會跳躍,但是這個增長也太誇張了
打上斷點偵錯發現
是hangfire server 不斷的在呼叫,目的是把下一次執行時間(秒級別的時間戳)寫到set表中
打上紀錄檔可以看到有非常多相同值的呼叫,這僅僅是一個job,這個自增速度得再乘以job的個數,難怪了
既然找到原因了,就提個PR 修改下
public override void AddToSet(string key, string value, double score)
{
Logger.TraceFormat("AddToSet key={0} value={1}", key, value);
AcquireSetLock();
QueueCommand(x =>
{
var sql = "";
if (key == "recurring-jobs") // 只發現這個key存在這個問題
{
// key+value是uniq 改成先update 如果沒有成功 再insert
sql = $"UPDATE `{_storageOptions.TablesPrefix}Set` set `Score` = @score where `Key` = @key and `Value` = @value";
var updateRt = x.Execute(sql, new { score = score, key = key, value = value });
if (updateRt < 1)
{
sql = $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
"VALUES (@Key, @Value, @Score) ";
x.Execute(
sql,
new { key, value, score });
}
}
else
{
sql = $"INSERT INTO `{_storageOptions.TablesPrefix}Set` (`Key`, `Value`, `Score`) " +
"VALUES (@Key, @Value, @Score) " +
"ON DUPLICATE KEY UPDATE `Score` = @Score";
x.Execute(
sql,
new { key, value, score });
}
//Console.WriteLine(sql + " ==> " + key + "@" + value + "@" + score);
});
}
改完之後測試,id自增一切正常:
這個原因先說出來: threadpool的執行緒被佔用完後,再來的task會往queue裡面丟,如果這個時候在這個pool的執行緒裡面 future.get()的話會導致task runner(執行器)被堵住,沒人從佇列裡面取任務了~
(簡單來說就是 執行緒在wait future返回,而這個future在queue裡面苦苦等待新釋放的執行緒去執行,就像死鎖一樣,我在等你的結果,而結果在等待著被執行)
好傢伙,這個場景有點熟悉,因為我在專案中也用過Future.get()// 雖說有設定timeout
但是這個問題的重要一點是,這種花式「死鎖」 jvm是檢測不出來的,下面有測試
模擬一下這個場景:
我搞了2個執行緒池,分別是nio執行緒池和業務執行緒池,模擬並行20個請求, 注意看process2方法裡的註釋,如果去掉那裡的程式碼的話 就不會有這個死鎖問題
/**
* @author yuzd
*/
public class PoolTest {
// 模擬nio執行緒池
static ThreadPoolExecutor nioExecutor = new ThreadPoolExecutor(20, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
new CustomerNamedThreadFactory("nio", false),
new ThreadPoolExecutor.AbortPolicy());
// 業務執行緒池
static ThreadPoolExecutor buExecutor = new ThreadPoolExecutor(20, 20, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),
new CustomerNamedThreadFactory("bu", true),
new ThreadPoolExecutor.AbortPolicy());
public static void main(String[] args) {
// 模擬是http請求並行20個
IntStream.rangeClosed(1, 20).parallel().forEach((index) -> {
// 交給nio執行緒池處理
nioExecutor.execute(() -> {
try {
httpHandler(index);
} catch (Exception e) {
e.printStackTrace();
}
});
});
}
static void httpHandler(int index) throws ExecutionException, InterruptedException {
System.out.println(Thread.currentThread().getName() + " request index :" + index + " staring");
// 交給業務執行緒池處理
Future<String> parentFuture = buExecutor.submit(() -> process1(index));
String p1Rt = parentFuture.get(); // nio執行緒在wait
System.out.println(Thread.currentThread().getName() + " request index :" + p1Rt + " ending");
}
// future1
static String process1(int index) throws ExecutionException, InterruptedException {
System.out.println( Thread.currentThread().getName() + " process1 index :" + index + " staring");
Future<String> childFuture = buExecutor.submit(() -> process2(index));
String p2Rt = childFuture.get(); // 這裡是bu執行緒在wait 這裡會發生死鎖
System.out.println(Thread.currentThread().getName() + " process1 index :" + index + " ending");
return p2Rt;
}
// future2
static String process2(int index) throws InterruptedException, ExecutionException {
System.out.println(Thread.currentThread().getName() + " process2 index :" + index + " staring");
// 加上就會死鎖
// 只要不一下子產生足夠數量的task(把core全部佔掉)就不會死鎖 加了這裡就會把core全部佔據 導致task進入到queue,core執行緒在wait future.get 無法被釋放, 而queue的任務在等待它釋放產生新的執行緒
Future<String> submit = buExecutor.submit(() -> {
try {
Thread.sleep(1000);
return String.valueOf(index);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
submit.get();
System.out.println(Thread.currentThread().getName() + " process2 index :" + index + " ending");
return String.valueOf(index);
}
}
用visualvm分析執行緒dump,很難直接發現有異常,非同步的很難檢測,排查起來比較複雜,只看到是在wait
用jstack沒有發現deadlock
在實際專案中我也看到過一個專案中共用一個執行緒池,執行緒池被封裝成一個util方法,要執行非同步的都用它,這個場景尤其要注意這個場景,也建議大家用帶有超時的方式 Future.get(xxxx)