本篇其實是承接前面兩篇的,都是講定位線上的c3p0資料庫連線池,發生連線洩露的問題。
第二篇講到,可以設定兩個引數,來找出是哪裡的程式碼借了連線後沒有歸還。但是,在我這邊的情況是,對於沒有歸還的連線,借用者的堆疊確實是列印到紀錄檔了,但是我在本地模擬的時候,發現其實這些場景是有歸還連線的,所以,我開始懷疑不是程式碼問題。
不是業務程式碼問題,能是啥問題呢?我們先來看看連線是怎麼歸還到連線池的。
我在本地debug了下,發現獲取連線時,程式碼如下:
com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource#getConnection()
public Connection getConnection() throws SQLException
{
// javax.sql.PooledConnection,實際型別為com.mchange.v2.c3p0.impl.NewPooledConnection
PooledConnection pc = getPoolManager().getPool().checkoutPooledConnection();
return pc.getConnection();
}
說實話,之前都沒注意到jdbc api裡還有javax.sql.PooledConnection
這個類,這裡,就是首先從c3p0連線池獲取了一個com.mchange.v2.c3p0.impl.NewPooledConnection
物件,然後轉換為javax.sql.PooledConnection
。
然後,呼叫javax.sql.PooledConnection#getConnection
,會返回給實際型別為com.mchange.v2.c3p0.impl.NewProxyConnection
的物件。
com.mchange.v2.c3p0.impl.NewPooledConnection#getConnection
public synchronized Connection getConnection() throws SQLException
{
if ( exposedProxy == null )
{
exposedProxy = new NewProxyConnection( physicalConnection, this );
}
return exposedProxy;
}
在該類中,主要包含如下幾個欄位:
inner:實際的底層連線,如我這裡,其型別為oracle.jdbc.driver.T4CConnection
parentPooledConnection:javax.sql.PooledConnection型別的池化連線
cel:型別為ConnectionEventListener,就是一個監聽器
com.mchange.v2.c3p0.impl.NewProxyConnection
public synchronized void close() throws SQLException {
// 0
if (!this.isDetached()) {
// 1
NewPooledConnection npc = this.parentPooledConnection;
this.detach();
// 2
npc.markClosedProxyConnection(this, this.txn_known_resolved);
this.inner = null;
}
}
0處,檢查該物件是否已經和底層的池化連線解綁:
boolean isDetached() {
return this.parentPooledConnection == null;
}
1處,通過parentPooledConnection
獲取到NewPooledConnection
型別的池化連線,然後和池化連線解綁:
private void detach() {
this.parentPooledConnection.removeConnectionEventListener(this.cel);
this.parentPooledConnection = null;
}
2處,呼叫池化連線的方法,進行清理:
void markClosedProxyConnection( NewProxyConnection npc, boolean txn_known_resolved )
{
// 2.1
List closeExceptions = new LinkedList();
// 2.2
cleanupResultSets( closeExceptions );
cleanupUncachedStatements( closeExceptions );
checkinAllCachedStatements( closeExceptions );
// 2.3
if ( closeExceptions.size() > 0 )
{
...
// 列印異常
}
reset( txn_known_resolved );
exposedProxy = null; //volatile
// 2.4
fireConnectionClosed();
}
2.1處,建個list,用來收集清理過程中的各種異常;
2.2處,清理ResultSet、Statement等
2.3處,列印異常
2.4處,通知監聽者:
private void fireConnectionClosed()
{
ces.fireConnectionClosed();
}
然後進入:
ConnectionEvent evt = new ConnectionEvent(source);
for (Iterator i = mlCopy.iterator(); i.hasNext();)
{
ConnectionEventListener cl = (ConnectionEventListener) i.next();
// 1 呼叫listener的方法
cl.connectionClosed(evt);
}
// com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool.ConnectionEventListenerImpl#connectionClosed
public void connectionClosed(final ConnectionEvent evt)
{
doCheckinResource( evt );
}
然後如下方法被呼叫:
private void doCheckinResource(ConnectionEvent evt)
{
// rp: com.mchange.v2.resourcepool.BasicResourcePool
rp.checkinResource( evt.getSource() );
}
這裡rp就是資源池,這裡就會向資源池歸還連線。
內部的實現如下:
這裡是定義了一個內部類RefurbishCheckinResourceTask
,內部類實現了Runnable,然後new了一個範例,丟給了taskRunner,進行非同步歸還。
這個task的邏輯:
class RefurbishCheckinResourceTask implements Runnable
{
public void run()
{
// 1 檢查資源是否ok
boolean resc_okay = attemptRefurbishResourceOnCheckin( resc );
synchronized( BasicResourcePool.this )
{
PunchCard card = (PunchCard) managed.get( resc );
// 2 如果資源ok,歸還到unused空閒連結串列,更新卡片
if ( resc_okay && card != null)
{
// 2.1 歸還到unused空閒連結串列
unused.add(0, resc );
// 2.2 更新卡片的歸還時間為當前時間、借出時間為-1,表示未借出
card.last_checkin_time = System.currentTimeMillis();
card.checkout_time = -1;
}
else
{
if (card != null)
card.checkout_time = -1;
// 連線是壞的,那就把這個連線毀滅
removeResource( resc );
ensureMinResources();
}
BasicResourcePool.this.notifyAll();
}
}
}
這裡歸還連線,可以看到,是new了一個runnable,丟給執行緒池去非同步執行,但是,非同步執行,不是很穩啊,比如,如果此時執行緒池裡的執行緒,都卡住了,沒法處理task,怎麼辦呢?
如果你去搜尋引擎查APPARENT DEADLOCK
,會搜到很多,說明這些年,大家還是被這個問題困擾了挺久
我們這邊,每次出現這個連線洩露問題時,貌似都伴隨著這個紀錄檔,這個紀錄檔大概長下面這樣:
06-08 17:00:30,119[Timer-5][][c.ThreadPoolAsynchronousRunner:608][WARN]-com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@3cf46c2 -- APPARENT DEADLOCK!!! Creating emergency threads for unassigned pending tasks!
06-08 17:00:30,121[Timer-5][][c.ThreadPoolAsynchronousRunner:624][WARN]-com.mchange.v2.async.ThreadPoolAsynchronousRunner$DeadlockDetector@3cf46c2 -- APPARENT DEADLOCK!!! Complete Status:
Managed Threads: 3
Active Threads: 3
Active Tasks:
com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@b451b27 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0)
com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@65f9a338 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#1)
com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@684ae5d5 (com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#2)
Pending Tasks:
com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@d373871
com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@245a897e
com.mchange.v2.resourcepool.BasicResourcePool$DestroyResourceTask@33f8c1d7
com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask@107e24e9
Pool thread stack traces:
Thread[com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread-#0,5,main]
java.net.SocketInputStream.socketRead0(Native Method)
java.net.SocketInputStream.read(SocketInputStream.java:152)
java.net.SocketInputStream.read(SocketInputStream.java:122)
oracle.net.ns.Packet.receive(Packet.java:300)
oracle.net.ns.DataPacket.receive(DataPacket.java:106)
oracle.net.ns.NetInputStream.getNextPacket(NetInputStream.java:315)
oracle.net.ns.NetInputStream.read(NetInputStream.java:260)
oracle.net.ns.NetInputStream.read(NetInputStream.java:185)
oracle.net.ns.NetInputStream.read(NetInputStream.java:102)
oracle.jdbc.driver.T4CSocketInputStreamWrapper.readNextPacket(T4CSocketInputStreamWrapper.java:124)
oracle.jdbc.driver.T4CSocketInputStreamWrapper.read(T4CSocketInputStreamWrapper.java:80)
oracle.jdbc.driver.T4CMAREngine.unmarshalUB1(T4CMAREngine.java:1137)
oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:290)
oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:192)
oracle.jdbc.driver.T4CTTIoauthenticate.doOSESSKEY(T4CTTIoauthenticate.java:404)
oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:385)
oracle.jdbc.driver.PhysicalConnection.<init>(PhysicalConnection.java:546)
oracle.jdbc.driver.T4CConnection.<init>(T4CConnection.java:236)
oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:32)
oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:521)
com.mchange.v2.c3p0.DriverManagerDataSource.getConnection(DriverManagerDataSource.java:134)
com.mchange.v2.c3p0.WrapperConnectionPoolDataSource.getPooledConnection(WrapperConnectionPoolDataSource.java:182)
com.mchange.v2.c3p0.WrapperConnectionPoolDataSource.getPooledConnection(WrapperConnectionPoolDataSource.java:171)
com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool$1PooledConnectionResourcePoolManager.acquireResource(C3P0PooledConnectionPool.java:137)
com.mchange.v2.resourcepool.BasicResourcePool.doAcquire(BasicResourcePool.java:1014)
com.mchange.v2.resourcepool.BasicResourcePool.access$800(BasicResourcePool.java:32)
com.mchange.v2.resourcepool.BasicResourcePool$AcquireTask.run(BasicResourcePool.java:1810)
com.mchange.v2.async.ThreadPoolAsynchronousRunner$PoolThread.run(ThreadPoolAsynchronousRunner.java:547)
我們有提到,有很多事情都是丟給執行緒池非同步執行的,比如main執行緒初始化連線時,main並不會自己去建立連線,而是new幾個task,丟給執行緒池並行執行,然後main執行緒在那邊等待。
主要有這麼幾種task:
com.mchange.v2.resourcepool.BasicResourcePool.AcquireTask
獲取資料庫連線,和底層db driver打交道,如mysql、oracle的driver
com/mchange/v2/resourcepool/BasicResourcePool.java:959
這個方法內,定義了一個內部class,這個DestroyResourceTask
就是用來銷燬底層連線
private void destroyResource(final Object resc, boolean synchronous)
{
class DestroyResourceTask implements Runnable
{
com.mchange.v2.resourcepool.BasicResourcePool#doCheckinManaged中的內部類:
class RefurbishCheckinResourceTask implements Runnable
這個類很重要,前面已經講到了,歸還連線的時候,就會生成這個task非同步執行
com.mchange.v2.resourcepool.BasicResourcePool.AsyncTestIdleResourceTask#AsyncTestIdleResourceTask
這個類,主要是測試那些空閒時間太長的資源,看看是不是還ok,不ok的話,會及時銷燬
com.mchange.v2.resourcepool.BasicResourcePool.RemoveTask
連線池縮容的時候需要,比如現在有20個連線,我們設定的min為10,那麼多出的10個連線會被銷燬
這裡面,有好幾個都是要和db通訊的,如AcquireTask、DestroyResourceTask、AsyncTestIdleResourceTask,通訊就有可能超時,長時間超時就可能阻塞當前的執行緒,接下來,我們就看看這些執行緒有沒有被阻塞的可能。
執行緒池的建立如下:
private ThreadPoolAsynchronousRunner( int num_threads,
boolean daemon,
int max_individual_task_time,
int deadlock_detector_interval,
int interrupt_delay_after_apparent_deadlock,
Timer myTimer,
boolean should_cancel_timer )
{
this.num_threads = num_threads;
this.daemon = daemon;
this.max_individual_task_time = max_individual_task_time;
this.deadlock_detector_interval = deadlock_detector_interval;
this.interrupt_delay_after_apparent_deadlock = interrupt_delay_after_apparent_deadlock;
this.myTimer = myTimer;
this.should_cancel_timer = should_cancel_timer;
// 建立執行緒池
recreateThreadsAndTasks();
myTimer.schedule( deadlockDetector, deadlock_detector_interval, deadlock_detector_interval );
}
private void recreateThreadsAndTasks()
{
// 如果執行緒池已經存在,則先銷燬
if ( this.managed != null)
{
Date aboutNow = new Date();
for (Iterator ii = managed.iterator(); ii.hasNext(); )
{
PoolThread pt = (PoolThread) ii.next();
pt.gentleStop();
stoppedThreadsToStopDates.put( pt, aboutNow );
ensureReplacedThreadsProcessing();
}
}
// 建立執行緒池
this.managed = new HashSet();
this.available = new HashSet();
this.pendingTasks = new LinkedList();
for (int i = 0; i < num_threads; ++i)
{
// 執行緒type為com.mchange.v2.async.ThreadPoolAsynchronousRunner.PoolThread
Thread t = new PoolThread(i, daemon);
managed.add( t );
available.add( t );
t.start();
}
}
執行緒的執行邏輯:
// 1
boolean should_stop;
LinkedList pendingTasks;
while (true)
{
Runnable myTask;
synchronized ( ThreadPoolAsynchronousRunner.this )
{
while ( !should_stop && pendingTasks.size() == 0 )
ThreadPoolAsynchronousRunner.this.wait( POLL_FOR_STOP_INTERVAL );
// 2
if (should_stop)
break thread_loop;
// 3
myTask = (Runnable) pendingTasks.remove(0);
currentTask = myTask;
}
try
{ // 4
if (max_individual_task_time > 0)
setMaxIndividualTaskTimeEnforcer();
// 5
myTask.run();
}
...
}
1處,線上程中定義了一個標誌,如果這個標誌為true,執行緒檢測到,會停止執行;
2處,檢測標誌;
3處,從任務列表摘取任務;
4處,如果max_individual_task_time大於0,可以啟動一個max_individual_task_time秒後中斷當前執行緒的timer
private void setMaxIndividualTaskTimeEnforcer()
{
this.maxIndividualTaskTimeEnforcer = new MaxIndividualTaskTimeEnforcer( this );
myTimer.schedule( maxIndividualTaskTimeEnforcer, max_individual_task_time );
}
5處,執行任務。
但是,我們說,5處是執行任務,從我們紀錄檔(前面的APPARENT DEADLOCK紀錄檔的堆疊)就能發現,5處執行任務時,貌似卡死了,等待db返回資料,結果好像db一直不返回。
這裡一旦長時間卡住,就會導致執行緒池沒法繼續執行其他task,包括:歸還連線到連線池的task、獲取新連線的task等。無法執行歸還連線的task,就會導致連線池中連線耗盡,看起來就像是發生了連線洩露一樣。
那麼,作為一個那時候的流行框架,作者是怎麼解決這個問題呢?
其實這個就是要講的com.mchange.v2.async.ThreadPoolAsynchronousRunner.DeadlockDetector
,也就是那個會列印死鎖紀錄檔的執行緒。
TimerTask deadlockDetector = new DeadlockDetector();
class DeadlockDetector extends TimerTask
這個task會定時執行,因為它是一個java.util.TimerTask
。
// com.mchange.v2.async.ThreadPoolAsynchronousRunner#ThreadPoolAsynchronousRunner
// 在建構函式中,就會使用timer開啟對這個timerTask的週期排程
myTimer.schedule( deadlockDetector, deadlock_detector_interval, deadlock_detector_interval );
預設情況下,沒做額外設定的話,這個deadlock_detector_interval一般是10s,也就是10s執行一次,後面再講怎麼修改這個值。
這個task,每次被排程的時候,都幹些啥呢?
我先簡單說一下,主要就是檢測執行緒池裡的執行緒是不是出了問題,比如,被沒有超時時間的阻塞呼叫給卡死了,hang住了。我們想想,執行緒卡死了之後,現象是啥?執行緒是要處理任務的,如果它卡死了,那麼待處理的任務列表就會一直不變。(按理說,也可能越積越多,但是,作者的檢測思路就是,上一次排程時候的待處理任務連結串列,和本次排程時,待處理任務連結串列,一模一樣,就認為發生了死鎖。)
如果按照作者的演演算法,發生了執行緒全部hang死(也就是他說的死鎖),此時,會進行以下動作:
將這些執行緒的boolean should_stop;
標誌設為true,如果這些執行緒沒完全hang死,還能動的話,看到這個標誌,就會自行結束
把這些執行緒存到一個map,key是執行緒,value是當前時間
Date aboutNow = new Date();
for (Iterator ii = managed.iterator(); ii.hasNext(); )
{
PoolThread pt = (PoolThread) ii.next();
// 設定boolean should_stop;為true
pt.gentleStop();
// 存放到待結束執行緒的map
stoppedThreadsToStopDates.put( pt, aboutNow );
//
ensureReplacedThreadsProcessing();
}
上面的ensureReplacedThreadsProcessing
啟動一個timerTask。
private void ensureReplacedThreadsProcessing()
{
this.replacedThreadInterruptor = new ReplacedThreadInterruptor();
int replacedThreadProcessDelay = interrupt_delay_after_apparent_deadlock / 4;
myTimer.schedule( replacedThreadInterruptor, replacedThreadProcessDelay, replacedThreadProcessDelay );
}
這個timerTask每interrupt_delay_after_apparent_deadlock/4
執行一次,這個interrupt_delay_after_apparent_deadlock
就是個時間值,預設是60s,也就是說,預設15s執行一次timerTask,這個timerTask的職責是:
com.mchange.v2.async.ThreadPoolAsynchronousRunner.ReplacedThreadInterruptor
class ReplacedThreadInterruptor extends TimerTask
{
public void run()
{
synchronized (ThreadPoolAsynchronousRunner.this)
{ processReplacedThreads(); }
}
}
檢測那個執行緒map裡的每個執行緒,如果當前最新的時間 - 執行緒停止時(也就是打上should_stop標記)的時間,大於60s(interrupt_delay_after_apparent_deadlock
預設值),就呼叫這些執行緒的interrupt方法,大家知道,java.lang.Thread#interrupt
可以讓執行緒從阻塞操作中醒過來,也就相當於讓執行緒強制結束執行。
重建幾個新的執行緒:
this.managed = new HashSet();
this.available = new HashSet();
this.pendingTasks = new LinkedList();
for (int i = 0; i < num_threads; ++i)
{
Thread t = new PoolThread(i, daemon);
managed.add( t );
available.add( t );
t.start();
}
我們這個場景,是由於執行緒hang死,那麼,可能積壓了非常多的任務要執行,所以,這裡要臨時建立一些執行緒來負責這些任務:
// 這裡的current就是指向積壓的任務
current = (LinkedList) pendingTasks.clone();
這裡會緊急建立10個執行緒出來,然後將這些積壓的任務全部丟給這個新建立的執行緒池來執行。
按照上述分析,每次執行完這個TimerTask的邏輯後,老的執行緒會馬上打上should_stop標記,60s(interrupt_delay_after_apparent_deadlock)後會被強制interrupe。
會新建立n個執行緒來執行後續任務。至於積壓的任務,會臨時建立緊急執行緒池來執行。
看起來,大的邏輯倒是沒啥大問題,至於有沒有一些細節上的多執行緒問題,這個不能確定。
按理說,在紀錄檔中出現了APPARENT DEADLOCK字樣後,如果執行沒問題的話,新的執行緒就建立起來了,後續的請求,再需要獲取連線,就會在新的執行緒中執行,如果這時候後臺db是ok的,那麼就可以獲取到新的連線來執行sql了。
但我們這邊顯示,後續還是不斷地報錯,是不是說明新的執行緒中執行任務(如獲取連線那些),馬上又hang住了呢?
那就是說,db有問題的話,這個機制也起不了太大作用,還是不斷hang死,當然,這也說得過去,畢竟後臺db有問題,連線獲取困難的話,程式怎麼好的了呢?
由於程式中紀錄檔很匱乏,只開啟了某幾個logger的INFO級別,其他logger都是ERROR,所以沒法完全確定問題所在。
至於為啥logger都是ERROR,那是因為我們這個專案是老專案,打紀錄檔用的是log4j 1.2的老版本,有bug,寫紀錄檔的時候會進入synchronized包裹的程式碼,也就是要搶鎖,之前因為把級別調成INFO,導致了大問題,現在不敢貿然弄成INFO了。等後面先把log4j升級到log4j 2.x的版本,開啟更多紀錄檔,也許能發現更多。
目前,我們線上採取了臨時措施,寫了個shell指令碼,通過紀錄檔檢測,發現這種問題時,自動重啟服務,作為應急措施
設定oracle底層socket的SO_TIMEOUT,也就是讀超時的時間設定一下,避免這麼長時間的阻塞
設定執行緒池中每個task的最長執行時間:
com.mchange.v2.async.ThreadPoolAsynchronousRunner#max_individual_task_time
// 在PoolThread執行task前,會檢測上述欄位,大於0則啟動一個timerTask,指定時間後中斷本執行緒
try
{
if (max_individual_task_time > 0)
setMaxIndividualTaskTimeEnforcer();
myTask.run();
}
private void setMaxIndividualTaskTimeEnforcer()
{
this.maxIndividualTaskTimeEnforcer = new MaxIndividualTaskTimeEnforcer( this );
myTimer.schedule( maxIndividualTaskTimeEnforcer, max_individual_task_time );
}
class MaxIndividualTaskTimeEnforcer extends TimerTask
{
PoolThread pt;
Thread interruptMe;
public void run()
{
interruptMe.interrupt();
}
}
這個timerTask,在max_individual_task_time時間後,interrupt當前執行緒,這樣,也能避免執行緒長期被阻塞。
這個max_individual_task_time,可以通過設定項maxAdministrativeTaskTime來設定。
由於我們這邊紀錄檔的缺乏、dba也沒有配合查這個問題(之前沒懷疑到db也是一個原因),目前還不能完全確定問題的根因。
後續,可能會升級紀錄檔框架,把更多紀錄檔打出來;也會按照上面的優化思路,調整一下引數,主要是控制任務執行時間和socket的so_timeout,避免執行緒hang死。
再不行,換個連線池框架吧,這玩意設計就有缺陷,就是這個非同步獲取連線、歸還連線的問題,c3p0走向衰落也是正常。
另外,這個框架的執行緒搞得真是多,看著頭疼。