資料庫連線池之c3p0-0.9.1.2,16年的古董,發生連線洩露怎麼查(二)

2023-07-15 18:00:39

背景

本篇是c3p0連線洩露問題的第二篇,在前面一篇裡面,大體介紹了問題,問題就是,我們發現線上服務不響應的原因是拿不到連線。而為啥拿不到連線呢,因為空閒連結串列為空,那麼為什麼空閒連結串列為空呢?

這個我一開始的猜測就是,估計是某處程式碼從連線池裡獲取了連線,用完了沒有歸還,那麼,怎麼才能找到這些罪惡的程式碼呢?

結合簡單的原始碼分析、檔案、搜尋引擎,發現了兩個設定項。

<property name="unreturnedConnectionTimeout">50</property>
<property name="debugUnreturnedConnectionStackTraces">true</property>

設定項的官方解釋

首先說下,官方檔案去哪裡看呢,由於這個版本07年釋出的,而官網一般只有近期版本的檔案,所以我也是費了一些周折才找到。

https://sourceforge.net/projects/c3p0/files/c3p0-bin/c3p0-0.9.1.2/

另外,這裡也額外補充下原始碼的地址:原始碼有兩種方式獲得,一是通過mvn倉庫,二是https://sourceforge.net/projects/c3p0/files/c3p0-src/c3p0-0.9.1.2/,兩種方式獲得的原始碼我對比過,是一致的。

在下載的zip包中,doc/index.html即是該版本的離線檔案。

其中的Configuring to Debug and Workaround Broken Client Applications章節,講述了這兩個設定項的意思。喜歡看原文的看下圖即可:

我簡單翻譯下意思:

有時候,應用程式比較大意,它們從連線池中獲取的連線可能未呼叫close進行關閉。最終,池子膨脹到maxSize,然後因為這些大意的應用程式耗盡連線池。

解決這個問題的正確方式是修復程式。c3p0能幫你debug,讓你知道是哪裡發生了借出連線不歸還的情況。也有比較少見的情況下,程式的開發週期已經結束,即使你明知其有bug也無法修復。在這種情況下,c3p0能幫你繞過這個問題,阻止其耗盡連線池。

unreturnedConnectionTimeout 定義了一個連線被借出後可以多久不歸還的時間(單位秒),如果設定為非0的值,未歸還的被借出的連線,超過這個時間後會被銷燬,然後在連線池中重新生成新的連線。顯然,你必須設定這個引數在一個合理的值,以確保程式在拿到連線後有時間能去完成自己的所有潛在操作(增刪改查)。你能使用這個引數繞過那些有問題的借了連線不還的程式程式碼。

比繞過問題更好的辦法是修復程式碼。除了設定上述引數外,還需要設定debugUnreturnedConnectionStackTraces為true,那麼,在連線被借出時,會生成一個借用執行緒的執行緒堆疊快照。等到這類未歸還的連線超過unreturnedConnectionTimeout 的時候,就可以列印出對應的執行緒堆疊,從而揭示出問題所在。這個引數主要用於找出異常程式碼,修復後可以關掉,因為借出連線時生成執行緒堆疊也是一個較為耗時的行為。

設定項在原始碼中如何初始化

1、datasource的初始化

首先是設定,我們這邊是框架從一個xml檔案讀設定:

<datasource id="test" desc=""> 
    <property name="driver-name">oracle.jdbc.driver.OracleDriver</property>  
    <property name="url">jdbc:oracle:thin:@1.1.1.1:1521:orcl</property>
    <property name="user">111</property>
    <property name="password">111</property>  
    <property name="initialPoolSize">1</property>  
    <property name="minPoolSize">1</property>  
    <property name="maxPoolSize">50</property>  
    <property name="maxIdleTime">0</property>
    <property name="checkoutTimeout">10000</property>  
    <property name="maxStatements">0</property>  
    <property name="idleConnectionTestPeriod">0</property>
    <property name="acquireRetryAttempts">30</property>  
    <property name="acquireIncrement">2</property>  
    <property name="unreturnedConnectionTimeout">50</property>
    <property name="debugUnreturnedConnectionStackTraces">true</property>
  </datasource>

初始化程式碼如下:

核心的程式碼是上圖紅框,一方面接收一個未池化的只封裝了url、使用者名稱密碼的datasource,一方面接收設定項map。

進入函數內部後,會生成一個WrapperConnectionPoolDataSource物件,內部會把使用者的設定項map設定進去,這也包含了前面我們要設定進去的unreturnedConnectionTimeout 和 debugUnreturnedConnectionStackTraces。

WrapperConnectionPoolDataSource wcpds = new WrapperConnectionPoolDataSource(configName);
// 設定原始的未池化datasource
wcpds.setNestedDataSource( unpooledDataSource );
// 設定使用者設定項
BeansUtils.overwriteAccessiblePropertiesFromMap( overrideProps);

接下來呢,返回給使用者的datasource的實際型別為PoolBackedDataSource

PoolBackedDataSource nascent_pbds = new PoolBackedDataSource(configName);
nascent_pbds.setConnectionPoolDataSource( wcpds );

其實呢,這個PoolBackedDataSource,有個關鍵欄位poolManager還沒初始化,也就是說,連線池此時還沒生成(懶載入),看圖:

什麼時候生成連線池呢?要等到第一次呼叫的時候,如獲取連線。所以,我們框架是啟動時,先去呼叫一次,確保連線池初始化。

private static void connectToDB(DataSource dataSource)
{
    // datasource:  com.mchange.v2.c3p0.PoolBackedDataSource
    Connection conn =  dataSource.getConnection();
}

此時,就會進入如下方法:

com.mchange.v2.c3p0.impl.AbstractPoolBackedDataSource#getConnection()    
public Connection getConnection() throws SQLException
{
    PooledConnection pc = getPoolManager().getPool().checkoutPooledConnection();
    return pc.getConnection();
}

首先進入getPoolManager:

private synchronized C3P0PooledConnectionPoolManager getPoolManager() throws SQLException
{
    if (poolManager == null)
    {
        ConnectionPoolDataSource cpds = assertCpds();
        poolManager = new C3P0PooledConnectionPoolManager(cpds, null, null, this.getNumHelperThreads(), this.getIdentityToken());
    }
    return poolManager;	    
}

可以看到,是個懶載入方法,首次進入時才開始new:

2、poolManager的初始化

com.mchange.v2.c3p0.impl.C3P0PooledConnectionPoolManager#poolsInit    

首先是生成一個定時任務執行執行緒:

this.timer = new Timer( true );

再下來,是生成一個執行緒池,當然,我們知道,執行緒池的執行緒,如果執行一些阻塞還不帶超時的方法(如網路請求,而對方完全不返回或者很久才返回的時候),就會導致執行緒完全hang死,所以,這個執行緒池支援設定一個時間閾值,超過這個閾值,就會把這個執行緒給中斷。

至於要不要生成這麼一個帶中斷功能的執行緒池,是根據設定項:maxAdministrativeTaskTime。如果值大於0,就開啟該功能。

int matt = this.getMaxAdministrativeTaskTime( null );
if ( matt > 0 )
{
    int matt_ms = matt * 1000;
    this.taskRunner = new ThreadPoolAsynchronousRunner( num_task_threads, 
                                                       true,
                                                       matt_ms,    
                                                       matt_ms * 3, 
                                                       matt_ms * 6, 
                                                       timer );
}

生成的執行緒池賦值給taskRunner欄位。

如果maxAdministrativeTaskTime為0,則呼叫如下方法生成:

this.taskRunner = new ThreadPoolAsynchronousRunner( num_task_threads, true, timer );

執行緒池生成後,還要生成一個工廠,一個用來建立資源池(即連線池)的工廠:

ResourcePoolFactory          rpfact;

this.rpfact = BasicResourcePoolFactory.createNoEventSupportInstance( taskRunner, timer );

3、執行緒池的生成

// 設定執行緒池大小,它這個執行緒池是不能擴縮容的,是固定大小
this.num_threads = num_threads;
// 後臺執行
this.daemon = daemon;
// maxAdministrativeTaskTime的值,一個任務最多執行多久
this.max_individual_task_time = max_individual_task_time;
// 死鎖檢測相關設定
this.deadlock_detector_interval = deadlock_detector_interval;
this.interrupt_delay_after_apparent_deadlock = interrupt_delay_after_apparent_deadlock;
this.myTimer = myTimer;
this.should_cancel_timer = should_cancel_timer;
// 根據設定建立執行緒
recreateThreadsAndTasks();
// 週期執行死鎖檢測任務
myTimer.schedule( deadlockDetector, deadlock_detector_interval, deadlock_detector_interval );

從上我們看到,這裡面有個timer,而且給這個timer傳遞了一個週期排程的job,這個job呢,是進行死鎖檢測的,為啥叫這個名字,我估計,作者也他麼知道自己用了太多多執行緒的東西了,太多syn關鍵字,容易導致出問題,所以搞個死鎖檢測的job,看看是不是有問題,有問題的話,就會打紀錄檔、重建執行緒池等。

而建立執行緒的部分如下:

private void recreateThreadsAndTasks()
{
    this.managed = new HashSet();
    this.available = new HashSet();
    this.pendingTasks = new LinkedList();
    for (int i = 0; i < num_threads; ++i)
    {
        Thread t = new PoolThread(i, daemon);
        managed.add( t );
        available.add( t );
        t.start();
    }
}

這裡就是並沒有執行緒池,有的只是n個執行緒,執行緒繼承了jdk自帶的執行緒:

class PoolThread extends Thread

執行邏輯如下:

HashSet    managed; // 所有的執行緒
HashSet    available; // 空閒執行緒
LinkedList pendingTasks; // 待執行的任務連結串列

while (true)
{
	Runnable myTask;
	synchronized ( ThreadPoolAsynchronousRunner.this )
	{	
        // 1 沒有任務,睡眠
		while ( pendingTasks.size() == 0 )
			ThreadPoolAsynchronousRunner.this.wait( POLL_FOR_STOP_INTERVAL );
		// 2 有任務,準備執行,則先把自己從空閒執行緒中摘掉	
		if (! available.remove( this ) )
			throw new InternalError("An unavailable PoolThread tried to check itself out!!!");
        // 3 獲取要執行的任務,隊頭獲取
		myTask = (Runnable) pendingTasks.remove(0);
		currentTask = myTask;
	}
	// 4 檢查是否設定了task的最長執行時間,設定了的話,要給timer排程一個n秒後執行的task,task屆時會打斷我們
	if (max_individual_task_time > 0)
		setMaxIndividualTaskTimeEnforcer();
    // 5 執行任務
	myTask.run(); 

	finally
	{   // 6 走到這,說明執行完成了,取消給timer排程的task
		if ( maxIndividualTaskTimeEnforcer != null )
			cancelMaxIndividualTaskTimeEnforcer();

		synchronized ( ThreadPoolAsynchronousRunner.this )
		{	// 7 把自己放回空閒執行緒列表
            available.add( this )
			currentTask = null;
		}
	}
}

4、連線池factory的建立

首先是連線池工廠的建立,工廠的建立並不複雜,工廠在構造器中,只是接收了從外部傳遞進來的timer和執行緒池:

BasicResourcePoolFactory( AsynchronousRunner taskRunner, 
                         RunnableQueue asyncEventQueue,  
                         Timer timer,
                         int default_num_task_threads)
{  
    this.taskRunner = taskRunner;
    this.timer = timer;
    this.default_num_task_threads = default_num_task_threads;
}

5、poolManager建立完成,建立pool

public Connection getConnection() throws SQLException
{
    PooledConnection pc = getPoolManager().getPool().checkoutPooledConnection();
    return pc.getConnection();
}

接下來即進入getPool方法:

	com.mchange.v2.c3p0.impl.C3P0PooledConnectionPoolManager

    private C3P0PooledConnectionPool createPooledConnectionPool(DbAuth auth) throws SQLException
    {    
        C3P0PooledConnectionPool out =  new C3P0PooledConnectionPool( cpds,
                        auth,
                        this.getMinPoolSize( userName ),
                        this.getMaxPoolSize( userName ),
                        this.getInitialPoolSize( userName ),
                        this.getAcquireIncrement( userName ),
                        this.getAcquireRetryAttempts( userName ),
                        this.getAcquireRetryDelay( userName ),
                        this.getBreakAfterAcquireFailure( userName ),
                        this.getCheckoutTimeout( userName ),
                        this.getIdleConnectionTestPeriod( userName ),
                        this.getMaxIdleTime( userName ),
                        this.getMaxIdleTimeExcessConnections( userName ),
                        this.getMaxConnectionAge( userName ),
                        this.getPropertyCycle( userName ),
                        this.getUnreturnedConnectionTimeout( userName ),
                        this.getDebugUnreturnedConnectionStackTraces( userName ),
                        this.getTestConnectionOnCheckout( userName ),
                        this.getTestConnectionOnCheckin( userName ),
                        this.getMaxStatements( userName ),
                        this.getMaxStatementsPerConnection( userName ),
                        this.getConnectionTester( userName ),
                        this.getConnectionCustomizer( userName ),
                        realTestQuery,
                        rpfact,
                        taskRunner,
                        parentDataSourceIdentityToken );
	}

從上面可以看到,這裡獲取很多設定項,以我們關注的getUnreturnedConnectionTimeout為例:

private int getUnreturnedConnectionTimeout(String userName)
{
    return getInt("unreturnedConnectionTimeout", userName ); 
}

實際最終取值呢,就是從如下欄位中取值,這個大家看前文就知道,裡面儲存了我們的使用者設定項:

final ConnectionPoolDataSource cpds;

propName = "unreturnedConnectionTimeout";
Method m = (Method) propNamesToReadMethods.get( propName );
if (m != null)
{	
    // cpds中取值
    Object readProp = m.invoke( cpds, null );
    if (readProp != null)
        out = readProp.toString();
}

new C3P0PooledConnectionPool的內部,實現如下:

  • 如果設定了statement的快取相關引數,則建立快取:
this.scache = new DoubleMaxStatementCache( taskRunner, maxStatements, maxStatementsPerConnection );
  • 根據傳入的引數,設定到ResourcePoolFactory fact,如我們關注的

    fact.setDestroyOverdueResourceTime( unreturnedConnectionTimeout * 1000 );
    fact.setDebugStoreCheckoutStackTrace( debugUnreturnedConnectionStackTraces );
    

    注意,我們傳入的unreturnedConnectionTimeout,被換算成毫秒,賦值給:

    long destroy_overdue_resc_time;
    

    而debugUnreturnedConnectionStackTraces,變成了:

    boolean debug_store_checkout_stacktrace = false;
    

    完整的如下:

    fact.setMin( min );
    fact.setMax( max );
    fact.setStart( start );
    fact.setIncrement( inc );
    fact.setIdleResourceTestPeriod( idleConnectionTestPeriod * 1000);
    fact.setResourceMaxIdleTime( maxIdleTime * 1000 );
    fact.setExcessResourceMaxIdleTime( maxIdleTimeExcessConnections * 1000 );
    fact.setResourceMaxAge( maxConnectionAge * 1000 );
    fact.setExpirationEnforcementDelay( propertyCycle * 1000 );
    fact.setDestroyOverdueResourceTime( unreturnedConnectionTimeout * 1000 );
    fact.setDebugStoreCheckoutStackTrace( debugUnreturnedConnectionStackTraces );
    fact.setAcquisitionRetryAttempts( acq_retry_attempts );
    fact.setAcquisitionRetryDelay( acq_retry_delay );
    fact.setBreakOnAcquisitionFailure( break_after_acq_failure );
    
  • 建立資源池

    rp = fact.createPool( manager );
    
  • resourcePool建立

    首先是,把傳入的設定儲存下來:

    this.start                            = start;
    this.min                              = min;
    this.max                              = max;
    this.inc                              = inc;
    this.num_acq_attempts                 = num_acq_attempts;
    this.acq_attempt_delay                = acq_attempt_delay;
    this.check_idle_resources_delay       = check_idle_resources_delay;
    this.max_resource_age                 = max_resource_age;
    this.max_idle_time                    = max_idle_time;
    this.excess_max_idle_time             = excess_max_idle_time;
    // 我們關注的屬性
    this.destroy_unreturned_resc_time     = destroy_unreturned_resc_time;
    this.debug_store_checkout_exceptions  = (debug_store_checkout_exceptions && destroy_unreturned_resc_time > 0
                                             
    this.break_on_acquisition_failure     = break_on_acquisition_failure;                                         
    

    其次,外部傳入的timer、執行緒池的參照也儲存下來:

    this.taskRunner                       = taskRunner;
    // 就是最前面那個死鎖檢測的timer,傳進來換了名字
    this.cullAndIdleRefurbishTimer        = cullAndIdleRefurbishTimer;
    

    開始池子的初始化:

    // 計算初始的池子大小
    this.target_pool_size = Math.max(start, min);
    //start acquiring our initial resources
    ensureStartResources();
    
    private void ensureStartResources()
    { recheckResizePool(); }
    
    private void expandPool(int count)
    {
        for (int i = 0; i < count; ++i)
            taskRunner.postRunnable( new AcquireTask() );
    }
    

    注意,這裡是非同步去建立資源的,這裡只負責生成建立任務。為啥c3p0程式碼難懂,就是動不動整個多執行緒,最終難以維護也是這個原因。執行緒池taskRunner,就是前文那個執行緒池

    資源建立完成後,開始給timer生成一個週期task,該task主要檢測有沒有連線過期了,或者空閒太長時間,如果找到這種資源,就進行人道毀滅。

    if (mustEnforceExpiration())
    {
        this.cullTask = new CullTask();
        // 給timer整個週期job
        cullAndIdleRefurbishTimer.schedule( cullTask, minExpirationTime(), this.expiration_enforcement_delay );
    }
    

    而注意上面的方法mustEnforceExpiration,我們關注的引數就會參與到這裡:

        private boolean mustEnforceExpiration()
        {
            return
                    max_resource_age > 0 ||
                            max_idle_time > 0 ||
                            excess_max_idle_time > 0 ||
                			// 這裡
                            destroy_unreturned_resc_time > 0;
        }
    

    如果你其他幾個屬性都沒設定,只設定了 unreturnedConnectionTimeout,它就會進入該分支,否則大家都為0,是開啟不了分支的, CullTask就不會被排程。

    至此,我們完成了pool的初始化。

設定項在連線借出時如何生效

再回到下面這個程式碼:

public Connection getConnection() throws SQLException
{
    PooledConnection pc = getPoolManager().getPool().checkoutPooledConnection();
    return pc.getConnection();
}

上面,我們完成了 getPoolManager().getPool()的原始碼分析,終於可以去獲取連線了。

com.mchange.v2.c3p0.impl.C3P0PooledConnectionPool#checkoutPooledConnection

public PooledConnection checkoutPooledConnection() throws SQLException
{
    // rp: final ResourcePool rp;
	return (PooledConnection) rp.checkoutResource( checkoutTimeout ); 
}

這裡找pool獲取連線,實際內部會去找rp借連線:

private synchronized Object prelimCheckoutResource( long timeout )
{
	// 檢查空閒連結串列的size
    int available = unused.size();
    if (available == 0)
    {
        // 空閒連結串列為空,檢測是否可以擴容
        int msz = managed.size();
        if (msz < max)
        {
            int desired_target = msz + acquireWaiters.size() + 1;
            if (desired_target >= target_pool_size)
            {
                desired_target = Math.max(desired_target, target_pool_size + inc);
                target_pool_size = Math.max( Math.min( max, desired_target ), min );

                _recheckResizePool();
            }
        }
		// 擴容是非同步的,在這裡等待
        awaitAvailable(timeout); //throws timeout exception
    }
	// 有連線,取佇列頭部的
    Object  resc = unused.get(0);
	// 取到的連線有問題,毀滅資源
    if ( shouldExpire( resc ) )
    {
        removeResource( resc );
        ensureMinResources();
        return prelimCheckoutResource( timeout );
    }
    else
    {	
        // 資源ok,正常返回
        unused.remove(0);
        return resc;
    }
}

如果借到的連線ok,則獲取該連線對應的卡片,在卡片上記錄借出時間、當前執行緒的堆疊:

PunchCard card = (PunchCard) managed.get( resc );
// 借出時間
card.checkout_time = System.currentTimeMillis();
// 如果開啟了這個debugUnreturnedConnectionStackTraces選項,記錄當前執行緒的堆疊
if (debug_store_checkout_exceptions)
    card.checkoutStackTraceException = new Exception("DEBUG ONLY: Overdue resource check-out stack trace.");

這裡是new了一個異常,在異常上呼叫getStackTrace就能獲取堆疊。

設定項在連線毀滅時如何生效

查詢PunchCard的checkoutStackTraceException的usage,發現在毀滅連線的方法中會使用該欄位:

com.mchange.v2.resourcepool.BasicResourcePool#removeResource(java.lang.Object, boolean)
    
private void removeResource(Object resc, boolean synchronous)
{
    PunchCard pc = (PunchCard) managed.remove(resc);

    if (pc != null)
    {
        // 資源的卡片中,借出時間大於0,表示資源當前是被借出的,正常我們是不會去destroy正常的連線,既然被destroy,說明這個連線是有問題的,就是那種沒歸還的連線,因為歸還的話,checkout_time會置為-1
        if ( pc.checkout_time > 0 && !broken) 
        {
            logger.info("A checked-out resource is overdue, and will be destroyed: " + resc);		
            // 這裡列印借出者的堆疊
            if (pc.checkoutStackTraceException != null)
            {
                logger.log( MLevel.INFO,
                           "Logging the stack trace by which the overdue resource was checked-out.",
                           pc.checkoutStackTraceException );
            }
        }
    }
	// 從空閒連結串列刪除
    unused.remove(resc);
    // 呼叫底層方法關閉連線
    destroyResource(resc, synchronous);
    addToFormerResources( resc );
}

那麼,destroyResource什麼時候觸發呢,說實話,實際很多:

我們很多連線池都有這樣的機制,借出連線後,可以檢測,如select 1,select from dual等,如果檢測失敗,就可以destroy;歸還時,也可以檢測,失敗就destroy。

也有一些定時任務,檢測是否空閒太久、檢測能不能正常使用,不能的話,就destroy。

總結

看起來,作者的這套機制也是沒啥問題的,借出時打標機。然後靠timer定時排程的job,去檢測這些連線,看看是不是超時、過期、不正常、空閒太久等,只要滿足這些異常條件,就會destroy這個異常連線,destroy的時候,就列印借出者的堆疊,方便開發者修復bug。

可以這麼說,如果只是單純的程式碼問題,寫的程式碼太粗心而導致連線未關閉,而不是什麼別的問題,看起來這個機制是沒啥問題的。

但是,我按照目前這個設定,弄到線上後,我以為可以解決我這邊的問題了,但是,列印出來的堆疊卻非常誤導人,我在按照對應的堆疊發起測試,發現本地是會正常關閉連線的。

那麼,我覺得,有可能我這邊的問題,不是簡單的連線洩露,但是我這邊的這個場景,造成的結果卻幾乎和連線洩露一模一樣。

所以,我擔心我這邊的情況是,會不會是我歸還了,但是在執行c3p0的歸還程式碼時,歸還失敗了呢?

具體情況,等到下一篇我再分析。