這是 Fall 2019 的最後一個實驗,要求我們實現預寫式紀錄檔、系統恢復和存檔點功能,這三個功能分別對應三個類 LogManager
、LogRecovery
和 CheckpointManager
,下面進入正題。
為了達到原子性和永續性的目標,資料庫系統會將描述事務所做修改的資訊儲存硬碟中。這些資訊確保已提交事務中執行的所有修改都反映在資料庫中,還可以確保系統崩潰並重新啟動後,由中止或失敗的事務所做的修改不會保留在資料庫中。本次實驗使用預寫紀錄檔記錄這些修改,預寫紀錄檔也是使用的最廣泛的記錄方式,基本原理如下圖所示。
在記憶體中有一塊緩衝區域 WAL Buffer,用於記錄任何事務中執行的操作,每當執行一個操作,就會在緩衝區中新增一條記錄,記錄的格式有三種:物理紀錄檔、邏輯紀錄檔和混合式紀錄檔。物理紀錄檔記錄了操作前後每個資料位的修改,邏輯紀錄檔只記錄了 SQL 語句,而混合紀錄檔和物理紀錄檔很像,不過將偏移量換成了槽號。邏輯紀錄檔存在一些問題,比如重新執行 Now()
時間會發生改變,而物理紀錄檔的偏移量也會有問題,如果對頁進行碎片整理會導致偏移量失效,所以實際上使用的是混合式的紀錄檔。
在 WAL Buffer 中新增完一條記錄後,才會修改緩衝池中的資料,當紀錄檔被提交或者 WAL Buffer 滿了之後(取決於具體策略),會將紀錄檔寫到硬碟上,雖然這時候緩衝池中的髒頁可能還沒同步到硬碟上,但是隻要儲存完紀錄檔,我們就能保證資料已經安全了。正是因為緩衝池未動,紀錄檔先行,所以這種策略稱為預寫式紀錄檔。
紀錄檔管理器 LogManager
的宣告如下所示,可以看到內部有兩個緩衝區: log_buffer_
和 flush_buffer_
,前者用於新增記錄,當滿足一定條件時(後面會說到),需要交換這兩個緩衝區的內容,然後使用 flush_thread_
將 flush_buffer_
寫到硬碟上:
class LogManager {
public:
explicit LogManager(DiskManager *disk_manager)
: next_lsn_(0), persistent_lsn_(INVALID_LSN), disk_manager_(disk_manager) {
log_buffer_ = new char[LOG_BUFFER_SIZE];
flush_buffer_ = new char[LOG_BUFFER_SIZE];
}
~LogManager() {
delete[] log_buffer_;
delete[] flush_buffer_;
log_buffer_ = nullptr;
flush_buffer_ = nullptr;
}
void RunFlushThread();
void StopFlushThread();
/* flush log to disk */
void Flush();
lsn_t AppendLogRecord(LogRecord *log_record);
inline lsn_t GetNextLSN() { return next_lsn_; }
inline lsn_t GetPersistentLSN() { return persistent_lsn_; }
inline void SetPersistentLSN(lsn_t lsn) { persistent_lsn_ = lsn; }
inline char *GetLogBuffer() { return log_buffer_; }
private:
/** The atomic counter which records the next log sequence number. */
std::atomic<lsn_t> next_lsn_;
/** The log records before and including the persistent lsn have been written to disk. */
std::atomic<lsn_t> persistent_lsn_;
char *log_buffer_;
char *flush_buffer_;
int log_buffer_offset_ = 0;
int flush_buffer_offset_ = 0;
std::mutex latch_;
std::thread *flush_thread_;
std::condition_variable cv_;
std::condition_variable cv_append_;
std::atomic_bool need_flush_ = false;
DiskManager *disk_manager_;
};
當滿足下述條件之一時,我們會使用紀錄檔執行緒將紀錄檔寫到硬碟上:
log_buffer_
的剩餘空間不足以插入新的記錄log_timeout
實驗提示說要用到 Future 和 Promise,但是感覺條件變數就夠用了,加上一個 need_flush_
判斷條件避免發生虛假喚醒:
void LogManager::RunFlushThread() {
if (enable_logging) {
return;
}
enable_logging = true;
flush_thread_ = new std::thread([&] {
while (enable_logging) {
std::unique_lock<std::mutex> lock(latch_);
// flush log to disk if log time out or log buffer is full
cv_.wait_for(lock, log_timeout, [&] { return need_flush_.load(); });
if (log_buffer_offset_ > 0) {
std::swap(log_buffer_, flush_buffer_);
std::swap(log_buffer_offset_, flush_buffer_offset_);
disk_manager_->WriteLog(flush_buffer_, flush_buffer_offset_);
flush_buffer_offset_ = 0;
SetPersistentLSN(next_lsn_ - 1);
}
need_flush_ = false;
cv_append_.notify_all();
}
});
}
當資料庫系統被關閉時,我們應該停止紀錄檔執行緒,同時將 log_buffer_
中的記錄全部儲存到硬碟中:
void LogManager::StopFlushThread() {
enable_logging = false;
Flush();
flush_thread_->join();
delete flush_thread_;
flush_thread_ = nullptr;
}
void LogManager::Flush() {
if (!enable_logging) {
return;
}
std::unique_lock<std::mutex> lock(latch_);
need_flush_ = true;
cv_.notify_one();
// block thread until flush finished
cv_append_.wait(lock, [&] { return !need_flush_.load(); });
}
根據執行操作的不同,紀錄檔記錄也分為多個種類:
enum class LogRecordType {
INVALID = 0,
INSERT,
MARKDELETE,
APPLYDELETE,
ROLLBACKDELETE,
UPDATE,
BEGIN,
COMMIT,
ABORT,
/** Creating a new page in the table heap. */
NEWPAGE,
};
紀錄檔記錄由 LogRecord
類描述,每一種記錄的格式如下所示:
Header (每種型別都擁有 Header,共 20 位元組)
--------------------------------------------
| size | LSN | transID | prevLSN | LogType |
--------------------------------------------
插入型別紀錄檔記錄
--------------------------------------------------------------
| HEADER | tuple_rid | tuple_size | tuple_data(char[] array) |
--------------------------------------------------------------
刪除型別紀錄檔記錄 (包括 markdelete, rollbackdelete, applydelete)
--------------------------------------------------------------
| HEADER | tuple_rid | tuple_size | tuple_data(char[] array) |
--------------------------------------------------------------
更新型別紀錄檔記錄
----------------------------------------------------------------------------------
| HEADER | tuple_rid | tuple_size | old_tuple_data | tuple_size | new_tuple_data |
----------------------------------------------------------------------------------
新頁型別紀錄檔記錄
-----------------------------------
| HEADER | prev_page_id | page_id |
-----------------------------------
我們需要根據不同型別紀錄檔記錄的格式將紀錄檔記錄序列化到 log_buffer_
中:
lsn_t LogManager::AppendLogRecord(LogRecord *log_record) {
std::unique_lock<std::mutex> lock(latch_);
// flush log to disk when the log buffer is full
if (log_record->size_ + log_buffer_offset_ > LOG_BUFFER_SIZE) {
// wake up flush thread to write log
need_flush_ = true;
cv_.notify_one();
// block current thread until log buffer is emptied
cv_append_.wait(lock, [&] { return log_record->size_ + log_buffer_offset_ <= LOG_BUFFER_SIZE; });
}
// serialize header
log_record->lsn_ = next_lsn_++;
memcpy(log_buffer_ + log_buffer_offset_, log_record, LogRecord::HEADER_SIZE);
int pos = log_buffer_offset_ + LogRecord::HEADER_SIZE;
// serialize body
switch (log_record->GetLogRecordType()) {
case LogRecordType::INSERT:
memcpy(log_buffer_ + pos, &log_record->insert_rid_, sizeof(RID));
pos += sizeof(RID);
log_record->insert_tuple_.SerializeTo(log_buffer_ + pos);
break;
case LogRecordType::MARKDELETE:
case LogRecordType::APPLYDELETE:
case LogRecordType::ROLLBACKDELETE:
memcpy(log_buffer_ + pos, &log_record->delete_rid_, sizeof(RID));
pos += sizeof(RID);
log_record->delete_tuple_.SerializeTo(log_buffer_ + pos);
break;
case LogRecordType::UPDATE:
memcpy(log_buffer_ + pos, &log_record->update_rid_, sizeof(RID));
pos += sizeof(RID);
log_record->old_tuple_.SerializeTo(log_buffer_ + pos);
pos += 4 + static_cast<int>(log_record->old_tuple_.GetLength());
log_record->new_tuple_.SerializeTo(log_buffer_ + pos);
break;
case LogRecordType::NEWPAGE:
memcpy(log_buffer_ + pos, &log_record->prev_page_id_, sizeof(page_id_t));
pos += sizeof(page_id_t);
memcpy(log_buffer_ + pos, &log_record->page_id_, sizeof(page_id_t));
break;
default:
break;
}
// update log buffer offset
log_buffer_offset_ += log_record->size_;
return log_record->lsn_;
}
在我們呼叫 TablePage::InsertTuple
等方法的時候,內部會呼叫 LogManager::AppendLogRecord
新增紀錄檔記錄,但是事務開始、提交或者終止時也需要我們新增記錄:
Transaction *TransactionManager::Begin(Transaction *txn) {
// Acquire the global transaction latch in shared mode.
global_txn_latch_.RLock();
if (txn == nullptr) {
txn = new Transaction(next_txn_id_++);
}
if (enable_logging) {
LogRecord log_record(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::BEGIN);
auto lsn = log_manager_->AppendLogRecord(&log_record);
txn->SetPrevLSN(lsn);
}
txn_map[txn->GetTransactionId()] = txn;
return txn;
}
void TransactionManager::Commit(Transaction *txn) {
txn->SetState(TransactionState::COMMITTED);
// 省略部分程式碼
if (enable_logging) {
LogRecord log_record(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::COMMIT);
auto lsn = log_manager_->AppendLogRecord(&log_record);
txn->SetPrevLSN(lsn);
}
// Release all the locks.
ReleaseLocks(txn);
// Release the global transaction latch.
global_txn_latch_.RUnlock();
}
void TransactionManager::Abort(Transaction *txn) {
txn->SetState(TransactionState::ABORTED);
// 省略部分程式碼
if (enable_logging) {
LogRecord log_record(txn->GetTransactionId(), txn->GetPrevLSN(), LogRecordType::ABORT);
auto lsn = log_manager_->AppendLogRecord(&log_record);
txn->SetPrevLSN(lsn);
}
// Release all the locks.
ReleaseLocks(txn);
// Release the global transaction latch.
global_txn_latch_.RUnlock();
}
如前所述,將緩衝池中的髒頁換出時需要強制儲存紀錄檔,在 Buffer Pool Manager 實驗中我們實現了一個 GetVictimFrameId
方法,只要略作修改即可:
frame_id_t BufferPoolManager::GetVictimFrameId() {
frame_id_t frame_id;
if (!free_list_.empty()) {
frame_id = free_list_.front();
free_list_.pop_front();
} else {
if (!replacer_->Victim(&frame_id)) {
return INVALID_PAGE_ID;
}
// flush log to disk when victim a dirty page
if (enable_logging) {
Page &page = pages_[frame_id];
if (page.IsDirty() && page.GetLSN() > log_manager_->GetPersistentLSN()) {
log_manager_->Flush();
}
}
}
return frame_id;
}
紀錄檔管理器的測試結果如下所示:
本次實驗使用的系統恢復策略較為簡單,由於沒有使用 Fuzzy Checkpoint,所以少了 Analysis
階段,直接變成在 LogRecovery::Redo
函數中分析出當前活躍事務表 ATT 並進行重放,在 LogRecovery::Undo
函數中進行回滾。
LogRecovery
會不斷呼叫 DiskManager::ReadLog
函數直到讀取完整個紀錄檔,由於我們先前將 LogRecord
進行了序列化,此處需要進行反序列化以存取記錄中的資訊。由於 log_buffer_
的大小為 LOG_BUFFER_SIZE
,所以將紀錄檔檔案讀取到 log_buffer_
的過程中可能截斷最後一條記錄,這時候需要返回 false
以表示反序列化失敗:
bool LogRecovery::DeserializeLogRecord(const char *data, LogRecord *log_record) {
// convert data to record and check header
auto record = reinterpret_cast<const LogRecord *>(data);
if (record->size_ <= 0 || data + record->size_ > log_buffer_ + LOG_BUFFER_SIZE) {
return false;
}
// copy header
memcpy(reinterpret_cast<char *>(log_record), data, LogRecord::HEADER_SIZE);
// copy body
int pos = LogRecord::HEADER_SIZE;
switch (log_record->GetLogRecordType()) {
case LogRecordType::INSERT:
memcpy(&log_record->insert_rid_, data + pos, sizeof(RID));
pos += sizeof(RID);
log_record->insert_tuple_.DeserializeFrom(data + pos);
break;
case LogRecordType::MARKDELETE:
case LogRecordType::APPLYDELETE:
case LogRecordType::ROLLBACKDELETE:
memcpy(&log_record->delete_rid_, data + pos, sizeof(RID));
pos += sizeof(RID);
log_record->delete_tuple_.DeserializeFrom(data + pos);
break;
case LogRecordType::UPDATE:
memcpy(&log_record->update_rid_, data + pos, sizeof(RID));
pos += sizeof(RID);
log_record->old_tuple_.DeserializeFrom(data + pos);
pos += 4 + log_record->old_tuple_.GetLength();
log_record->new_tuple_.DeserializeFrom(data + pos);
break;
case LogRecordType::NEWPAGE:
memcpy(&log_record->prev_page_id_, data + pos, sizeof(page_id_t));
pos += sizeof(page_id_t);
memcpy(&log_record->page_id_, data + pos, sizeof(page_id_t));
break;
case LogRecordType::BEGIN:
case LogRecordType::COMMIT:
case LogRecordType::ABORT:
break;
default:
return false;
}
return true;
}
重放過程十分簡單粗暴,遍歷整個紀錄檔的記錄,如果記錄的紀錄檔序號大於記錄操作的 tuple 儲存到磁碟上的紀錄檔序列號,說明 tuple 被修改後還沒儲存到磁碟上就宕機了,這時候需要進行回放。在遍歷的時候先無腦將記錄對應的事務新增到 ATT 中,直到事務被提交或者中斷才將其移出 ATT。
void LogRecovery::Redo() {
while (disk_manager_->ReadLog(log_buffer_, LOG_BUFFER_SIZE, offset_)) {
// offset of current log buffer
size_t pos = 0;
LogRecord log_record;
// deserialize log entry to record
while (DeserializeLogRecord(log_buffer_ + pos, &log_record)) {
// update lsn mapping
auto lsn = log_record.lsn_;
lsn_mapping_[lsn] = offset_ + pos;
// Add txn to ATT with status UNDO
active_txn_[log_record.txn_id_] = lsn;
pos += log_record.size_;
// redo if page was not wirtten to disk when crash happened
switch (log_record.log_record_type_) {
case LogRecordType::INSERT: {
auto page = getTablePage(log_record.insert_rid_);
if (page->GetLSN() < lsn) {
page->WLatch();
page->InsertTuple(log_record.insert_tuple_, &log_record.insert_rid_, nullptr, nullptr, nullptr);
page->WUnlatch();
}
buffer_pool_manager_->UnpinPage(page->GetPageId(), page->GetLSN() < lsn);
break;
}
case LogRecordType::UPDATE: {
auto page = getTablePage(log_record.update_rid_);
if (page->GetLSN() < lsn) {
page->WLatch();
page->UpdateTuple(log_record.new_tuple_, &log_record.old_tuple_, log_record.update_rid_, nullptr, nullptr,
nullptr);
page->WUnlatch();
}
buffer_pool_manager_->UnpinPage(page->GetPageId(), page->GetLSN() < lsn);
break;
}
case LogRecordType::MARKDELETE:
case LogRecordType::APPLYDELETE:
case LogRecordType::ROLLBACKDELETE: {
auto page = getTablePage(log_record.delete_rid_);
if (page->GetLSN() < lsn) {
page->WLatch();
if (log_record.log_record_type_ == LogRecordType::MARKDELETE) {
page->MarkDelete(log_record.delete_rid_, nullptr, nullptr, nullptr);
} else if (log_record.log_record_type_ == LogRecordType::APPLYDELETE) {
page->ApplyDelete(log_record.delete_rid_, nullptr, nullptr);
} else {
page->RollbackDelete(log_record.delete_rid_, nullptr, nullptr);
}
page->WUnlatch();
}
buffer_pool_manager_->UnpinPage(page->GetPageId(), page->GetLSN() < lsn);
break;
}
case LogRecordType::COMMIT:
case LogRecordType::ABORT:
active_txn_.erase(log_record.txn_id_);
break;
case LogRecordType::NEWPAGE: {
auto page_id = log_record.page_id_;
auto page = getTablePage(page_id);
if (page->GetLSN() < lsn) {
auto prev_page_id = log_record.prev_page_id_;
page->WLatch();
page->Init(page_id, PAGE_SIZE, prev_page_id, nullptr, nullptr);
page->WUnlatch();
if (prev_page_id != INVALID_PAGE_ID) {
auto prev_page = getTablePage(prev_page_id);
if (prev_page->GetNextPageId() != page_id) {
prev_page->SetNextPageId(page_id);
buffer_pool_manager_->UnpinPage(prev_page_id, true);
} else {
buffer_pool_manager_->UnpinPage(prev_page_id, false);
}
}
}
buffer_pool_manager_->UnpinPage(page_id, page->GetLSN() < lsn);
break;
}
default:
break;
}
}
offset_ += pos;
}
}
LogRecovery::Undo
會遍歷 ATT 中的每一個事務,對事務的操作進行回滾,回滾的規則如下:
LogRecordType::INSERT
,使用 TablePage::ApplyDelete
進行回滾LogRecordType::UPDATE
,使用 TablePage::UpdateTuple
進行回滾LogRecordType::APPLYDELETE
,使用 TablePage::InsertTuple
進行回滾LogRecordType::MARKDELETE
,使用 TablePage::RollbackDelete
進行回滾LogRecordType::ROLLBACKDELETE
,使用 TablePage::MarkDelete
進行回滾void LogRecovery::Undo() {
for (auto [txn_id, lsn] : active_txn_) {
while (lsn != INVALID_LSN) {
// read log from dist and convert log buffer entry to log record
LogRecord log_record;
auto offset = lsn_mapping_[lsn];
disk_manager_->ReadLog(log_buffer_, LOG_BUFFER_SIZE, offset);
DeserializeLogRecord(log_buffer_, &log_record);
lsn = log_record.GetPrevLSN();
// rollback
switch (log_record.GetLogRecordType()) {
case LogRecordType::INSERT: {
auto page = getTablePage(log_record.insert_rid_);
page->WLatch();
page->ApplyDelete(log_record.insert_rid_, nullptr, nullptr);
page->WUnlatch();
buffer_pool_manager_->UnpinPage(page->GetPageId(), true);
break;
}
case LogRecordType::UPDATE: {
auto page = getTablePage(log_record.update_rid_);
page->WLatch();
page->UpdateTuple(log_record.old_tuple_, &log_record.new_tuple_, log_record.update_rid_, nullptr, nullptr,
nullptr);
page->WUnlatch();
buffer_pool_manager_->UnpinPage(page->GetPageId(), true);
break;
}
case LogRecordType::MARKDELETE:
case LogRecordType::APPLYDELETE:
case LogRecordType::ROLLBACKDELETE: {
auto page = getTablePage(log_record.delete_rid_);
page->WLatch();
if (log_record.log_record_type_ == LogRecordType::MARKDELETE) {
page->RollbackDelete(log_record.delete_rid_, nullptr, nullptr);
} else if (log_record.log_record_type_ == LogRecordType::APPLYDELETE) {
page->InsertTuple(log_record.delete_tuple_, &log_record.delete_rid_, nullptr, nullptr, nullptr);
} else {
page->MarkDelete(log_record.delete_rid_, nullptr, nullptr, nullptr);
}
page->WUnlatch();
buffer_pool_manager_->UnpinPage(page->GetPageId(), true);
break;
}
default:
break;
}
}
}
active_txn_.clear();
lsn_mapping_.clear();
}
存檔點管理器用於儲存紀錄檔並刷出緩衝池中所有的髒頁,同時阻塞正在進行的事務。
void CheckpointManager::BeginCheckpoint() {
transaction_manager_->BlockAllTransactions();
log_manager_->Flush();
buffer_pool_manager_->FlushAllPages();
}
void CheckpointManager::EndCheckpoint() {
transaction_manager_->ResumeTransactions();
}
紀錄檔恢復和存檔點的測試結果如下:
本次實驗主要考察對預寫式紀錄檔和資料庫系統恢復的理解,程式碼上層面多了對多執行緒同步技術的要求,整個實驗做下來感覺比較順(除了被段錯誤坑了億點時間外),以上~~