通過學習如何定位並行處理的陷阱來避免未來處理這些問題時的困境。
在複雜的分散式系統進行任務處理時,你通常會需要進行並行的操作。在 Mode.net 公司,我們每天都要和實時、快速和靈活的軟體打交道。而沒有一個高度並行的系統,就不可能構建一個毫秒級的動態地路由封包的全球專用網路。這個動態路由是基於網路狀態的,儘管這個過程需要考慮眾多因素,但我們的重點是鏈路指標。在我們的環境中,鏈路指標可以是任何跟網路連結的狀態和當前屬性(如連結延遲)有關的任何內容。
我們的動態路由演算法 H.A.L.O.(逐跳自適應鏈路狀態最佳路由)部分依賴於鏈路指標來計算路由表。這些指標由位於每個 PoP(存活節點)上的獨立元件收集。PoP 是表示我們的網路中單個路由實體的機器,通過鏈路連線並分布在我們的網路拓撲中的各個位置。某個元件使用網路封包探測周圍的機器,周圍的機器回復封包給前者。從接收到的探測包中可以獲得鏈路延遲。由於每個 PoP 都有不止一個臨近節點,所以這種探測任務實質上是並行的:我們需要實時測量每個臨近連線點的延遲。我們不能序列地處理;為了計算這個指標,必須盡快處理每個探測。
我們的探測元件互相傳送和接收封包,並依靠序列號進行封包處理。這旨在避免處理重複的包或順序被打亂的包。我們的第一個實現依靠特殊的序列號 0 來重置序列號。這個數位僅在元件初始化時使用。主要的問題是我們考慮了遞增的序列號總是從 0 開始。在該元件重新啟動後,包的順序可能會重新排列,某個包的序列號可能會輕易地被替換成重置之前使用過的值。這意味著,後繼的包都會被忽略掉,直到排到重置之前用到的序列值。
這裡的問題是該元件重新啟動前後的序列號是否一致。有幾種方法可以解決這個問題,經過討論,我們選擇了實現一個帶有清晰狀態定義的三步握手協定。這個握手過程在初始化時通過連結建立對談。這樣可以確保節點通過同一個對談進行通訊且使用了適當的序列號。
為了正確實現這個過程,我們必須定義一個有清晰狀態和過渡的有限狀態機。這樣我們就可以正確管理握手過程中的所有極端情況。
對談 ID 由握手的初始化程式生成。一個完整的交換順序如下:
SYN(ID)
封包。ID
並行送一個 SYN-ACK(ID)
。SYN-ACK(ID)
並行送一個 ACK(ID)
。它還傳送一個從序列號 0 開始的封包。ID
,如果 ID 匹配,則接受 ACK(ID)
。它還開始接受序列號為 0 的封包。基本上,每種狀態下你都需要處理最多三種型別的事件:連結事件、封包事件和超時事件。這些事件會並行地出現,因此你必須正確處理並行。
SYN
/SYN-ACK
/ACK
)或只是探測響應。這裡面臨的最主要的問題是如何處理並行的超時到期和其他事件。這裡很容易陷入死鎖和資源競爭的陷阱。
本專案使用的語言是 Golang。它確實提供了原生的同步機制,如自帶的通道和鎖,並且能夠使用輕量級執行緒來進行併行處理。
gopher 們聚眾狂歡
首先,你可以設計兩個分別表示我們的對談和超時處理程式的結構體。
type Session struct { State SessionState Id SessionId RemoteIp string }type TimeoutHandler struct { callback func(Session) session Session duration int timer *timer.Timer }
Session
標識連線對談,內有表示對談 ID、臨近的連線點的 IP 和當前對談狀態的欄位。
TimeoutHandler
包含回撥函數、對應的對談、持續時間和指向排程計時器的指標。
每一個臨近連線點的對談都包含一個儲存排程 TimeoutHandler
的全域性對映。
SessionTimeout map[Session]*TimeoutHandler
下面方法註冊和取消超時:
// schedules the timeout callback function. func (timeout* TimeoutHandler) Register() { timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time.Second, func() { timeout.callback(timeout.session) }) }func (timeout* TimeoutHandler) Cancel() { if timeout.timer == nil { return } timeout.timer.Stop() }
你可以使用類似下面的方法來建立和儲存超時:
func CreateTimeoutHandler(callback func(Session), session Session, duration int) *TimeoutHandler { if sessionTimeout[session] == nil { sessionTimeout[session] := new(TimeoutHandler) } timeout = sessionTimeout[session] timeout.session = session timeout.callback = callback timeout.duration = duration return timeout }
超時處理程式建立後,會在經過了設定的 duration
時間(秒)後執行回撥函數。然而,有些事件會使你重新排程一個超時處理程式(與 SYN
狀態時的處理一樣,每 3 秒一次)。
為此,你可以讓回撥函數重新排程一次超時:
func synCallback(session Session) { sendSynPacket(session) // reschedules the same callback. newTimeout := NewTimeoutHandler(synCallback, session, SYN_TIMEOUT_DURATION) newTimeout.Register() sessionTimeout[state] = newTimeout }
這次回撥在新的超時處理程式中重新排程自己,並更新全域性對映 sessionTimeout
。
你的解決方案已經有了。可以通過檢查計時器到期後超時回撥是否執行來進行一個簡單的測試。為此,註冊一個超時,休眠 duration
秒,然後檢查是否執行了回撥的處理。執行這個測試後,最好取消預定的超時時間(因為它會重新排程),這樣才不會在下次測試時產生副作用。
令人驚訝的是,這個簡單的測試發現了這個解決方案中的一個問題。使用 cancel
方法來取消超時並沒有正確處理。以下順序的事件會導致資料資源競爭:
SYN
後接收到一個 SYN-ACK
)timeout.Cancel()
,這個函數呼叫了 timer.Stop()
。(請注意,Golang 計時器的停止不會終止一個已過期的計時器。)兩個執行緒並行地更新超時對映。最終結果是你無法取消註冊的超時,然後你也會丟失對執行緒 2 重新排程的超時的參照。這導致處理程式在一段時間內持續執行和重新排程,出現非預期行為。
使用鎖也不能完全解決問題。如果你在處理所有事件和執行回撥之前加鎖,它仍然不能阻止一個過期的回撥執行:
func (timeout* TimeoutHandler) Register() { timeout.timer = time.AfterFunc(time.Duration(timeout.duration) * time._Second_, func() { stateLock.Lock() defer stateLock.Unlock() timeout.callback(timeout.session) }) }
現在的區別就是全域性對映的更新是同步的,但是這還是不能阻止在你呼叫 timeout.Cancel()
後回撥的執行 —— 這種情況出現在排程計時器過期了但是還沒有拿到鎖的時候。你還是會丟失一個已註冊的超時的參照。
你可以使用取消通道,而不必依賴不能阻止到期的計時器執行的 golang 函數 timer.Stop()
。
這是一個略有不同的方法。現在你可以不用再通過回撥進行遞回地重新排程;而是註冊一個死迴圈,這個迴圈接收到取消信號或超時事件時終止。
新的 Register()
產生一個新的 go 執行緒,這個執行緒在超時後執行你的回撥,並在前一個超時執行後排程新的超時。返回給呼叫方一個取消通道,用來控制迴圈的終止。
func (timeout *TimeoutHandler) Register() chan struct{} { cancelChan := make(chan struct{}) go func () { select { case _ = <- cancelChan: return case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second): func () { stateLock.Lock() defer stateLock.Unlock() timeout.callback(timeout.session) } () } } () return cancelChan }func (timeout* TimeoutHandler) Cancel() { if timeout.cancelChan == nil { return } timeout.cancelChan <- struct{}{} }
這個方法給你註冊的所有超時提供了取消通道。一個取消呼叫向通道傳送一個空結構體並觸發取消操作。然而,這並不能解決前面的問題;可能在你通過通道取消之前以及超時執行緒拿到鎖之前,超時時間就已經到了。
這裡的解決方案是,在拿到鎖之後,檢查一下超時範圍內的取消通道。
case _ = <- time.AfterFunc(time.Duration(timeout.duration) * time.Second): func () { stateLock.Lock() defer stateLock.Unlock() select { case _ = <- handler.cancelChan: return default: timeout.callback(timeout.session) } } () }
最終,這可以確保在拿到鎖之後執行回撥,不會觸發取消操作。
這個解決方案看起來有效;但是還是有個隱患:死鎖。
請閱讀上面的程式碼,試著自己找到它。考慮下描述的所有函數的並行呼叫。
這裡的問題在取消通道本身。我們建立的是無緩衝通道,即傳送的是阻塞呼叫。當你在一個超時處理程式中呼叫取消函數時,只有在該處理程式被取消後才能繼續處理。問題出現在,當你有多個呼叫請求到同一個取消通道時,這時一個取消請求只被處理一次。當多個事件同時取消同一個超時處理程式時,如連線斷開或控制包事件,很容易出現這種情況。這會導致死鎖,可能會使應用程式停機。
有人在聽嗎?
(已獲得 Trevor Forrey 授權。)
這裡的解決方案是建立通道時指定快取大小至少為 1,這樣向通道傳送資料就不會阻塞,也顯式地使傳送變成非阻塞的,避免了並行呼叫。這樣可以確保取消操作只傳送一次,並且不會阻塞後續的取消呼叫。
func (timeout* TimeoutHandler) Cancel() { if timeout.cancelChan == nil { return } select { case timeout.cancelChan <- struct{}{}: default: // can’t send on the channel, someone has already requested the cancellation. } }
在實踐中你學到了並行操作時出現的常見錯誤。由於其不確定性,即使進行大量的測試,也不容易發現這些問題。下面是我們在最初的實現中遇到的三個主要問題:
這似乎是個很明顯的問題,但如果並行更新發生在不同的位置,就很難發現。結果就是資料競爭,由於一個更新會覆蓋另一個,因此對同一資料的多次更新中會有某些更新丟失。在我們的案例中,我們是在同時更新同一個共用對映里的排程超時參照。(有趣的是,如果 Go 檢測到在同一個對映物件上的並行讀寫,會丟擲致命錯誤 — 你可以嘗試下執行 Go 的資料競爭檢測器)。這最終會導致丟失超時參照,且無法取消給定的超時。當有必要時,永遠不要忘記使用鎖。
不要忘記同步 gopher 們的工作
在不能僅依賴鎖的獨占性的情況下,就需要進行條件檢查。我們遇到的場景稍微有點不一樣,但是核心思想跟條件變數是一樣的。假設有個一個生產者和多個消費者使用一個共用佇列的經典場景,生產者可以將一個元素新增到佇列並喚醒所有消費者。這個喚醒呼叫意味著佇列中的資料是可存取的,並且由於佇列是共用的,消費者必須通過鎖來進行同步存取。每個消費者都可能拿到鎖;然而,你仍然需要檢查佇列中是否有元素。因為在你拿到鎖的瞬間並不知道佇列的狀態,所以還是需要進行條件檢查。
在我們的例子中,超時處理程式收到了計時器到期時發出的“喚醒”呼叫,但是它仍需要檢查是否已向其傳送了取消信號,然後才能繼續執行回撥。
如果你要喚醒多個 gopher,可能就需要進行條件檢查
當一個執行緒被卡住,無限期地等待一個喚醒信號,但是這個信號永遠不會到達時,就會發生這種情況。死鎖可以通過讓你的整個程式停機來徹底殺死你的應用。
在我們的案例中,這種情況的發生是由於多次傳送請求到一個非緩衝且阻塞的通道。這意味著向通道傳送資料只有在從這個通道接收完資料後才能返回。我們的超時執行緒迴圈迅速從取消通道接收信號;然而,在接收到第一個信號後,它將跳出迴圈,並且再也不會從這個通道讀取資料。其他的呼叫會一直被卡住。為避免這種情況,你需要仔細檢查程式碼,謹慎處理阻塞呼叫,並確保不會發生執行緒飢餓。我們例子中的解決方法是使取消呼叫成為非阻塞呼叫 — 我們不需要阻塞呼叫。