一個合格的事務處理系統,應該具備四個性質:原子性(atomicity)、一致性(consistency)、隔離性(isolation)和永續性(durability)。隔離性保證了一個活躍的事務(還沒提交或者回滾)對資料庫所做的系統對於其他的活躍事務是不可見的,看起來就像某一時刻就只有一個事務在運算元據庫。然而完美的隔離性會導致資料庫系統並行效能的下降,有些時候我們可以容忍資料的不一致性,所以可以犧牲一部分隔離性,來換取更好的並行性。這篇部落格將介紹 CMU15-445 Fall2020 第四個實驗 Concurrency Control 的實現過程,使用兩階段鎖機制實現讀未提交、讀已提交和可重複讀這三種隔離級別,同時使用等待圖來定期檢測死鎖。
可序列化是最高等級的隔離級別,如果一種事務排程是可序列化的,那麼最終的效果等同於將多事務序列執行。如果兩個事務都只對資料庫進行讀操作,就不會有衝突問題,不管怎麼排程,都不會有一致性問題。但是一旦有一個事務有寫操作,就會產生衝突。
對於下圖所示的排程,可以看到裡面存在衝突,T1 和 T2 中都存在讀寫操作。由於 R(B)
和 W(A)
、W(B)
和 R(A)
互不影響,我們可以通過調整他們的順序,實現和序列化相同的效果,這時候稱為衝突可序列化。
調整之後的排程如下圖所示,最終的效果就是 T2 寫了記錄 A 和 B。
但是有些排程,不管你如何對調順序,也無法序列化。
如果無法通過排程實現衝突序列化,我們就只能回滾事務,這樣就做了很多的無用功。一種鑑定衝突是否可序列化的方式是根據排程構造出一個優先圖,每個事務是圖中的頂點,圖中的邊代表一種衝突操作。下圖存在 W-R 和 R-W 衝突,構造出來的優先圖是有環的,所以無法序列化。
雖然構造優先圖的方法很好使,但是這要求我們事先知道整個排程長什麼樣子。為了解決此問題,兩階段鎖協定橫空出世。該協定要求每個事務分為兩個階段提出鎖請求:
最初事務處於增長階段,根據需要獲得鎖,一旦釋放鎖就進入縮減階段。假設下圖中 T1 在 R(A)
之前加的是讀鎖,那麼在 R(A)
之後不能通過釋放讀鎖的方式來重新獲取寫鎖,只能通過鎖升級的方式將讀鎖轉換為寫鎖。
雖然兩階段鎖可以實現衝突可序列化,但是他無法避免髒讀和級聯回滾問題,這時候需要使用強兩階段鎖協定,只在 Commit 之後一次性釋放事務持有的所有鎖。
為確保事務操作正確交錯,資料庫管理系統將使用鎖管理器來控制何時允許事務存取資料項。鎖管理器的基本思想是維護一個關於當前被活動事務所持有的鎖的內部資料結構,事務在允許存取資料項之前向鎖管理器發出鎖請求。鎖管理器將要麼將鎖授予呼叫事務,要麼阻塞該事務,要麼將其中止。
Bustub 中使用的資料結構是鎖表 unordered_map<RID, LockRequestQueue> lock_table_
,如下圖所示。鎖表實際上是一個雜湊表,鍵為 tuple 的行 ID,值為一個連結串列,記錄了所有對此 tuple 發出加鎖請求的事務。深藍色的事務持有鎖,淺藍色的事務處於阻塞狀態。
鎖管理器處理鎖請求的機制為:
可以看到,這個機制和讀寫鎖是一樣的,可以保證鎖請求不會發生餓死現象。
LockRequestQueue
的定義如下所示,我們在 LockRequestQueue
中新增了 reader_count_
和 writer_enter_
成員,搭配條件變數 cv_
和鎖管理器中的 std::mutex latch_
可以實現讀寫鎖:
enum class LockMode { SHARED, EXCLUSIVE };
class LockRequest {
public:
LockRequest(txn_id_t txn_id, LockMode lock_mode) : txn_id_(txn_id), lock_mode_(lock_mode), granted_(false) {}
txn_id_t txn_id_;
LockMode lock_mode_;
bool granted_;
};
class LockRequestQueue {
public:
std::list<LockRequest> request_queue_;
std::condition_variable cv_; // for notifying blocked transactions on this rid
bool upgrading_ = false;
uint32_t reader_count_ = 0;
bool writer_enter_ = false;
};
實驗要求我們實現三種隔離級別:
LockShared()
的時候需要丟擲異常對於加共用鎖的請求,如果前面有排它鎖請求,就需要阻塞當前事務,直到持有排它鎖的事務釋放鎖。
bool LockManager::LockShared(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
// 收縮階段不允許上鎖
CheckShrinking(txn);
// 不需要重複上鎖
if (txn->IsSharedLocked(rid)) {
return true;
}
auto txn_id = txn->GetTransactionId();
// 讀未提交不需要加讀鎖
if (txn->GetIsolationLevel() == IsolationLevel::READ_UNCOMMITTED) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn_id, AbortReason::LOCKSHARED_ON_READ_UNCOMMITTED);
}
// 建立一個加鎖請求
auto &queue = lock_table_[rid];
auto &request = queue.request_queue_.emplace_back(txn_id, LockMode::SHARED);
// 沒拿到鎖就進入阻塞狀態
queue.cv_.wait(lock, [&] { return !queue.writer_enter_ || txn->IsAborted(); });
// 死鎖會導致事務中止
CheckAborted(txn);
// 更新鎖請求的狀態
queue.reader_count_++;
request.granted_ = true;
txn->GetSharedLockSet()->emplace(rid);
return true;
}
void LockManager::CheckShrinking(Transaction *txn) {
if (txn->GetState() == TransactionState::SHRINKING) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::LOCK_ON_SHRINKING);
}
}
void LockManager::CheckAborted(Transaction *txn) {
if (txn->IsAborted()) {
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::DEADLOCK);
}
}
對於加排它鎖的請求,需要等待前方的寫請求和讀請求完成才能拿到鎖:
bool LockManager::LockExclusive(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
CheckShrinking(txn);
if (txn->IsExclusiveLocked(rid)) {
return true;
}
// 建立加鎖請求
auto &queue = lock_table_[rid];
auto &request = queue.request_queue_.emplace_back(txn->GetTransactionId(), LockMode::EXCLUSIVE);
// 沒有拿到寫鎖就進入阻塞狀態
queue.cv_.wait(lock, [&] { return (!queue.writer_enter_ && queue.reader_count_ == 0) || txn->IsAborted(); });
// 死鎖會導致事務中止
CheckAborted(txn);
queue.writer_enter_ = true;
request.granted_ = true;
txn->GetExclusiveLockSet()->emplace(rid);
return true;
}
如果前方有事務提出升級鎖的請求,需要丟擲異常並中止當前事務,否則得等到當前請求變成請求佇列的第一個請求時才能完成升級操作:
bool LockManager::LockUpgrade(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
txn->GetSharedLockSet()->erase(rid);
auto &queue = lock_table_[rid];
queue.reader_count_--;
auto request_it = GetRequest(txn->GetTransactionId(), rid);
request_it->lock_mode_ = LockMode::EXCLUSIVE;
request_it->granted_ = false;
// 如果前面有正在排隊升級鎖的事務就直接返回
if (queue.upgrading_) {
txn->SetState(TransactionState::ABORTED);
throw TransactionAbortException(txn->GetTransactionId(), AbortReason::UPGRADE_CONFLICT);
}
queue.upgrading_ = true;
queue.cv_.wait(lock, [&] { return (!queue.writer_enter_ && queue.reader_count_ == 0) || txn->IsAborted(); });
// 死鎖會導致事務中止
CheckAborted(txn);
queue.upgrading_ = false;
queue.writer_enter_ = true;
request_it->granted_ = true;
txn->GetExclusiveLockSet()->emplace(rid);
return true;
}
釋放鎖之後需要及時喚醒其他被阻塞的事務:
bool LockManager::Unlock(Transaction *txn, const RID &rid) {
std::unique_lock<std::mutex> lock(latch_);
txn->GetSharedLockSet()->erase(rid);
txn->GetExclusiveLockSet()->erase(rid);
auto request_it = GetRequest(txn->GetTransactionId(), rid);
auto lock_mode = request_it->lock_mode_;
// 更新事務狀態,讀已提交不需要兩階段鎖機制
if (txn->GetState() == TransactionState::GROWING &&
!(lock_mode == LockMode::SHARED && txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED)) {
txn->SetState(TransactionState::SHRINKING);
}
// 從加鎖請求佇列中移除事務的請求
auto &queue = lock_table_[rid];
queue.request_queue_.erase(request_it);
if (lock_mode == LockMode::SHARED) {
// 喚醒等待讀鎖的執行緒
if (--queue.reader_count_ == 0) {
queue.cv_.notify_all();
}
} else {
// 喚醒等待寫鎖的執行緒
queue.writer_enter_ = false;
queue.cv_.notify_all();
}
return true;
}
兩階段鎖無法避免死鎖現象的出現,所以我們需要有一個後臺執行緒來定期做死鎖檢測。死鎖檢測的方式是根據事務的等待情況構建一個有向的等待圖,如果等待圖中有環,就中止環中最年輕的事務,直到沒有環出現。
LockManager
中的 unordered_map<txn_id_t, vector<txn_id_t>> waits_for_
代表等待圖,鍵為事務 T(頂點),值為事務 T 在等待解鎖的其他事務的集合,二者組成了等待圖中的邊。
void LockManager::AddEdge(txn_id_t t1, txn_id_t t2) {
txns_.insert(t1);
txns_.insert(t2);
auto &neighbors = waits_for_[t1];
auto it = std::find(neighbors.begin(), neighbors.end(), t2);
if (it == neighbors.end()) {
neighbors.push_back(t2);
}
}
void LockManager::RemoveEdge(txn_id_t t1, txn_id_t t2) {
auto &neighbors = waits_for_[t1];
auto it = std::find(neighbors.begin(), neighbors.end(), t2);
if (it != neighbors.end()) {
neighbors.erase(it);
}
}
std::vector<std::pair<txn_id_t, txn_id_t>> LockManager::GetEdgeList() {
std::vector<std::pair<txn_id_t, txn_id_t>> edges;
for (auto &[t1, neighbors] : waits_for_) {
for (auto t2 : neighbors) {
edges.emplace_back(t1, t2);
}
}
return edges;
}
要檢測環,需要使用深度優先演演算法:從某個頂點 \(v_s\) 出發,遍歷頂點的鄰接頂點,如果最終能回到 \(v_s\),就說明存在環。具體實現時是在集合 on_stack_txns_
中維護正在存取的頂點,如果遍歷臨接頂點的時候能在集合中找到 \(v_s\) 就說明有環。
bool LockManager::HasCycle(txn_id_t *txn_id) {
for (auto &t1 : txns_) {
DFS(t1);
if (has_cycle_) {
*txn_id = *on_stack_txns_.rbegin();
on_stack_txns_.clear();
has_cycle_ = false;
return true;
}
}
on_stack_txns_.clear();
return false;
}
void LockManager::DFS(txn_id_t txn_id) {
if (has_cycle_) {
return;
}
on_stack_txns_.insert(txn_id);
auto &neighbors = waits_for_[txn_id];
std::sort(neighbors.begin(), neighbors.end());
for (auto t2 : neighbors) {
if (!on_stack_txns_.count(t2)) {
DFS(t2);
} else {
has_cycle_ = true;
return;
}
}
on_stack_txns_.erase(txn_id);
}
定期檢測環時需要先根據鎖表構建等待圖,然後移除所有環並中止相關事務,最後清空等待圖:
void LockManager::RunCycleDetection() {
while (enable_cycle_detection_) {
std::this_thread::sleep_for(cycle_detection_interval);
{
std::unique_lock<std::mutex> l(latch_);
// 構建等待圖
for (auto &[rid, queue] : lock_table_) {
std::vector<txn_id_t> grants;
auto it = queue.request_queue_.begin();
while (it != queue.request_queue_.end() && it->granted_) {
grants.push_back(it->txn_id_);
it++;
}
while (it != queue.request_queue_.end()) {
for (auto &t2 : grants) {
AddEdge(it->txn_id_, t2);
}
wait_rids_[it->txn_id_] = rid;
it++;
}
}
// 移除環中 id 最小的事務
txn_id_t txn_id;
while (HasCycle(&txn_id)) {
AbortTransaction(txn_id);
}
// 清空圖
waits_for_.clear();
wait_rids_.clear();
txns_.clear();
}
}
}
void LockManager::AbortTransaction(txn_id_t txn_id) {
auto txn = TransactionManager::GetTransaction(txn_id);
txn->SetState(TransactionState::ABORTED);
waits_for_.erase(txn_id);
// 釋放所有 txn 持有的寫鎖
for (auto &rid : *txn->GetExclusiveLockSet()) {
for (auto &req : lock_table_[rid].request_queue_) {
if (!req.granted_) {
RemoveEdge(req.txn_id_, txn_id);
}
}
}
// 釋放所有 txn 持有的讀鎖
for (auto &rid : *txn->GetSharedLockSet()) {
for (auto &req : lock_table_[rid].request_queue_) {
if (!req.granted_) {
RemoveEdge(req.txn_id_, txn_id);
}
}
}
// 通知 txn 所線上程事務被終止了
lock_table_[wait_rids_[txn_id]].cv_.notify_all();
}
上一節中我們實現了單執行緒的執行器,現在需要對其進行修改,使其支援三種隔離級別的並行事務。
如果隔離級別高於讀未提交,就給事務加上共用鎖,讀已提交的事務需要在返回 tuple 之前釋放鎖:
void SeqScanExecutor::Unlock(Transaction *txn, const RID &rid) {
if (txn->GetIsolationLevel() == IsolationLevel::READ_COMMITTED) {
exec_ctx_->GetLockManager()->Unlock(txn, rid);
}
}
bool SeqScanExecutor::Next(Tuple *tuple, RID *rid) {
auto predicate = plan_->GetPredicate();
auto txn = exec_ctx_->GetTransaction();
while (it_ != table_metadata_->table_->End()) {
*rid = it_->GetRid();
// 上鎖
if (txn->GetIsolationLevel() != IsolationLevel::READ_UNCOMMITTED && !txn->IsExclusiveLocked(*rid)) {
exec_ctx_->GetLockManager()->LockShared(txn, *rid);
}
*tuple = *it_++;
if (!predicate || predicate->Evaluate(tuple, &table_metadata_->schema_).GetAs<bool>()) {
// 只保留輸出列
std::vector<Value> values;
for (auto &col : GetOutputSchema()->GetColumns()) {
values.push_back(col.GetExpr()->Evaluate(tuple, &table_metadata_->schema_));
}
*tuple = {values, GetOutputSchema()};
// 解鎖
Unlock(txn, *rid);
return true;
}
Unlock(txn, *rid);
}
return false;
}
插入的時候需要給 tuple 加寫鎖,但是不應該在 TableHeap::InsertTuple()
後面加,而是應該在函數裡面加,這樣插入之後才不會有其他事務改動了此 tuple。為了實現回滾操作,還需要新增一條 IndexWriteRecord
。由於 TableHeap::InsertTuple()
內部已經新增了 TableWriteRecord
,所以我們無需再次新增。
void InsertExecutor::InsertTuple(Tuple *tuple, RID *rid) {
// 更新資料表,需要在 TablePage::InsertTuple 中加鎖
table_metadata_->table_->InsertTuple(*tuple, rid, exec_ctx_->GetTransaction());
// 更新索引
for (auto &index_info : index_infos_) {
index_info->index_->InsertEntry(
tuple->KeyFromTuple(table_metadata_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
exec_ctx_->GetTransaction());
IndexWriteRecord record(*rid, table_metadata_->oid_, WType::INSERT, *tuple, index_info->index_oid_,
exec_ctx_->GetCatalog());
exec_ctx_->GetTransaction()->AppendTableWriteRecord(record);
}
}
更新之前需要將讀鎖升級為寫鎖,如果先前沒有拿鎖就需要直接請求寫鎖。
bool UpdateExecutor::Next([[maybe_unused]] Tuple *tuple, RID *rid) {
if (!child_executor_->Next(tuple, rid)) {
return false;
}
// 更新資料表
auto new_tuple = GenerateUpdatedTuple(*tuple);
// 加鎖
auto txn = exec_ctx_->GetTransaction();
if (txn->IsSharedLocked(*rid)) {
exec_ctx_->GetLockManager()->LockUpgrade(txn, *rid);
} else {
exec_ctx_->GetLockManager()->LockExclusive(txn, *rid);
}
table_info_->table_->UpdateTuple(new_tuple, *rid, exec_ctx_->GetTransaction());
// 更新索引
for (auto &index_info : index_infos_) {
// 刪除舊的 tuple
index_info->index_->DeleteEntry(
tuple->KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
exec_ctx_->GetTransaction());
// 插入新的 tuple
index_info->index_->InsertEntry(
new_tuple.KeyFromTuple(table_info_->schema_, index_info->key_schema_, index_info->index_->GetKeyAttrs()), *rid,
exec_ctx_->GetTransaction());
IndexWriteRecord record(*rid, table_info_->oid_, WType::UPDATE, *tuple, index_info->index_oid_,
exec_ctx_->GetCatalog());
exec_ctx_->GetTransaction()->AppendTableWriteRecord(record);
}
return true;
}
通過這次實驗,可以加深對隔離級別和兩階段鎖協定的理解,程式碼上層面多了對多執行緒同步技術的要求,不過做過去年實驗的話應該問題也不大,以上~