CMU15445 (Fall 2019) 之 Project#4

2022-07-16 21:00:56

前言

這是 Fall 2019 的最後一個實驗,要求我們實現預寫式紀錄檔、系統恢復和存檔點功能,這三個功能分別對應三個類 LogManagerLogRecoveryCheckpointManager,下面進入正題。

程式碼實現

紀錄檔管理器

為了達到原子性和永續性的目標,資料庫系統會將描述事務所做修改的資訊儲存硬碟中。這些資訊確保已提交事務中執行的所有修改都反映在資料庫中,還可以確保系統崩潰並重新啟動後,由中止或失敗的事務所做的修改不會保留在資料庫中。本次實驗使用預寫紀錄檔記錄這些修改,預寫紀錄檔也是使用的最廣泛的記錄方式,基本原理如下圖所示。

在記憶體中有一塊緩衝區域 WAL Buffer,用於記錄任何事務中執行的操作,每當執行一個操作,就會在緩衝區中新增一條記錄,記錄的格式有三種:物理紀錄檔、邏輯紀錄檔和混合式紀錄檔。物理紀錄檔記錄了操作前後每個資料位的修改,邏輯紀錄檔只記錄了 SQL 語句,而混合紀錄檔和物理紀錄檔很像,不過將偏移量換成了槽號。邏輯紀錄檔存在一些問題,比如重新執行 Now() 時間會發生改變,而物理紀錄檔的偏移量也會有問題,如果對頁進行碎片整理會導致偏移量失效,所以實際上使用的是混合式的紀錄檔。

在 WAL Buffer 中新增完一條記錄後,才會修改緩衝池中的資料,當紀錄檔被提交或者 WAL Buffer 滿了之後(取決於具體策略),會將紀錄檔寫到硬碟上,雖然這時候緩衝池中的髒頁可能還沒同步到硬碟上,但是隻要儲存完紀錄檔,我們就能保證資料已經安全了。正是因為緩衝池未動,紀錄檔先行,所以這種策略稱為預寫式紀錄檔。

紀錄檔管理器 LogManager 的宣告如下所示,可以看到內部有兩個緩衝區: log_buffer_flush_buffer_,前者用於新增記錄,當滿足一定條件時(後面會說到),需要交換這兩個緩衝區的內容,然後使用 flush_thread_flush_buffer_ 寫到硬碟上:

class LogManager {
 public:
  explicit LogManager(DiskManager *disk_manager)
      : next_lsn_(0), persistent_lsn_(INVALID_LSN), disk_manager_(disk_manager) {
    log_buffer_ = new char[LOG_BUFFER_SIZE];
    flush_buffer_ = new char[LOG_BUFFER_SIZE];
  }

  ~LogManager() {
    delete[] log_buffer_;
    delete[] flush_buffer_;
    log_buffer_ = nullptr;
    flush_buffer_ = nullptr;
  }

  void RunFlushThread();
  void StopFlushThread();

  /* flush log to disk */
  void Flush();

  lsn_t AppendLogRecord(LogRecord *log_record);

  inline lsn_t GetNextLSN() { return next_lsn_; }
  inline lsn_t GetPersistentLSN() { return persistent_lsn_; }
  inline void SetPersistentLSN(lsn_t lsn) { persistent_lsn_ = lsn; }
  inline char *GetLogBuffer() { return log_buffer_; }

 private:
  /** The atomic counter which records the next log sequence number. */
  std::atomic<lsn_t> next_lsn_;
  /** The log records before and including the persistent lsn have been written to disk. */
  std::atomic<lsn_t> persistent_lsn_;

  char *log_buffer_;
  char *flush_buffer_;

  int log_buffer_offset_ = 0;
  int flush_buffer_offset_ = 0;

  std::mutex latch_;

  std::thread *flush_thread_;

  std::condition_variable cv_;
  std::condition_variable cv_append_;

  std::atomic_bool need_flush_ = false;

  DiskManager *disk_manager_;
};

啟動紀錄檔執行緒

當滿足下述條件之一時,我們會使用紀錄檔執行緒將紀錄檔寫到硬碟上:

  • log_buffer_ 的剩餘空間不足以插入新的記錄
  • 距離上一次儲存紀錄檔的時間超過了 log_timeout
  • 緩衝池換出了一個髒頁

實驗提示說要用到 Future 和 Promise,但是感覺條件變數就夠用了,加上一個 need_flush_ 判斷條件避免發生虛假喚醒:

void LogManager::RunFlushThread() {
  if (enable_logging) {
    return;
  }

  enable_logging = true;
  flush_thread_ = new std::thread([&] {
    while (enable_logging) {
      std::unique_lock<std::mutex> lock(latch_);

      // flush log to disk if log time out or log buffer is full
      cv_.wait_for(lock, log_timeout, [&] { return need_flush_.load(); });
      if (log_buffer_offset_ > 0) {
        std::swap(log_buffer_, flush_buffer_);
        std::swap(log_buffer_offset_, flush_buffer_offset_);
        disk_manager_->WriteLog(flush_buffer_, flush_buffer_offset_);
        flush_buffer_offset_ = 0;
        SetPersistentLSN(next_lsn_ - 1);
      }

      need_flush_ = false;
      cv_append_.notify_all();
    }
  });
}

停止紀錄檔執行緒

當資料庫系統被關閉時,我們應該停止紀錄檔執行緒,同時將 log_buffer_ 中的記錄全部儲存到硬碟中:

void LogManager::StopFlushThread() {
  enable_logging = false;
  Flush();
  flush_thread_->join();
  delete flush_thread_;
  flush_thread_ = nullptr;
}

void LogManager::Flush() {
  if (!enable_logging) {
    return;
  }

  std::unique_lock<std::mutex> lock(latch_);
  need_flush_ = true;
  cv_.notify_one();

  // block thread until flush finished
  cv_append_.wait(lock, [&] { return !need_flush_.load(); });
}

新增紀錄檔記錄

根據執行操作的不同,紀錄檔記錄也分為多個種類:

enum class LogRecordType {
  INVALID = 0,
  INSERT,
  MARKDELETE,
  APPLYDELETE,
  ROLLBACKDELETE,
  UPDATE,
  BEGIN,
  COMMIT,
  ABORT,
  /** Creating a new page in the table heap. */
  NEWPAGE,
};

紀錄檔記錄由 LogRecord 類描述,每一種記錄的格式如下所示:

 Header (每種型別都擁有 Header,共 20 位元組)
 --------------------------------------------
 | size | LSN | transID | prevLSN | LogType |
 --------------------------------------------

 插入型別紀錄檔記錄
 --------------------------------------------------------------
 | HEADER | tuple_rid | tuple_size | tuple_data(char[] array) |
 --------------------------------------------------------------

 刪除型別紀錄檔記錄 (包括 markdelete, rollbackdelete, applydelete)
 --------------------------------------------------------------
 | HEADER | tuple_rid | tuple_size | tuple_data(char[] array) |
 --------------------------------------------------------------

 更新型別紀錄檔記錄
 ----------------------------------------------------------------------------------
 | HEADER | tuple_rid | tuple_size | old_tuple_data | tuple_size | new_tuple_data |
 ----------------------------------------------------------------------------------

 新頁型別紀錄檔記錄
 -----------------------------------
 | HEADER | prev_page_id | page_id |
 -----------------------------------

我們需要根據不同型別紀錄檔記錄的格式將紀錄檔記錄序列化到 log_buffer_ 中:

lsn_t LogManager::AppendLogRecord(LogRecord *log_record) {
  std::unique_lock<std::mutex> lock(latch_);

  // flush log to disk when the log buffer is full
  if (log_record->size_ + log_buffer_offset_ > LOG_BUFFER_SIZE) {
    // wake up flush thread to write log
    need_flush_ = true;
    cv_.notify_one();

    // block current thread until log buffer is emptied
    cv_append_.wait(lock, [&] { return log_record->size_ + log_buffer_offset_ <= LOG_BUFFER_SIZE; });
  }

  // serialize header
  log_record->lsn_ = next_lsn_++;
  memcpy(log_buffer_ + log_buffer_offset_, log_record, LogRecord::HEADER_SIZE);
  int pos = log_buffer_offset_ + LogRecord::HEADER_SIZE;

  // serialize body
  switch (log_record->GetLogRecordType()) {
    case LogRecordType::INSERT:
      memcpy(log_buffer_ + pos, &log_record->insert_rid_, sizeof(RID));
      pos += sizeof(RID);
      log_record->insert_tuple_.SerializeTo(log_buffer_ + pos);
      break;

    case LogRecordType::MARKDELETE:
    case LogRecordType::APPLYDELETE:
    case LogRecordType::ROLLBACKDELETE:
      memcpy(log_buffer_ + pos, &log_record->delete_rid_, sizeof(RID));
      pos += sizeof(RID);
      log_record->delete_tuple_.SerializeTo(log_buffer_ + pos);
      break;

    case LogRecordType::UPDATE:
      memcpy(log_buffer_ + pos, &log_record->update_rid_, sizeof(RID));
      pos += sizeof(RID);
      log_record->old_tuple_.SerializeTo(log_buffer_ + pos);
      pos += 4 + static_cast<int>(log_record->old_tuple_.GetLength());
      log_record->new_tuple_.SerializeTo(log_buffer_ + pos);
      break;

    case LogRecordType::NEWPAGE:
      memcpy(log_buffer_ + pos, &log_record->prev_page_id_, sizeof(page_id_t));
      pos += sizeof(page_id_t);
      memcpy(log_buffer_ + pos, &log_record->page_id_, sizeof(page_id_t));
      break;

    default:
      break;
  }

  // update log buffer offset
  log_buffer_offset_ += log_record->size_;
  return log_record->lsn_;
}

事務管理器和緩衝池管理器

在我們呼叫 TablePage::InsertTuple 等方法的時候,內部會呼叫 LogManager::AppendLogRecord 新增紀錄檔記錄,但是事務開始、提交或者終止時也需要我們新增記錄:

Transaction *TransactionManager::Begin(Transaction *txn) {
  // Acquire the global transaction latch in shared mode.
  global_txn_latch_.RLock();

  if (txn == nullptr) {
    txn = new Transaction(next_txn_id_++);
  }

  if (enable_logging) {
    LogRecord log_record(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::BEGIN);
    auto lsn = log_manager_->AppendLogRecord(&log_record);
    txn->SetPrevLSN(lsn);
  }

  txn_map[txn->GetTransactionId()] = txn;
  return txn;
}

void TransactionManager::Commit(Transaction *txn) {
  txn->SetState(TransactionState::COMMITTED);

  // 省略部分程式碼
  if (enable_logging) {
    LogRecord log_record(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::COMMIT);
    auto lsn = log_manager_->AppendLogRecord(&log_record);
    txn->SetPrevLSN(lsn);
  }

  // Release all the locks.
  ReleaseLocks(txn);
  // Release the global transaction latch.
  global_txn_latch_.RUnlock();
}

void TransactionManager::Abort(Transaction *txn) {
  txn->SetState(TransactionState::ABORTED);

  // 省略部分程式碼
  if (enable_logging) {
    LogRecord log_record(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::ABORT);
    auto lsn = log_manager_->AppendLogRecord(&log_record);
    txn->SetPrevLSN(lsn);
  }

  // Release all the locks.
  ReleaseLocks(txn);
  // Release the global transaction latch.
  global_txn_latch_.RUnlock();
}

如前所述,將緩衝池中的髒頁換出時需要強制儲存紀錄檔,在 Buffer Pool Manager 實驗中我們實現了一個 GetVictimFrameId 方法,只要略作修改即可:

frame_id_t BufferPoolManager::GetVictimFrameId() {
  frame_id_t frame_id;

  if (!free_list_.empty()) {
    frame_id = free_list_.front();
    free_list_.pop_front();
  } else {
    if (!replacer_->Victim(&frame_id)) {
      return INVALID_PAGE_ID;
    }

    // flush log to disk when victim a dirty page
    if (enable_logging) {
      Page &page = pages_[frame_id];
      if (page.IsDirty() && page.GetLSN() > log_manager_->GetPersistentLSN()) {
        log_manager_->Flush();
      }
    }
  }

  return frame_id;
}

測試

紀錄檔管理器的測試結果如下所示:

系統恢復

本次實驗使用的系統恢復策略較為簡單,由於沒有使用 Fuzzy Checkpoint,所以少了 Analysis 階段,直接變成在 LogRecovery::Redo 函數中分析出當前活躍事務表 ATT 並進行重放,在 LogRecovery::Undo 函數中進行回滾。

紀錄檔記錄反序列化

LogRecovery 會不斷呼叫 DiskManager::ReadLog 函數直到讀取完整個紀錄檔,由於我們先前將 LogRecord 進行了序列化,此處需要進行反序列化以存取記錄中的資訊。由於 log_buffer_ 的大小為 LOG_BUFFER_SIZE,所以將紀錄檔檔案讀取到 log_buffer_ 的過程中可能截斷最後一條記錄,這時候需要返回 false 以表示反序列化失敗:

bool LogRecovery::DeserializeLogRecord(const char *data, LogRecord *log_record) {
  // convert data to record and check header
  auto record = reinterpret_cast<const LogRecord *>(data);
  if (record->size_ <= 0 || data + record->size_ > log_buffer_ + LOG_BUFFER_SIZE) {
    return false;
  }

  // copy header
  memcpy(reinterpret_cast<char *>(log_record), data, LogRecord::HEADER_SIZE);

  // copy body
  int pos = LogRecord::HEADER_SIZE;
  switch (log_record->GetLogRecordType()) {
    case LogRecordType::INSERT:
      memcpy(&log_record->insert_rid_, data + pos, sizeof(RID));
      pos += sizeof(RID);
      log_record->insert_tuple_.DeserializeFrom(data + pos);
      break;

    case LogRecordType::MARKDELETE:
    case LogRecordType::APPLYDELETE:
    case LogRecordType::ROLLBACKDELETE:
      memcpy(&log_record->delete_rid_, data + pos, sizeof(RID));
      pos += sizeof(RID);
      log_record->delete_tuple_.DeserializeFrom(data + pos);
      break;

    case LogRecordType::UPDATE:
      memcpy(&log_record->update_rid_, data + pos, sizeof(RID));
      pos += sizeof(RID);
      log_record->old_tuple_.DeserializeFrom(data + pos);
      pos += 4 + log_record->old_tuple_.GetLength();
      log_record->new_tuple_.DeserializeFrom(data + pos);
      break;

    case LogRecordType::NEWPAGE:
      memcpy(&log_record->prev_page_id_, data + pos, sizeof(page_id_t));
      pos += sizeof(page_id_t);
      memcpy(&log_record->page_id_, data + pos, sizeof(page_id_t));
      break;

    case LogRecordType::BEGIN:
    case LogRecordType::COMMIT:
    case LogRecordType::ABORT:
      break;

    default:
      return false;
  }

  return true;
}

重放

重放過程十分簡單粗暴,遍歷整個紀錄檔的記錄,如果記錄的紀錄檔序號大於記錄操作的 tuple 儲存到磁碟上的紀錄檔序列號,說明 tuple 被修改後還沒儲存到磁碟上就宕機了,這時候需要進行回放。在遍歷的時候先無腦將記錄對應的事務新增到 ATT 中,直到事務被提交或者中斷才將其移出 ATT。

void LogRecovery::Redo() {
  while (disk_manager_->ReadLog(log_buffer_, LOG_BUFFER_SIZE, offset_)) {
    // offset of current log buffer
    size_t pos = 0;
    LogRecord log_record;

    // deserialize log entry to record
    while (DeserializeLogRecord(log_buffer_ + pos, &log_record)) {
      // update lsn mapping
      auto lsn = log_record.lsn_;
      lsn_mapping_[lsn] = offset_ + pos;

      // Add txn to ATT with status UNDO
      active_txn_[log_record.txn_id_] = lsn;
      pos += log_record.size_;

      // redo if page was not wirtten to disk when crash happened
      switch (log_record.log_record_type_) {
        case LogRecordType::INSERT: {
          auto page = getTablePage(log_record.insert_rid_);
          if (page->GetLSN() < lsn) {
            page->WLatch();
            page->InsertTuple(log_record.insert_tuple_, &log_record.insert_rid_, nullptr, nullptr, nullptr);
            page->WUnlatch();
          }

          buffer_pool_manager_->UnpinPage(page->GetPageId(), page->GetLSN() < lsn);
          break;
        }

        case LogRecordType::UPDATE: {
          auto page = getTablePage(log_record.update_rid_);
          if (page->GetLSN() < lsn) {
            page->WLatch();
            page->UpdateTuple(log_record.new_tuple_, &log_record.old_tuple_, log_record.update_rid_, nullptr, nullptr,
                              nullptr);
            page->WUnlatch();
          }

          buffer_pool_manager_->UnpinPage(page->GetPageId(), page->GetLSN() < lsn);
          break;
        }

        case LogRecordType::MARKDELETE:
        case LogRecordType::APPLYDELETE:
        case LogRecordType::ROLLBACKDELETE: {
          auto page = getTablePage(log_record.delete_rid_);
          if (page->GetLSN() < lsn) {
            page->WLatch();
            if (log_record.log_record_type_ == LogRecordType::MARKDELETE) {
              page->MarkDelete(log_record.delete_rid_, nullptr, nullptr, nullptr);
            } else if (log_record.log_record_type_ == LogRecordType::APPLYDELETE) {
              page->ApplyDelete(log_record.delete_rid_, nullptr, nullptr);
            } else {
              page->RollbackDelete(log_record.delete_rid_, nullptr, nullptr);
            }
            page->WUnlatch();
          }

          buffer_pool_manager_->UnpinPage(page->GetPageId(), page->GetLSN() < lsn);
          break;
        }

        case LogRecordType::COMMIT:
        case LogRecordType::ABORT:
          active_txn_.erase(log_record.txn_id_);
          break;

        case LogRecordType::NEWPAGE: {
          auto page_id = log_record.page_id_;
          auto page = getTablePage(page_id);
          if (page->GetLSN() < lsn) {
            auto prev_page_id = log_record.prev_page_id_;
            page->WLatch();
            page->Init(page_id, PAGE_SIZE, prev_page_id, nullptr, nullptr);
            page->WUnlatch();

            if (prev_page_id != INVALID_PAGE_ID) {
              auto prev_page = getTablePage(prev_page_id);
              if (prev_page->GetNextPageId() != page_id) {
                prev_page->SetNextPageId(page_id);
                buffer_pool_manager_->UnpinPage(prev_page_id, true);
              } else {
                buffer_pool_manager_->UnpinPage(prev_page_id, false);
              }
            }
          }

          buffer_pool_manager_->UnpinPage(page_id, page->GetLSN() < lsn);
          break;
        }

        default:
          break;
      }
    }

    offset_ += pos;
  }
}

回滾

LogRecovery::Undo 會遍歷 ATT 中的每一個事務,對事務的操作進行回滾,回滾的規則如下:

  • 如果紀錄檔記錄型別為 LogRecordType::INSERT,使用 TablePage::ApplyDelete 進行回滾
  • 如果紀錄檔記錄型別為 LogRecordType::UPDATE,使用 TablePage::UpdateTuple 進行回滾
  • 如果紀錄檔記錄型別為 LogRecordType::APPLYDELETE,使用 TablePage::InsertTuple 進行回滾
  • 如果紀錄檔記錄型別為 LogRecordType::MARKDELETE,使用 TablePage::RollbackDelete 進行回滾
  • 如果紀錄檔記錄型別為 LogRecordType::ROLLBACKDELETE,使用 TablePage::MarkDelete 進行回滾
void LogRecovery::Undo() {
  for (auto [txn_id, lsn] : active_txn_) {
    while (lsn != INVALID_LSN) {
      // read log from dist and convert log buffer entry to log record
      LogRecord log_record;
      auto offset = lsn_mapping_[lsn];
      disk_manager_->ReadLog(log_buffer_, LOG_BUFFER_SIZE, offset);
      DeserializeLogRecord(log_buffer_, &log_record);
      lsn = log_record.GetPrevLSN();

      // rollback
      switch (log_record.GetLogRecordType()) {
        case LogRecordType::INSERT: {
          auto page = getTablePage(log_record.insert_rid_);
          page->WLatch();
          page->ApplyDelete(log_record.insert_rid_, nullptr, nullptr);
          page->WUnlatch();
          buffer_pool_manager_->UnpinPage(page->GetPageId(), true);
          break;
        }

        case LogRecordType::UPDATE: {
          auto page = getTablePage(log_record.update_rid_);
          page->WLatch();
          page->UpdateTuple(log_record.old_tuple_, &log_record.new_tuple_, log_record.update_rid_, nullptr, nullptr,
                            nullptr);
          page->WUnlatch();
          buffer_pool_manager_->UnpinPage(page->GetPageId(), true);
          break;
        }

        case LogRecordType::MARKDELETE:
        case LogRecordType::APPLYDELETE:
        case LogRecordType::ROLLBACKDELETE: {
          auto page = getTablePage(log_record.delete_rid_);
          page->WLatch();
          if (log_record.log_record_type_ == LogRecordType::MARKDELETE) {
            page->RollbackDelete(log_record.delete_rid_, nullptr, nullptr);
          } else if (log_record.log_record_type_ == LogRecordType::APPLYDELETE) {
            page->InsertTuple(log_record.delete_tuple_, &log_record.delete_rid_, nullptr, nullptr, nullptr);
          } else {
            page->MarkDelete(log_record.delete_rid_, nullptr, nullptr, nullptr);
          }
          page->WUnlatch();
          buffer_pool_manager_->UnpinPage(page->GetPageId(), true);
          break;
        }

        default:
          break;
      }
    }
  }

  active_txn_.clear();
  lsn_mapping_.clear();
}

存檔點管理器

存檔點管理器用於儲存紀錄檔並刷出緩衝池中所有的髒頁,同時阻塞正在進行的事務。

void CheckpointManager::BeginCheckpoint() {
  transaction_manager_->BlockAllTransactions();
  log_manager_->Flush();
  buffer_pool_manager_->FlushAllPages();
}

void CheckpointManager::EndCheckpoint() {
  transaction_manager_->ResumeTransactions();
}

測試

紀錄檔恢復和存檔點的測試結果如下:

總結

本次實驗主要考察對預寫式紀錄檔和資料庫系統恢復的理解,程式碼上層面多了對多執行緒同步技術的要求,整個實驗做下來感覺比較順(除了被段錯誤坑了億點時間外),以上~~