MySQL半同步複製原始碼解析

2022-09-13 12:00:21

今天 DBA 同事問了一個問題,MySQL在半同步複製的場景下,當關閉從節點時使得從節點的數量 < rpl_semi_sync_master_wait_for_slave_count時,show full processlist 的結果不同,具體表現如下:

AFTER_SYNC表現如下:

 

 可以發現,只有一個查詢執行緒處於 Waiting for semi-sync ACK from slave 狀態,其他查詢執行緒處於 query end 狀態。

 

AFTER_COMMIT 表現如下:

 

 和 AFTER_SYNC 不同, 所有的查詢執行緒處於 Waiting for semi-sync ACK from slave 狀態;

 

之前已經瞭解過 MySQL半同步複製,這次從原始碼的角度來解析MySQL半同步複製到底是如何進行的,同時分析原因。

首先看事務的提交過程,整體的提交流程過長,切之前已經研究過原始碼,這裡僅對關於半同步複製相關的部分做深入分析:

int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit)
{  ....
  // 執行 flush 階段操作。
  /*
  * 1. 對 flush 佇列進行 fetch, 本次處理的flush佇列就固定了
    2. 在 innodb 儲存引擎中 flush redo log, 做 innodb 層 redo 持久化。
    3. 為 flush 佇列中每個事務生成 gtid。
    4. 將 flush佇列中每個執行緒的 binlog cache flush 到 binlog 紀錄檔檔案中。這裡包含兩步:
            1. 將事務的 GTID event直接寫入 binlog 磁碟檔案中
            2. 將事務生成的別的 event 寫入 binlog file cache 中
  */
  flush_error = process_flush_stage_queue(&total_bytes, &do_rotate,
                                          &wait_queue);
  // 將 binary log cache(IO cache) flush到檔案中
  if (flush_error == 0 && total_bytes > 0)
// 這裡獲取到 flush 佇列中最後一個事務在 binlog 中的 end_pos flush_error
= flush_cache_to_file(&flush_end_pos); DBUG_EXECUTE_IF("crash_after_flush_binlog", DBUG_SUICIDE();); // sync_binlog 是否等於 1 update_binlog_end_pos_after_sync = (get_sync_period() == 1); /* If the flush finished successfully, we can call the after_flush hook. Being invoked here, we have the guarantee that the hook is executed before the before/after_send_hooks on the dump thread preventing race conditions among these plug-ins. 如果 flush 操作成功, 則呼叫 after_flush hook。 */ if (flush_error == 0) { const char *file_name_ptr = log_file_name + dirname_length(log_file_name); assert(flush_end_pos != 0); // 觀察者模式,呼叫 Binlog_storage_observer 裡面的repl_semi_report_binlog_update函數,將當前的 binlog 檔案和最新的 pos 點記錄到 active_tranxs_ 列表中 // file_name_ptr 當前寫入的binlog檔案 // flush_end_pos 組提交flush連結串列裡面所有binlog最後的pos點 if (RUN_HOOK(binlog_storage, after_flush, (thd, file_name_ptr, flush_end_pos))) { sql_print_error("Failed to run 'after_flush' hooks"); flush_error = ER_ERROR_ON_WRITE; }    // 不等於 1, 通知 dump 執行緒 if (!update_binlog_end_pos_after_sync)
    // 更新 binlog end pos, 通知 binlog sender 執行緒向從庫傳送 event update_binlog_end_pos(); DBUG_EXECUTE_IF(
"crash_commit_after_log", DBUG_SUICIDE();); } ...... DEBUG_SYNC(thd, "bgc_after_flush_stage_before_sync_stage"); /* Stage #2: Syncing binary log file to disk */
  /** 釋放 Lock_log mutex, 獲取 Lock_sync mutex
   *  第一個進入的 flush 佇列的 leader 為本階段的 leader, 其他 flush 佇列加入 sync 佇列, 其他 flush 佇列的
   * leader會被阻塞, 直到 commit 階段被 leader 執行緒喚醒。
   * */
  if (change_stage(thd, Stage_manager::SYNC_STAGE, wait_queue, &LOCK_log, &LOCK_sync))
  {
    DBUG_RETURN(finish_commit(thd));
  }

  /*
    根據 delay 的設定來決定是否延遲一段時間, 如果 delay 的時間越久, 那麼加入 sync 佇列的
    事務就越多【last commit 是在 binlog prepare 時生成的, 尚未更改, 因此加入 sync 佇列的
    事務是同一組事務】, 提高了從庫 mts 的效率。
*/
  if (!flush_error && (sync_counter + 1 >= get_sync_period()))
    stage_manager.wait_count_or_timeout(opt_binlog_group_commit_sync_no_delay_count,
                                        opt_binlog_group_commit_sync_delay,
                                        Stage_manager::SYNC_STAGE);
    // fetch sync 佇列, 對 sync 佇列進行固化.
  final_queue = stage_manager.fetch_queue_for(Stage_manager::SYNC_STAGE);
    // 這裡 sync_binlog file到磁碟中
if (flush_error == 0 && total_bytes > 0)
  {
      // 根據 sync_binlog 的設定決定是否刷盤
    std::pair<bool, bool> result = sync_binlog_file(false);
  }
    // 在這裡 sync_binlog = 1, 更新 binlog end_pos, 通知 dump 執行緒傳送 event
if (update_binlog_end_pos_after_sync)
  {
    THD *tmp_thd = final_queue;
    const char *binlog_file = NULL;
    my_off_t pos = 0;
    while (tmp_thd->next_to_commit != NULL)
      tmp_thd = tmp_thd->next_to_commit;
    if (flush_error == 0 && sync_error == 0)
    {
      tmp_thd->get_trans_fixed_pos(&binlog_file, &pos);
        // 更新 binlog end pos, 通知 dump 執行緒
      update_binlog_end_pos(binlog_file, pos);
    }
  }
  DEBUG_SYNC(thd, "bgc_after_sync_stage_before_commit_stage");
  leave_mutex_before_commit_stage = &LOCK_sync;
  /*
    Stage #3: Commit all transactions in order.
    按順序在 Innodb 層提交所有事務。
    如果我們不需要對提交順序進行排序, 並且每個執行緒必須執行 handlerton 提交, 那麼這個階段可以跳過。
    然而, 由於我們保留了前一階段的鎖, 如果我們跳過這個階段, 則必須進行解鎖。
*/
commit_stage:
    // 如果需要順序提交
if (opt_binlog_order_commits &&
      (sync_error == 0 || binlog_error_action != ABORT_SERVER))
  {
     // SYNC佇列加入 COMMIT 佇列, 第一個進入的 SYNC 佇列的 leader 為本階段的 leader。其他 sync 佇列
     // 加入 commit 佇列的 leade 會被阻塞, 直到 COMMIT 階段後被 leader 執行緒喚醒。
     // 釋放 lock_sync mutex, 持有 lock_commit mutex.
if (change_stage(thd, Stage_manager::COMMIT_STAGE,
                     final_queue, leave_mutex_before_commit_stage,
                     &LOCK_commit))
    {
      DBUG_PRINT("return", ("Thread ID: %u, commit_error: %d",
                            thd->thread_id(), thd->commit_error));
      DBUG_RETURN(finish_commit(thd));
    }
    THD *commit_queue = stage_manager.fetch_queue_for(Stage_manager::COMMIT_STAGE);
    DBUG_EXECUTE_IF("semi_sync_3-way_deadlock",
                    DEBUG_SYNC(thd, "before_process_commit_stage_queue"););

    if (flush_error == 0 && sync_error == 0)
      // 呼叫 after_sync hook.注意:對於after_sync, 這裡將等待binlog dump 執行緒收到slave節點關於佇列中事務最新的 binlog_file和 binlog_pos的ACK。
      sync_error = call_after_sync_hook(commit_queue);
     /* process_commit_stage_queue 將為佇列中每個 thd 持有的 GTID
      呼叫 update_on_commit 或 update_on_rollback。
      這樣做的目的是確保 gtid 按照順序新增到 GTIDs中, 避免出現不必要的間隙
      如果我們只允許每個執行緒在完成提交時呼叫 update_on_commit, 則無法保證 GTID
      順序, 並且 gtid_executed 之間可能出現空隙。發生這種情況, server必須從
      Gtid_set 中新增和刪除間隔, 新增或刪除間隔需要一個互斥鎖, 這會降低效能。
    */
    // 在這裡, 進入儲存引擎中提交
    process_commit_stage_queue(thd, commit_queue);
    // 退出 Lock_commit 鎖
    mysql_mutex_unlock(&LOCK_commit);
    /* 在 LOCK_commit 釋放之後處理 after_commit 來避免 user thread, rotate thread 和 dump thread的
       3路死鎖。
    */
    // 處理 after_commit HOOK
    process_after_commit_stage_queue(thd, commit_queue);
  }
  else
  {
      // 釋放鎖, 呼叫 after_sync hook.
if (leave_mutex_before_commit_stage)
      mysql_mutex_unlock(leave_mutex_before_commit_stage);
    if (flush_error == 0 && sync_error == 0)
      sync_error = call_after_sync_hook(final_queue);
  }
......
/* Finish the commit before executing a rotate, or run the risk of a deadlock. We don't need the return value here since it is in thd->commit_error, which is returned below. */ (void)finish_commit(thd); ...... }

在以上過程中,可以看到,在 flush 節點之後會執行 AFTER_FLUSH hook, 這個 hook 會將當前的 binlog 檔案和最新的 pos 點位記錄到 active_tranxs_ 連結串列中,這個連結串列在半同步複製等待 slave 節點 apply 中使用:

AFTER_FLUSH:
-----------------------------------------------------------
int Binlog_storage_delegate::after_flush(THD *thd,
                                         const char *log_file,
                                         my_off_t log_pos)
{
  DBUG_ENTER("Binlog_storage_delegate::after_flush");
  DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu",
                       log_file, (ulonglong) log_pos));
  Binlog_storage_param param;
  param.server_id= thd->server_id;

  int ret= 0;
// 這裡觀察者模式 FOREACH_OBSERVER(ret, after_flush, thd, (
&param, log_file, log_pos)); DBUG_RETURN(ret); }
int repl_semi_report_binlog_update(Binlog_storage_param *param,
                   const char *log_file,
                   my_off_t log_pos)
{
  int  error= 0;

  if (repl_semisync.getMasterEnabled())
  {
    /*
      Let us store the binlog file name and the position, so that
      we know how long to wait for the binlog to the replicated to
      the slave in synchronous replication.
// 這裡將 binlog filename & pos 寫入 active_tranxs_ 連結串列
*/ error= repl_semisync.writeTranxInBinlog(log_file, log_pos); } return error; }

半同步複製的關鍵是對 after_sync 和 after_commit 的不同選擇,因此這裡我們主要分析 call_after_sync_hook(commit_queue) 和 process_after_commit_stage_queue(thd, commit_queue) 函數,這兩個函數中分別呼叫了  RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos)) 和 RUN_HOOK(transaction, after_commit, (head, all)) 函數,其分別對應 Binlog_storage_delegate::after_sync(THD *thd, const char *log_file,my_off_t log_pos) 和 Trans_delegate::after_commit(THD *thd, bool all) 函數, 這裡採用觀察者模式,我們直接找到其對應的實現:

AFTER_SYNC:
-----------------------------------------------------------------
static inline int call_after_sync_hook(THD *queue_head)
{
  const char *log_file = NULL;
  my_off_t pos = 0;

  if (NO_HOOK(binlog_storage))
    return 0;

  assert(queue_head != NULL);
  for (THD *thd = queue_head; thd != NULL; thd = thd->next_to_commit)
    if (likely(thd->commit_error == THD::CE_NONE))
// 可以看到,這裡獲取了固化後的 commit 佇列中的最新的事務的 binlog filename & pos thd
->get_trans_fixed_pos(&log_file, &pos); // 使用最新的 binlog filename & pos 呼叫 after_sync hook if (DBUG_EVALUATE_IF("simulate_after_sync_hook_error", 1, 0) || RUN_HOOK(binlog_storage, after_sync, (queue_head, log_file, pos))) { sql_print_error("Failed to run 'after_sync' hooks"); return ER_ERROR_ON_WRITE; } return 0; } // after_sync 函數定義 int Binlog_storage_delegate::after_sync(THD *thd, const char *log_file, my_off_t log_pos) { DBUG_ENTER("Binlog_storage_delegate::after_sync"); DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu", log_file, (ulonglong) log_pos)); Binlog_storage_param param; param.server_id= thd->server_id; assert(log_pos != 0); int ret= 0; FOREACH_OBSERVER(ret, after_sync, thd, (&param, log_file, log_pos)); // 找到觀察器呼叫, 這是是觀察者模式 DEBUG_SYNC(thd, "after_call_after_sync_observer"); DBUG_RETURN(ret); }
AFTER_SYNC:
----------------------------------------------------------------------------------------------------------
// after_sync() 介面的具體實現 int repl_semi_report_binlog_sync(Binlog_storage_param *param, const char *log_file, my_off_t log_pos) { // 是否是 after_sync 模式 if (rpl_semi_sync_master_wait_point == WAIT_AFTER_SYNC) // 執行事務的執行緒等待從庫的回覆, 即等待 ACK 的實現函數 return repl_semisync.commitTrx(log_file, log_pos); return 0; }
AFTER_COMMIT:
-----------------------------------------------------------------------
void MYSQL_BIN_LOG::process_after_commit_stage_queue(THD *thd, THD *first)
{
  for (THD *head = first; head; head = head->next_to_commit)
  {
    if (head->get_transaction()->m_flags.run_hooks &&
        head->commit_error != THD::CE_COMMIT_ERROR)
    {

      /*
        TODO: This hook here should probably move outside/below this
              if and be the only after_commit invocation left in the
              code.
      */
#ifndef EMBEDDED_LIBRARY
      Thd_backup_and_restore switch_thd(thd, head);
#endif /* !EMBEDDED_LIBRARY */
      bool all = head->get_transaction()->m_flags.real_commit;
// 可以看到,這裡針對固化的 commit 佇列中的每一個事務都進行了 after_commit HOOK. (
void)RUN_HOOK(transaction, after_commit, (head, all)); /* When after_commit finished for the transaction, clear the run_hooks flag. This allow other parts of the system to check if after_commit was called. */ head->get_transaction()->m_flags.run_hooks = false; } } } int Trans_delegate::after_commit(THD *thd, bool all) { DBUG_ENTER("Trans_delegate::after_commit"); Trans_param param; TRANS_PARAM_ZERO(param); param.server_uuid= server_uuid; param.thread_id= thd->thread_id(); param.rpl_channel_type = thd->rpl_thd_ctx.get_rpl_channel_type(); bool is_real_trans= (all || !thd->get_transaction()->is_active(Transaction_ctx::SESSION)); if (is_real_trans) param.flags|= TRANS_IS_REAL_TRANS; thd->get_trans_fixed_pos(&param.log_file, &param.log_pos); param.server_id= thd->server_id; DBUG_PRINT("enter", ("log_file: %s, log_pos: %llu", param.log_file, param.log_pos)); DEBUG_SYNC(thd, "before_call_after_commit_observer"); int ret= 0;
// 這裡觀察者模式 FOREACH_OBSERVER(ret, after_commit, thd, (
&param)); DBUG_RETURN(ret); }
AFTER_COMMIT:
----------------------------------------------------------------------
// after_commit 實際呼叫函數
int repl_semi_report_commit(Trans_param *param)
{

  bool is_real_trans= param->flags & TRANS_IS_REAL_TRANS;
  // semi_sync 是 AFTER_COMMIT && 是真正的事務 
  if (rpl_semi_sync_master_wait_point == WAIT_AFTER_COMMIT &&
      is_real_trans && param->log_pos)
  {
    const char *binlog_name= param->log_file;
    // 執行事務的執行緒等待從庫的回覆, 即等待 ACK 的實現函數
    return repl_semisync.commitTrx(binlog_name, param->log_pos);
  }
  return 0;
}
// 執行事務的執行緒等待從庫的回覆, 即等待 ACK 的實現函數
int ReplSemiSyncMaster::commitTrx(const char* trx_wait_binlog_name,
                  my_off_t trx_wait_binlog_pos)
{
  const char *kWho = "ReplSemiSyncMaster::commitTrx";

  function_enter(kWho);
  PSI_stage_info old_stage;

#if defined(ENABLED_DEBUG_SYNC)
  /* debug sync may not be initialized for a master */
  if (current_thd->debug_sync_control)
    DEBUG_SYNC(current_thd, "rpl_semisync_master_commit_trx_before_lock");
#endif
  /* Acquire the mutex. 
  獲取 LOCK_binlog_ 互斥鎖
  */
  lock();
  
  TranxNode* entry= NULL;
  mysql_cond_t* thd_cond= NULL;
  bool is_semi_sync_trans= true;
  // active_transx_ 為當前活躍的事務連結串列,在 after_flush HOOK 中會將 flush 佇列中最新的事務的 binlog filename & pos 新增到該連結串列中
  // trx_wait_binlog_name 為固化的 commit 佇列中最新的事務的 binlog filename 
  if (active_tranxs_ != NULL && trx_wait_binlog_name)
  {
    // 遍歷 active_tranxs_ 活躍的事務連結串列, 找到大於等於 trx_wait_binlog_name 和 trx_wait_binlog_pos 
    // 的第一個事務
    entry=
      active_tranxs_->find_active_tranx_node(trx_wait_binlog_name,
                                             trx_wait_binlog_pos);
    // 如果找到了第一個事務                                         
    if (entry)
      thd_cond= &entry->cond;
  }
  /* This must be called after acquired the lock */
  // 當前執行緒進入 thd_cond 
  THD_ENTER_COND(NULL, thd_cond, &LOCK_binlog_,
                 & stage_waiting_for_semi_sync_ack_from_slave,
                 & old_stage);
  // 如果主庫啟用了半同步 
  if (getMasterEnabled() && trx_wait_binlog_name)
  {
    struct timespec start_ts;
    struct timespec abstime;
    int wait_result;
    // 設定當前時間 start_ts
    set_timespec(&start_ts, 0);
    /* This is the real check inside the mutex. */
    // 主庫沒有啟動半同步 || 沒有啟動半同步複製, l_end
    if (!getMasterEnabled() || !is_on())
      goto l_end;

    if (trace_level_ & kTraceDetail)
    {
      sql_print_information("%s: wait pos (%s, %lu), repl(%d)\n", kWho,
                            trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
                            (int)is_on());
    }

    /* Calcuate the waiting period. */
#ifndef HAVE_STRUCT_TIMESPEC
      abstime.tv.i64 = start_ts.tv.i64 + (__int64)wait_timeout_ * TIME_THOUSAND * 10;
      abstime.max_timeout_msec= (long)wait_timeout_;
#else
      // wait_timeout 時間
      abstime.tv_sec = start_ts.tv_sec + wait_timeout_ / TIME_THOUSAND;
      abstime.tv_nsec = start_ts.tv_nsec +
        (wait_timeout_ % TIME_THOUSAND) * TIME_MILLION;
      if (abstime.tv_nsec >= TIME_BILLION)
      {
        abstime.tv_sec++;
        abstime.tv_nsec -= TIME_BILLION;
      }
#endif /* _WIN32 */
    // 開啟了半同步
    while (is_on())
    {
      // 如果有從庫回覆
      if (reply_file_name_inited_)
      {
        // 比較從庫回覆的紀錄檔座標(filename & fileops)和固化的 commit 佇列中最新的事務的 binlog filename & pos
        int cmp = ActiveTranx::compare(reply_file_name_, reply_file_pos_,
                                       trx_wait_binlog_name, trx_wait_binlog_pos);
        // 如果回覆的紀錄檔座標大於當前的紀錄檔座標                               
        if (cmp >= 0)
        {
          /* We have already sent the relevant binlog to the slave: no need to
           * wait here.
             我們已經確認將相應的 binlog 傳送給了從庫: 無需在此等待。
           */
          if (trace_level_ & kTraceDetail)
            sql_print_information("%s: Binlog reply is ahead (%s, %lu),",
                                  kWho, reply_file_name_, (unsigned long)reply_file_pos_);
          // 退出迴圈                        
          break;
        }
      }
      /*
        When code reaches here an Entry object may not be present in the
        following scenario.
        當程式碼到了這裡, 在一下場景中可能不存在 entry。
        Semi sync was not enabled when transaction entered into ordered_commit
        process. During flush stage, semi sync was not enabled and there was no
        'Entry' object created for the transaction being committed and at a
        later stage it was enabled. In this case trx_wait_binlog_name and
        trx_wait_binlog_pos are set but the 'Entry' object is not present. Hence
        dump thread will not wait for reply from slave and it will not update
        reply_file_name. In such case the committing transaction should not wait
        for an ack from slave and it should be considered as an async
        transaction.
        事務進入 ordered_commit 時未啟用半同步。
        在 flush 階段, 沒有啟用半同步, 沒有為提交的事務建立 entry 物件, 但是在之後的節點啟用了半同步。
        在這種情況下, 設定了 trx_wait_binlog_name 和 trx_wait_binlog_pos, 但是 entry 物件並不存在。
        此時, dump 執行緒將不會等待 slave 節點的 reply, 並且不會更新 reply_file_name。
        在這種情況下, 提交的事務不應等待來自 slave 節點的 ack, 而應被視為非同步事務。
      */
      if (!entry)
      {
        is_semi_sync_trans= false;
        goto l_end;
      }

      /* Let us update the info about the minimum binlog position of waiting
       * threads.
       * 這裡更新等待執行緒等待的 minimum binlog pos 。
       */
      if (wait_file_name_inited_)
      {
        // 對比當前 commit 佇列最後的binlog點位 和 wait_file_name_ & wait_file_pos_ 大小
        int cmp = ActiveTranx::compare(trx_wait_binlog_name, trx_wait_binlog_pos,
                                       wait_file_name_, wait_file_pos_);
        if (cmp <= 0)
          {
          /* This thd has a lower position, let's update the minimum info. 
          這裡更新 wait_file_name_ & wait_file_pos_。
          */
          strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1);
          wait_file_name_[sizeof(wait_file_name_) - 1]= '\0';
          wait_file_pos_ = trx_wait_binlog_pos;

          rpl_semi_sync_master_wait_pos_backtraverse++;
          if (trace_level_ & kTraceDetail)
            sql_print_information("%s: move back wait position (%s, %lu),",
                                  kWho, wait_file_name_, (unsigned long)wait_file_pos_);
        }
      }
      else
      {
        strncpy(wait_file_name_, trx_wait_binlog_name, sizeof(wait_file_name_) - 1);
        wait_file_name_[sizeof(wait_file_name_) - 1]= '\0';
        wait_file_pos_ = trx_wait_binlog_pos;
        wait_file_name_inited_ = true;

        if (trace_level_ & kTraceDetail)
          sql_print_information("%s: init wait position (%s, %lu),",
                                kWho, wait_file_name_, (unsigned long)wait_file_pos_);
      }

      /* In semi-synchronous replication, we wait until the binlog-dump
       * thread has received the reply on the relevant binlog segment from the
       * replication slave.
       * 在半同步複製中, 我們等待直到 binlog dump 執行緒收到相關 binlog 的 reply 資訊。
       * 
       * Let us suspend this thread to wait on the condition;
       * when replication has progressed far enough, we will release
       * these waiting threads.
       * 讓我們暫停這個執行緒以等待這個條件; 
       * 當複製進展足夠時, 我們將釋放等待的執行緒。
       */
      // 判斷 slave 個數和半同步是否正常
      // 當前 slave 節點的數量 == rpl_semi_sync_master_wait_for_slave_count -1 && 半同步複製正開啟
      if (abort_loop && (rpl_semi_sync_master_clients ==
                         rpl_semi_sync_master_wait_for_slave_count - 1) && is_on())
      {
        sql_print_warning("SEMISYNC: Forced shutdown. Some updates might "
                          "not be replicated.");
        // 關閉半同步, 中斷迴圈                
        switch_off();
        break;
      }
      //正式進入等待binlog同步的步驟,將rpl_semi_sync_master_wait_sessions+1
            //然後發起等待訊號,進入訊號等待後,只有2種情況可以退出等待。1是被其他執行緒喚醒(binlog dump)
            //2是等待超時時間。如果是被喚醒則返回值是0,否則是其他值
      rpl_semi_sync_master_wait_sessions++;
      
      if (trace_level_ & kTraceDetail)
        sql_print_information("%s: wait %lu ms for binlog sent (%s, %lu)",
                              kWho, wait_timeout_,
                              wait_file_name_, (unsigned long)wait_file_pos_);
      
      /* wait for the position to be ACK'ed back 
      實現 ACK 等待
      */
      assert(entry);
      entry->n_waiters++;
      // 第一個引數為條件量,第二個為等待之後釋放LOCK_binlog_互斥鎖,第三個為未來的超時絕對時間
      wait_result= mysql_cond_timedwait(&entry->cond, &LOCK_binlog_, &abstime);
      entry->n_waiters--;
      /*
        After we release LOCK_binlog_ above while waiting for the condition,
        it can happen that some other parallel client session executed
        RESET MASTER. That can set rpl_semi_sync_master_wait_sessions to zero.
        Hence check the value before decrementing it and decrement it only if it is
        non-zero value.
        在等待之後釋放 LOCK_binlog_互斥鎖, 有可能其他使用者端執行 RESET MASTER 命令, 這將把 rpl_semi_sync_master_wait_sessions 重置為 0。
        因此, 在遞減前需要檢查該值。
      */
      if (rpl_semi_sync_master_wait_sessions > 0)
        rpl_semi_sync_master_wait_sessions--;
      // wait_result != 0, 這裡表示等待超時
      if (wait_result != 0)
      {
        /* This is a real wait timeout. */
        sql_print_warning("Timeout waiting for reply of binlog (file: %s, pos: %lu), "
                          "semi-sync up to file %s, position %lu.",
                          trx_wait_binlog_name, (unsigned long)trx_wait_binlog_pos,
                          reply_file_name_, (unsigned long)reply_file_pos_);
        rpl_semi_sync_master_wait_timeouts++;
        
        /* switch semi-sync off ; 關閉 semi sync  */
        switch_off();
      }
      else
      // 等待 ACK 成功
      {
        int wait_time;
        
        wait_time = getWaitTime(start_ts);
        // wait_time < 0, 時鐘錯誤
        if (wait_time < 0)
        {
          if (trace_level_ & kTraceGeneral)
          {
            sql_print_information("Assessment of waiting time for commitTrx "
                                  "failed at wait position (%s, %lu)",
                                  trx_wait_binlog_name,
                                  (unsigned long)trx_wait_binlog_pos);
          }
          rpl_semi_sync_master_timefunc_fails++;
        }
        else
        {
          //將等待事件與該等待計入總數  
          rpl_semi_sync_master_trx_wait_num++;
          rpl_semi_sync_master_trx_wait_time += wait_time;
        }
      }
    }

l_end:
    /* Update the status counter. 
    更新狀態計數
    */
    if (is_on() && is_semi_sync_trans)
      rpl_semi_sync_master_yes_transactions++;
    else
      rpl_semi_sync_master_no_transactions++;
  }

  /* Last waiter removes the TranxNode 
  移除 active_tranxs_ 連結串列中 trx_wait_binlog_name & trx_wait_binlog_pos 之前的所有事務。
  */
  if (trx_wait_binlog_name && active_tranxs_
      && entry && entry->n_waiters == 0)
    active_tranxs_->clear_active_tranx_nodes(trx_wait_binlog_name,
                                             trx_wait_binlog_pos);

  unlock();
  THD_EXIT_COND(NULL, & old_stage);
  return function_exit(kWho, 0);
}

 

通過以上原始碼分析,可以看到在 after_sync hook 之後會釋放 Lock_commit 鎖,而後呼叫 after_commit hook。

因此當 AFTER_SYNC 時,會發現只有一個查詢執行緒處於 Waiting for semi-sync ACK from slave 狀態,其他查詢執行緒處於 query end 狀態。

而 AFTER_COMMIT 時,所有的查詢執行緒都處於 Waiting for semi-sync ACK from slave 狀態。