本文已收錄到 AndroidFamily,技術和職場問題,請關注公眾號 [彭旭銳] 提問。
大家好,我是小彭。
在上一篇文章裡,我們聊到了 Square 開源的 I/O 框架 Okio 的三個優勢:精簡且全面的 API、基於共用的緩衝區設計以及超時機制。前兩個優勢已經分析過了,今天我們來分析 Okio 的超時檢測機制。
本文原始碼基於 Okio v3.2.0。
思維導圖:
超時機制是一項通用的系統設計,能夠避免系統長時間阻塞在某些任務上。例如網路請求在超時時間內沒有響應,使用者端就會提前中斷請求,並提示使用者某些功能不可用。
先思考一個問題,相比於傳統 IO 的超時有什麼優勢呢?我認為主要體現在 2 個方面:
Java 原生 IO 操作是否支援超時,完全取決於底層的系統呼叫是否支援。例如,網路 Socket 支援通過 setSoTimeout
API 設定單次 IO 操作的超時時間,而檔案 IO 操作就不支援,使用原生檔案 IO 就無法實現超時。
而 Okio 是統一在應用層實現超時檢測,不管系統呼叫是否支援超時,都能提供統一的超時檢測機制。
Java 原生 IO 操作只能實現對單次 IO 操作的超時檢測,無法實現對包含多次 IO 操作的複合任務超時檢測。例如,OkHttp 支援設定單次 connect、read 或 write 操作的超時檢測,還支援對一次完整 Call 請求的超時檢測,有時候單個操作沒有超時,但串聯起來的完整 call 卻超時了。
而 Okio 超時機制和 IO 操作沒有強耦合,不僅支援對 IO 操作的超時檢測,還支援非 IO 操作的超時檢測,所以這種複合任務的超時檢測也是可以實現的。
Timeout 類是 Okio 超時機制的核心類,Okio 對 Source 輸入流和 Sink 輸出流都提供了超時機制,我們在構造 InputStreamSource 和 OutputStreamSink 這些流的實現類時,都需要攜帶 Timeout 物件:
Source.kt
interface Source : Closeable {
// 返回超時控制物件
fun timeout(): Timeout
...
}
Sink.kt
actual interface Sink : Closeable, Flushable {
// 返回超時控制物件
actual fun timeout(): Timeout
...
}
Timeout 類提供了兩種設定超時時間的方式(如果兩種方式同時存在的話,Timeout 會優先採用更早的截止時間):
最終觸發超時的截止時間是任務的 startTime + timeoutNanos
;
Timeout.kt
// hasDeadline 這個屬性顯得沒必要
private var hasDeadline = false // 是否設定了截止時間點
private var deadlineNanoTime = 0L // 截止時間點(單位納秒)
private var timeoutNanos = 0L // 處理單次任務的超時時間(單位納秒)
建立 Source 和 Sink 物件時,都需要攜帶 Timeout 物件:
JvmOkio.kt
// ----------------------------------------------------------------------------
// 輸入流
// ----------------------------------------------------------------------------
fun InputStream.source(): Source = InputStreamSource(this, Timeout() /*Timeout 物件*/)
// 檔案輸入流
fun File.source(): Source = InputStreamSource(inputStream(), Timeout.NONE)
// Socket 輸入流
fun Socket.source(): Source {
val timeout = SocketAsyncTimeout(this)
val source = InputStreamSource(getInputStream(), timeout /*攜帶 Timeout 物件*/)
// 包裝為非同步超時
return timeout.source(source)
}
// ----------------------------------------------------------------------------
// 輸出流
// ----------------------------------------------------------------------------
fun OutputStream.sink(): Sink = OutputStreamSink(this, Timeout() /*Timeout 物件*/)
// 檔案輸出流
fun File.sink(append: Boolean = false): Sink = FileOutputStream(this, append).sink()
// Socket 輸出流
fun Socket.sink(): Sink {
val timeout = SocketAsyncTimeout(this)
val sink = OutputStreamSink(getOutputStream(), timeout /*攜帶 Timeout 物件*/)
// 包裝為非同步超時
return timeout.sink(sink)
}
在 Timeout 類的基礎上,Okio 提供了 2 種超時機制:
Okio 框架
Timeout 同步超時依賴於 Timeout#throwIfReached() 方法。
同步超時在每次執行任務之前,都需要先呼叫 Timeout#throwIfReached()
檢查當前時間是否到達超時截止時間。如果超時則會直接丟擲超時異常,不會再執行任務。
JvmOkio.kt
private class InputStreamSource(
// 輸入流
private val input: InputStream,
// 超時控制
private val timeout: Timeout
) : Source {
override fun read(sink: Buffer, byteCount: Long): Long {
// 1、引數校驗
if (byteCount == 0L) return 0
require(byteCount >= 0) { "byteCount < 0: $byteCount" }
// 2、檢查超時時間
timeout.throwIfReached()
// 3、執行輸入任務(已簡化)
val bytesRead = input.read(...)
return bytesRead.toLong()
}
...
}
private class OutputStreamSink(
// 輸出流
private val out: OutputStream,
// 超時控制
private val timeout: Timeout
) : Sink {
override fun write(source: Buffer, byteCount: Long) {
// 1、引數校驗
checkOffsetAndCount(source.size, 0, byteCount)
// 2、檢查超時時間
timeout.throwIfReached()
// 3、執行輸入任務(已簡化)
out.write(...)
...
}
...
}
看一眼 Timeout#throwIfReached 的原始碼。 可以看到,同步超時只考慮 「deadlineNanoTime 截止時間」,如果只設定 「timeoutNanos 任務處理時間」 是無效的,我覺得這個設計容易讓開發者出錯。
Timeout.kt
@Throws(IOException::class)
open fun throwIfReached() {
if (Thread.interrupted()) {
// 傳遞中斷狀態
Thread.currentThread().interrupt() // Retain interrupted status.
throw InterruptedIOException("interrupted")
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
// 丟擲超時異常
throw InterruptedIOException("deadline reached")
}
}
有必要解釋所謂 「同步」 的意思:
同步超時就是指任務的 「執行」 和 「超時檢查」 是同步的。當任務超時時,Okio 同步超時不會直接中斷任務執行,而是需要檢主動查超時時間(Timeout#throwIfReached)來判斷是否發生超時,再決定是否中斷任務執行。
這其實與 Java 的中斷機制是非常相似的:
當 Java 執行緒的中斷標記位置位時,並不是真的會直接中斷執行緒執行,而是主動需要檢查中斷標記位(Thread.interrupted)來判斷是否發生中斷,再決定是否中斷執行緒任務。所以說 Java 的執行緒中斷機制是一種 「同步中斷」。
可以看出,同步超時存在 「滯後性」:
因為同步超時需要主動檢查,所以即使在任務執行過程中發生超時,也必須等到檢查時才會發現超時,無法及時觸發超時異常。因此,就需要非同步超時機制。
同步超時示意圖
非同步超時監控進入: 非同步超時在每次執行任務之前,都需要先呼叫 AsyncTimeout#enter()
方法將 AsyncTimeout 掛載到超時佇列中,並根據超時截止時間的先後順序排序,佇列頭部的節點就是會最先超時的任務;
非同步超時監控退出: 在每次任務執行結束之後,都需要再呼叫 AsyncTimeout#exit()
方法將 AsyncTimeout 從超時佇列中移除。
注意: enter() 方法和 eixt() 方法必須成對存在。
AsyncTimeout.kt
open class AsyncTimeout : Timeout() {
// 是否在等待佇列中
private var inQueue = false
// 後續指標
private var next: AsyncTimeout? = null
// 超時截止時間
private var timeoutAt = 0L
// 非同步超時監控進入
fun enter() {
check(!inQueue) { "Unbalanced enter/exit" }
val timeoutNanos = timeoutNanos()
val hasDeadline = hasDeadline()
if (timeoutNanos == 0L && !hasDeadline) {
return
}
inQueue = true
scheduleTimeout(this, timeoutNanos, hasDeadline)
}
// 非同步超時監控退出
// 返回值:是否發生超時(如果節點不存在,說明被 WatchDog 執行緒移除,即發生超時)
fun exit(): Boolean {
if (!inQueue) return false
inQueue = false
return cancelScheduledTimeout(this)
}
// 在 WatchDog 執行緒呼叫
protected open fun timedOut() {}
companion object {
// 超時佇列頭節點(哨兵節點)
private var head: AsyncTimeout? = null
// 分發超時監控任務
private fun scheduleTimeout(node: AsyncTimeout, timeoutNanos: Long, hasDeadline: Boolean) {
synchronized(AsyncTimeout::class.java) {
// 首次新增監控時,需要啟動 Watchdog 執行緒
if (head == null) {
// 哨兵節點
head = AsyncTimeout()
Watchdog().start()
}
// now:當前時間
val now = System.nanoTime()
// timeoutAt 超時截止時間:計算 now + timeoutNanos 和 deadlineNanoTime 的較小值
if (timeoutNanos != 0L && hasDeadline) {
node.timeoutAt = now + minOf(timeoutNanos, node.deadlineNanoTime() - now)
} else if (timeoutNanos != 0L) {
node.timeoutAt = now + timeoutNanos
} else if (hasDeadline) {
node.timeoutAt = node.deadlineNanoTime()
} else {
throw AssertionError()
}
// remainingNanos 超時剩餘時間:當前時間距離超時發生的時間
val remainingNanos = node.remainingNanos(now)
var prev = head!!
// 線性遍歷超時佇列,按照超時截止時間將 node 節點插入超時佇列
while (true) {
if (prev.next == null || remainingNanos < prev.next!!.remainingNanos(now)) {
node.next = prev.next
prev.next = node
// 如果插入到佇列頭部,需要喚醒 WatchDog 執行緒
if (prev === head) {
(AsyncTimeout::class.java as Object).notify()
}
break
}
prev = prev.next!!
}
}
}
// 取消超時監控任務
// 返回值:是否超時
private fun cancelScheduledTimeout(node: AsyncTimeout): Boolean {
synchronized(AsyncTimeout::class.java) {
// 線性遍歷超時佇列,將 node 節點移除
var prev = head
while (prev != null) {
if (prev.next === node) {
prev.next = node.next
node.next = null
return false
}
prev = prev.next
}
// 如果節點不存在,說明被 WatchDog 執行緒移除,即發生超時
return true
}
}
}
}
同時,在首次新增非同步超時監控時,AsyncTimeout 內部會開啟一個 WatchDog
守護執行緒,按照 「檢測 - 等待」 模型觀察超時佇列的頭節點:
如果發生超時,則將頭節點移除,並回撥 AsyncTimeout#timeOut()
方法。這是一個空方法,需要由子類實現來主動取消任務;
如果未發生超時,則 WatchDog 執行緒會計算距離超時發生的時間間隔,呼叫 Object#wait(時間間隔)
進入限時等待。
需要注意的是: AsyncTimeout#timeOut() 回撥中不能執行耗時操作,否則會影響後續檢測的及時性。
有意思的是:我們會發現 Okio 的超時檢測機制和 Android ANR 的超時檢測機制非常類似,所以我們可以說 ANR 也是一種非同步超時機制。
AsyncTimeout.kt
private class Watchdog internal constructor() : Thread("Okio Watchdog") {
init {
// 守護執行緒
isDaemon = true
}
override fun run() {
// 死迴圈
while (true) {
try {
var timedOut: AsyncTimeout? = null
synchronized(AsyncTimeout::class.java) {
// 取頭節點(Maybe wait)
timedOut = awaitTimeout()
// 超時佇列為空,退出執行緒
if (timedOut === head) {
head = null
return
}
}
// 超時發生,觸發 AsyncTimeout#timedOut 回撥
timedOut?.timedOut()
} catch (ignored: InterruptedException) {
}
}
}
}
companion object {
// 超時佇列為空時,再等待一輪的時間
private val IDLE_TIMEOUT_MILLIS = TimeUnit.SECONDS.toMillis(60)
private val IDLE_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(IDLE_TIMEOUT_MILLIS)
@Throws(InterruptedException::class)
internal fun awaitTimeout(): AsyncTimeout? {
// Get the next eligible node.
val node = head!!.next
// 如果超時佇列為空
if (node == null) {
// 需要再等待 60s 後再判斷(例如在首次新增監控時)
val startNanos = System.nanoTime()
(AsyncTimeout::class.java as Object).wait(IDLE_TIMEOUT_MILLIS)
return if (head!!.next == null && System.nanoTime() - startNanos >= IDLE_TIMEOUT_NANOS) {
// 退出 WatchDog 執行緒
head
} else {
// WatchDog 執行緒重新取一次
null
}
}
// 計算當前時間距離超時發生的時間
var waitNanos = node.remainingNanos(System.nanoTime())
// 未超時,進入限時等待
if (waitNanos > 0) {
// Waiting is made complicated by the fact that we work in nanoseconds,
// but the API wants (millis, nanos) in two arguments.
val waitMillis = waitNanos / 1000000L
waitNanos -= waitMillis * 1000000L
(AsyncTimeout::class.java as Object).wait(waitMillis, waitNanos.toInt())
return null
}
// 超時,將頭節點移除
head!!.next = node.next
node.next = null
return node
}
}
非同步超時示意圖
直接看程式碼不好理解,我們來舉個例子:
在 OkHttp 中,支援設定一次完整的 Call 請求上的操作時間 callTimeout。一次 Call 請求包含多個 IO 操作的複合任務,使用傳統 IO 是不可能監控超時的,所以需要使用 AsyncTimeout 非同步超時。
在 OkHttp 的 RealCall 請求類中,就使用了 AsyncTimeout 非同步超時:
1、開始任務: 在 execute() 方法中,呼叫 AsyncTimeout#enter()
進入非同步超時監控,再執行請求;
2、結束任務: 在 callDone() 方法中,呼叫 AsyncTimeout#exit()
退出非同步超時監控。分析原始碼發現:callDone() 不僅在請求正常時會呼叫,在取消請求時也會回撥,保證了 enter() 和 exit() 成對存在;
3、超時回撥: 在 AsyncTimeout#timeOut
超時回撥中,呼叫了 Call#cancel() 提前取消請求。Call#cancel() 會呼叫到 Socket#close(),讓阻塞中的 IO 操作丟擲 SocketException 異常,以達到提前中斷的目的,最終也會走到 callDone() 執行 exit() 退出非同步監控。
Call 超時監控示意圖
RealCall
class RealCall(
val client: OkHttpClient,
/** The application's original request unadulterated by redirects or auth headers. */
val originalRequest: Request,
val forWebSocket: Boolean
) : Call {
// 3、AsyncTimeout 超時監控
private val timeout = object : AsyncTimeout() {
override fun timedOut() {
// 取消請求
cancel()
}
}.apply {
timeout(client.callTimeoutMillis.toLong(), MILLISECONDS)
}
// 取消請求
override fun cancel() {
if (canceled) return // Already canceled.
canceled = true
exchange?.cancel()
// 最終會呼叫 Socket#close()
connectionToCancel?.cancel()
eventListener.canceled(this)
}
// 1、請求開始(由業務層呼叫)
override fun execute(): Response {
// 1.1 非同步超時監控進入
timeout.enter()
// 1.2 執行請求
client.dispatcher.executed(this)
return getResponseWithInterceptorChain()
}
// 2、請求結束(由 OkHttp 引擎層呼叫,包含正常和異常情況)
// 除了 IO 操作在丟擲異常後會走到 callDone(),在取消請求時也會走到 callDone()
internal fun <E : IOException?> messageDone(
exchange: Exchange,
requestDone: Boolean, // 請求正常結束
responseDone: Boolean, // 響應正常結束
e: E
): E {
...
if (callDone) {
return callDone(e)
}
return e
}
private fun <E : IOException?> callDone(e: E): E {
...
// 檢查是否超時
val result = timeoutExit(e)
if (e != null) {
// 請求異常(包含超時異常)
eventListener.callFailed(this, result!!)
} else {
// 請求正常結束
eventListener.callEnd(this)
}
return result
}
private fun <E : IOException?> timeoutExit(cause: E): E {
if (timeoutEarlyExit) return cause
// 2.1 非同步超時監控退出
if (!timeout.exit()) return cause
// 2.2 包裝超時異常
val e = InterruptedIOException("timeout")
if (cause != null) e.initCause(cause)
return e as E
}
}
呼叫 Socket#close() 會讓阻塞中的 IO 操作丟擲 SocketException 異常:
Socket.java
// Any thread currently blocked in an I/O operation upon this socket will throw a {@link SocketException}.
public synchronized void close() throws IOException {
synchronized(closeLock) {
if (isClosed())
return;
if (created)
impl.close();
closed = true;
}
}
Exchange 中會捕獲 Socket#close() 丟擲的 SocketException 異常:
Exchange.kt
private inner class RequestBodySink(
delegate: Sink,
/** The exact number of bytes to be written, or -1L if that is unknown. */
private val contentLength: Long
) : ForwardingSink(delegate) {
@Throws(IOException::class)
override fun write(source: Buffer, byteCount: Long) {
...
try {
super.write(source, byteCount)
this.bytesReceived += byteCount
} catch (e: IOException) {
// Socket#close() 會丟擲異常,被這裡攔截
throw complete(e)
}
}
private fun <E : IOException?> complete(e: E): E {
if (completed) return e
completed = true
return bodyComplete(bytesReceived, responseDone = false, requestDone = true, e = e)
}
}
fun <E : IOException?> bodyComplete(
bytesRead: Long,
responseDone: Boolean,
requestDone: Boolean,
e: E
): E {
...
// 回撥到上面的 RealCall#messageDone
return call.messageDone(this, requestDone, responseDone, e)
}
先說一下 Okhttp 定義的 2 種顆粒度的超時:
其實 Socket 支援通過 setSoTimeout
API 設定單次操作的超時時間,但這個 API 無法滿足需求,比如說 Call 超時是包含多個 IO 操作的複合任務,而且不管是 HTTP/1 並行請求還是 HTTP/2 多路複用,都會存在一個 Socket 連線上同時承載多個請求的情況,無法區分是哪個請求超時。
因此,OkHttp 採用了兩種超時監測: