Author: 崔曉兵
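Recently we ran into a number of hangs in [HMDConfigManager remoteConfigWithAppID:] in production.
Looking at the main thread's stack, the lock involved is a read-write lock.
We then looked at the child threads holding the lock. They were in all sorts of states, and almost all of them were executing normally: some were opening a file, some were in read, some were running NSUserDefaults methods, and so on. What the problematic threads had in common was the QOS:BACKGROUND tag. Overall, the child thread holding the lock was still making progress; it just did not leave the main thread enough time. Why would these child threads run for so long while holding the lock that the main thread hit the 8 s hang threshold? One possibility is that the work really is that slow; the other is priority inversion.
In this case, the low-priority thread holding the read-write lock kept failing to get scheduled (or was preempted again once scheduled, or was scheduled too late to finish in time), while the high-priority thread stayed blocked because it could not take the read-write lock. The result behaves like a deadlock even though the lock holder could still make progress.
iOS 8 introduced the concept of QualityOfService, which is similar to thread priority: depending on the QualityOfService value, the system allocates different amounts of CPU time, network resources, disk resources and so on, so we can use it to set the priority of a queue.
For priority settings such as the one on NSOperationQueue, Apple gives the following advice in the Threading Programming Guide: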
Important: It is generally a good idea to leave the priorities of your threads at their default values. Increasing the priorities of some threads also increases the likelihood of starvation among lower-priority threads. If your application contains high-priority and low-priority threads that must interact with each other, the starvation of lower-priority threads may block other threads and create performance bottlenecks.
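Apple's advice is not to change thread priorities casually, especially when high- and low-priority threads compete for a shared critical resource. So the first option is simply to delete the priority-setting code.
The pthread_rwlock_rdlock(3pthread) manual page contains the following note: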
Realtime applications may encounter priority inversion when using read-write locks. The problem occurs when a high priority thread "locks" a read-write lock that is about to be "unlocked" by a low priority thread, but the low priority thread is preempted by a medium priority thread. This scenario leads to priority inversion; a high priority thread is blocked by lower priority threads for an unlimited period of time. During system design, realtime programmers must take into account the possibility of this kind of priority inversion. They can deal with it in a number of ways, such as by having critical sections that are guarded by read-write locks execute at a high priority, so that a thread cannot be preempted while executing in its critical section.
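Although this note is aimed at real-time systems, it is still instructive. Following it, the second option is to modify the problematic code: when a thread acquires _rwlock via pthread_rwlock_wrlock, temporarily raise its priority, and restore the original priority after releasing _rwlock.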
- (id)remoteConfigWithAppID:(NSString *)appID
{
    .......
    pthread_rwlock_rdlock(&_rwlock);
    HMDHeimdallrConfig *result = ....... // get existing config
    pthread_rwlock_unlock(&_rwlock);
    if (result == nil) {
        result = [[HMDHeimdallrConfig alloc] init]; // make a new config
        pthread_rwlock_wrlock(&_rwlock);
        qos_class_t oldQos = qos_class_self();
        BOOL needRecover = NO;
        // Temporarily raise the thread priority
        if (_enablePriorityInversionProtection && oldQos < QOS_CLASS_USER_INTERACTIVE) {
            int ret = pthread_set_qos_class_self_np(QOS_CLASS_USER_INTERACTIVE, 0);
            needRecover = (ret == 0);
        }
        ......
        pthread_rwlock_unlock(&_rwlock);
        // Restore the thread priority
        if (_enablePriorityInversionProtection && needRecover) {
            pthread_set_qos_class_self_np(oldQos, 0);
        }
    }
    return result;
}
Note that only the pthread API can be used here; the API provided by NSThread does not work.
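To check whether manually adjusting the thread priority has any effect, we ran a local experiment with a demo: define 2000 operations (to keep the CPU busy) with priority NSQualityOfServiceUserInitiated, lower the priority of the operations whose index is divisible by 100 to NSQualityOfServiceBackground, run the same time-consuming task in every operation, and measure the elapsed time of the selected operations (20 of them, since 2000 / 100 = 20).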
for (int j = 0; j < 2000; ++j) {
    NSOperationQueue *operation = [[NSOperationQueue alloc] init];
    operation.maxConcurrentOperationCount = 1;
    operation.qualityOfService = NSQualityOfServiceUserInitiated;
    // Module 1
    // if (j % 100 == 0) {
    //     operation.qualityOfService = NSQualityOfServiceBackground;
    // }
    // Module 1
    [operation addOperationWithBlock:^{
        // Module 2
        // qos_class_t oldQos = qos_class_self();
        // pthread_set_qos_class_self_np(QOS_CLASS_USER_INITIATED, 0);
        // Module 2
        NSTimeInterval start = CFAbsoluteTimeGetCurrent();
        double sum = 0;
        for (int i = 0; i < 100000; ++i) {
            sum += sin(i) + cos(i) + sin(i*2) + cos(i*2);
        }
        start = CFAbsoluteTimeGetCurrent() - start;
        if (j % 100 == 0) {
            printf("%.8f\n", start * 1000);
        }
        // Module 2
        // pthread_set_qos_class_self_np(oldQos, 0);
        // Module 2
    }];
}
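The measurements are shown below (elapsed time in milliseconds):
A | B | C |
---|---|---|
(Module 1 and Module 2 commented out) | (only Module 1 enabled) | (Module 1 and Module 2 both enabled) |
11.8190561 | 94.70210189 | 15.04005137 |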
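When an operation is set to low priority, its elapsed time rises sharply, to 94.70210189 ms. When an operation is set to low priority but its priority is manually restored inside the block, the elapsed time drops sharply, to 15.04005137 ms (this is still higher than the normal case; readers may want to think about why). The demo shows that manually adjusting the priority substantially reduces the overall elapsed time of the low-priority task, which shortens the time the main thread is blocked while the lock is held.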
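Verification of this problem went through 2 stages:
Stage 1: starting March 6, on version 19.7, there was a fairly large drop. The main reason: the queue being waited on in the stacks changed from QOS:BACKGROUND to com.apple.root.default-qos, meaning the queue priority was raised from QOS_CLASS_BACKGROUND to QOS_CLASS_DEFAULT. This amounts to applying option 1 and using the default priority.
Stage 2 (marked by the second red box in the original chart): verification started April 24 on version 20.3. So far the effect is not obvious. One likely reason: the demo raised the priority from QOS_CLASS_BACKGROUND to QOS_CLASS_USER_INITIATED, whereas in production the queue priority is raised from the default QOS_CLASS_DEFAULT to QOS_CLASS_USER_INITIATED, so the improvement in production is comparatively limited.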
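The Mach-level priority of QOS_CLASS_BACKGROUND is 4;
The Mach-level priority of QOS_CLASS_DEFAULT is 31;
The Mach-level priority of QOS_CLASS_USER_INITIATED is 37.
So does every lock require manually raising the priority of the holding thread, as above? Does the system adjust thread priority automatically? If such a mechanism exists, does it cover all locks? Answering these questions requires a solid understanding of priority inversion.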
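Priority inversion occurs when a synchronization resource is held by a lower-priority process/thread, a higher-priority process/thread competes for that resource and fails to get it, and as a result the higher-priority process/thread ends up being scheduled later. Depending on the kind of blocking, priority inversion is divided into bounded priority inversion and unbounded priority inversion. The figures from "Introduction to RTOS - Solution to Part 11 (Priority Inversion)" are used here for illustration.
As the figure shows, the high-priority task (Task H) is blocked by the low-priority task (Task L) that holds the lock. Because the blocking time depends on how long the low-priority task stays in the critical section (how long it holds the lock), this is called bounded priority inversion. As long as Task L holds the lock, Task H stays blocked; the low-priority task runs ahead of the high-priority task, and the priorities are inverted.
A task here can also be read as a thread.
While Task L holds the lock, if a medium-priority task (Task M) interrupts Task L, the bounded inversion becomes unbounded: once Task M has taken the CPU from Task L, it can block Task H for an arbitrarily long time (and there may be more than one Task M).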
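There are currently 2 ways to solve unbounded priority inversion: one is the priority ceiling protocol, the other is priority inheritance.
In the priority ceiling protocol, the system associates each critical resource with a ceiling priority. When a task enters the critical section, the system passes this ceiling priority to the task, giving it the highest priority; when the task leaves the critical section, the system immediately restores its normal priority. This guarantees that priority inversion does not occur. The ceiling priority is the highest priority among all tasks that need the critical resource.
As the figure shows, the ceiling priority of the lock is 3. When Task L holds the lock, its priority is raised to 3, the same as Task H. This prevents Task M (priority 2) from running until Task L and Task H no longer need the lock.
In priority inheritance, the rough idea is this: when a high-priority task tries to acquire a lock that happens to be held by a low-priority task, the priority of the high-priority thread is temporarily transferred to the low-priority thread holding the lock, so that the low-priority thread can run sooner and release the resource. Once the resource is released, the thread's original priority is restored.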
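POSIX exposes priority inheritance as a mutex attribute. The following is a minimal sketch, assuming a platform that supports `PTHREAD_PRIO_INHERIT`; the helper name `init_pi_mutex` is hypothetical. It only shows how the protocol is requested, not how the scheduler applies it.

```c
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 700   /* ask the C library to expose the mutex protocols */
#endif
#include <assert.h>
#include <pthread.h>

/* Initialise *m as a mutex that requests the priority-inheritance protocol.
 * Returns 0 on success, otherwise the error code of the failing pthread call. */
int init_pi_mutex(pthread_mutex_t *m) {
    pthread_mutexattr_t attr;
    int ret = pthread_mutexattr_init(&attr);
    if (ret != 0) {
        return ret;
    }
    ret = pthread_mutexattr_setprotocol(&attr, PTHREAD_PRIO_INHERIT);
    if (ret == 0) {
        ret = pthread_mutex_init(m, &attr);
    }
    pthread_mutexattr_destroy(&attr);   /* the mutex keeps its own copy */
    return ret;
}
```

Once initialised this way, the mutex is used with the ordinary `pthread_mutex_lock` and `pthread_mutex_unlock` calls; the inheritance itself happens inside the implementation when a higher-priority thread blocks on it.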
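Both the priority ceiling protocol and priority inheritance restore the low-priority task's priority when the lock is released. Note also that these 2 methods can only prevent unbounded priority inversion; they cannot prevent bounded priority inversion (Task H must wait for Task L to finish before it can run, and this inversion is unavoidable).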
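The difference between the unprotected case and the two protocols can be made concrete with a toy simulation. The sketch below is an illustrative model only, not a description of any real scheduler: one CPU, fixed-priority preemptive scheduling in 1-tick steps, and three hypothetical tasks (L released at tick 0 with 4 ticks of work inside the lock, H released at tick 1 with 2 ticks inside the same lock, M released at tick 2 with 10 ticks of work and no lock). All names and numbers are assumptions chosen for the example.

```c
#include <assert.h>

enum policy { POLICY_NONE, POLICY_INHERIT, POLICY_CEILING };

struct task {
    const char *name;
    int base_prio;   /* larger value = more important */
    int release;     /* tick at which the task becomes runnable */
    int remaining;   /* ticks of CPU work left */
    int uses_lock;   /* 1 if all of its work runs inside the critical section */
};

#define NTASKS 3
#define NO_OWNER (-1)

/* Simulate the three tasks and return the tick at which the task at index
 * `watch` finishes, or -1 if it never finishes within the time limit. */
int simulate(enum policy policy, int watch) {
    struct task t[NTASKS] = {
        {"L", 1, 0, 4, 1},
        {"H", 3, 1, 2, 1},
        {"M", 2, 2, 10, 0},
    };
    const int ceiling = 3;   /* highest base priority among lock users */
    int owner = NO_OWNER;

    for (int now = 0; now < 1000; ++now) {
        int best = -1, best_prio = -1;
        for (int i = 0; i < NTASKS; ++i) {
            if (t[i].release > now || t[i].remaining == 0) continue;
            /* A lock user is blocked while another task owns the lock. */
            if (t[i].uses_lock && owner != NO_OWNER && owner != i) continue;
            int prio = t[i].base_prio;
            if (i == owner && policy == POLICY_CEILING) prio = ceiling;
            if (i == owner && policy == POLICY_INHERIT) {
                /* Inherit the highest priority among the blocked waiters. */
                for (int w = 0; w < NTASKS; ++w) {
                    if (w != i && t[w].uses_lock && t[w].release <= now &&
                        t[w].remaining > 0 && t[w].base_prio > prio) {
                        prio = t[w].base_prio;
                    }
                }
            }
            if (prio > best_prio) { best = i; best_prio = prio; }
        }
        if (best < 0) continue;                   /* CPU idle this tick */
        if (t[best].uses_lock) owner = best;      /* acquire on first run */
        if (--t[best].remaining == 0) {
            if (owner == best) owner = NO_OWNER;  /* release on completion */
            if (best == watch) return now + 1;
        }
    }
    return -1;
}
```

In this model `simulate(POLICY_NONE, 1)` returns 16, because M runs its full 10 ticks while L still holds the lock, whereas `simulate(POLICY_INHERIT, 1)` and `simulate(POLICY_CEILING, 1)` both return 6. The remaining delay of H in the protected cases is the bounded part: H still waits for L to leave the critical section.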
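Bounded priority inversion can be avoided or mitigated in several ways, for example by reducing how long the inversion lasts, that is, how long the low-priority task holds the lock.
Priority inheritance must be transitive. For example: T1 is blocked on a resource held by T2, and T2 is in turn blocked on a resource held by T3. If T1 has a higher priority than T2 and T3, then T3 must inherit T1's priority through T2. Otherwise another thread T4, whose priority is higher than T2 and T3 but lower than T1, will preempt T3 and cause priority inversion with respect to T1. The priority a thread inherits must therefore be the highest priority among all the threads it blocks, directly or indirectly.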
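The transitivity rule can be written down as a walk along the chain of owners. This is a small sketch under stated assumptions: the type `thread_state` and the function `propagate_priorities` are hypothetical names, larger numbers mean higher priority, and each thread waits on at most one owner.

```c
#include <assert.h>

#define NOT_BLOCKED (-1)

struct thread_state {
    int base_prio;    /* priority assigned to the thread itself */
    int blocked_on;   /* index of the thread owning the awaited resource */
    int effective;    /* filled in by propagate_priorities() */
};

/* Push every thread's base priority along its chain of owners, so that each
 * owner ends up with the highest priority among the threads it blocks,
 * directly or indirectly. The step limit guards against a cyclic chain. */
void propagate_priorities(struct thread_state *th, int n) {
    for (int i = 0; i < n; ++i) {
        th[i].effective = th[i].base_prio;
    }
    for (int i = 0; i < n; ++i) {
        int owner = th[i].blocked_on;
        for (int steps = 0; owner != NOT_BLOCKED && steps < n; ++steps) {
            if (th[owner].effective < th[i].base_prio) {
                th[owner].effective = th[i].base_prio;
            }
            owner = th[owner].blocked_on;
        }
    }
}
```

With T1 (priority 30) blocked on T2 (20), T2 blocked on T3 (10), and an unrelated T4 (25), the propagation gives T2 and T3 an effective priority of 30, so T4 can no longer preempt T3.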
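iOS mainly uses the following two mechanisms to pass QoS between threads (or queues):
Mechanism 1: dispatch_async
dispatch_async() automatically propagates the QoS from the calling thread, though it will translate User Interactive to User Initiated to avoid assigning that priority to non-main threads.
Mechanism 2: inter-process communication (IPC)
The system's QoS propagation rules are fairly complex and mainly take the following information into account:
the QoS of the current thread;
for a dispatch_block created with dispatch_block_create(), the arguments passed when the block was created;
the QoS of the target queue or thread of the dispatch_async or IPC.
The scheduler uses this information to decide the priority at which a block runs.
If no other thread is waiting synchronously on the block, the block runs at the priority described above. If threads do wait on each other synchronously, the scheduler adjusts the running priority of the threads as the situation requires.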
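If the current thread is blocked waiting for an operation (say block1) that is running on some thread (thread 1), and the system knows which thread block1 belongs to (the owner), the system resolves the priority inversion by raising the priority of the relevant thread. Conversely, if the system does not know which thread block1 belongs to, it cannot know whose priority to raise and therefore cannot resolve the inversion.
The system APIs that record owner information are:
pthread mutex, os_unfair_lock, and higher-level APIs built on these two:
dispatch_once is implemented on top of os_unfair_lock;
NSLock, NSRecursiveLock, @synchronized and similar are implemented on top of pthread mutex;
dispatch_sync, dispatch_wait;
xpc_connection_send_with_message_sync.
Using these APIs lets the system engage its priority inversion avoidance mechanism when an inversion occurs.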
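Next we verify each of the "basic system APIs" mentioned above.
Test environment: simulator, iOS 15.2
The pthread mutex data structure pthread_mutex_s has a field m_tid that records the ID of the thread holding the lock.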
// types_internal.h
struct pthread_mutex_s {
long sig;
_pthread_lock lock;
union {
uint32_t value;
struct pthread_mutex_options_s options;
} mtxopts;
int16_t prioceiling;
int16_t priority;
#if defined(__LP64__)
uint32_t _pad;
#endif
union {
struct {
uint32_t m_tid[2]; // thread id of thread that has mutex locked
uint32_t m_seq[2]; // mutex sequence id
uint32_t m_mis[2]; // for misaligned locks m_tid/m_seq will span into here
} psynch;
struct _pthread_mutex_ulock_s ulock;
};
#if defined(__LP64__)
uint32_t _reserved[4];
#else
uint32_t _reserved[1];
#endif
};
Let's verify with code: is the thread priority actually boosted?
// printThreadPriority prints the priority information of the current thread
void printThreadPriority() {
    thread_t cur_thread = mach_thread_self();
    mach_msg_type_number_t thread_info_count = THREAD_INFO_MAX;
    thread_info_data_t thinfo;
    kern_return_t kr = thread_info(cur_thread, THREAD_EXTENDED_INFO, (thread_info_t)thinfo, &thread_info_count);
    // Drop the port reference taken by mach_thread_self() only after its last use
    mach_port_deallocate(mach_task_self(), cur_thread);
    if (kr != KERN_SUCCESS) {
        return;
    }
    thread_extended_info_t extend_info = (thread_extended_info_t)thinfo;
    printf("pth_priority: %d, pth_curpri: %d, pth_maxpriority: %d\n", extend_info->pth_priority, extend_info->pth_curpri, extend_info->pth_maxpriority);
}
First take the lock on a child thread and sleep, then have the main thread request the lock:
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
    printf("begin : \n");
    printThreadPriority();
    printf("queue before lock \n");
    pthread_mutex_lock(&_lock); // make sure the background queue gets the lock first
    printf("queue lock \n");
    printThreadPriority();
    dispatch_async(dispatch_get_main_queue(), ^{
        printf("before main lock\n");
        pthread_mutex_lock(&_lock);
        printf("in main lock\n");
        pthread_mutex_unlock(&_lock);
        printf("after main unlock\n");
    });
    sleep(10);
    printThreadPriority();
    printf("queue unlock\n");
    pthread_mutex_unlock(&_lock);
    printf("queue after unlock\n");
});
begin :
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue before lock
queue lock
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
before main lock
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
queue unlock
in main lock
after main unlock
queue after unlock
As shown, the low-priority child thread takes the lock first, at priority 4; once the main thread requests the lock, the child thread's priority is raised to 47.
os_unfair_lock replaces OSSpinLock and solves its priority inversion problem. A thread waiting on an os_unfair_lock sleeps, switching from user mode to kernel mode, rather than busy-waiting. os_unfair_lock stores the owning thread's ID inside the lock, and a waiter donates its own priority to the owner, which avoids priority inversion. Let's verify:
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
    printf("begin : \n");
    printThreadPriority();
    printf("queue before lock \n");
    os_unfair_lock_lock(&_unfair_lock); // make sure the background queue gets the lock first
    printf("queue lock \n");
    printThreadPriority();
    dispatch_async(dispatch_get_main_queue(), ^{
        printf("before main lock\n");
        os_unfair_lock_lock(&_unfair_lock);
        printf("in main lock\n");
        os_unfair_lock_unlock(&_unfair_lock);
        printf("after main unlock\n");
    });
    sleep(10);
    printThreadPriority();
    printf("queue unlock\n");
    os_unfair_lock_unlock(&_unfair_lock);
    printf("queue after unlock\n");
});
begin :
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue before lock
queue lock
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
before main lock
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
queue unlock
in main lock
after main unlock
queue after unlock
The result is the same as for pthread mutex.
The documentation for pthread_rwlock_init contains the following caveat:
Caveats: Beware of priority inversion when using read-write locks. A high-priority thread may be blocked waiting on a read-write lock locked by a low-priority thread. The microkernel has no knowledge of read-write locks, and therefore can’t boost the low-priority thread to prevent the priority inversion.
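In short, the kernel is unaware of read-write locks and cannot boost the low-priority thread, so it cannot avoid priority inversion. Looking at the definition, however, pthread_rwlock_s contains a field rw_tid that records the thread holding the write lock. This raises the question: why can pthread_rwlock_s, which has owner information, still not avoid priority inversion?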
struct pthread_rwlock_s {
long sig;
_pthread_lock lock;
uint32_t
unused:29,
misalign:1,
pshared:2;
uint32_t rw_flags;
#if defined(__LP64__)
uint32_t _pad;
#endif
uint32_t rw_tid[2]; // thread id of thread that has exclusive (write) lock
uint32_t rw_seq[4]; // rw sequence id (at 128-bit aligned boundary)
uint32_t rw_mis[4]; // for misaligned locks rw_seq will span into here
#if defined(__LP64__)
uint32_t _reserved[34];
#else
uint32_t _reserved[18];
#endif
};
The discussion at https://news.ycombinator.com/item?id=21751269 mentions:
xnu supports priority inheritance through "turnstiles", a kernel-internal mechanism which is used by default by a number of locking primitives (list at [1]), including normal pthread mutexes (though not read-write locks [2]), as well as the os_unfair_lock API (via the ulock syscalls). With pthread mutexes, you can actually explicitly request priority inheritance by calling pthread_mutexattr_setprotocol [3] with PTHREAD_PRIO_INHERIT; the Apple implementation supports it, but currently ignores the protocol setting and just gives all mutexes priority inheritance.
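In short: XNU implements priority inheritance with a kernel mechanism called turnstiles, and this mechanism is used by pthread mutex and os_unfair_lock.
Following this lead, we found a call to _kwq_use_turnstile in ksyn_wait. The comment there is rather tactful about read-write locks, adding the qualifier "at least sometimes":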
pthread mutexes and rwlocks both (at least sometimes) know their owner and can use turnstiles. Otherwise, we pass NULL as the tstore to the shims so they wait on the global waitq.
// libpthread/kern/kern_synch.c
int
ksyn_wait(ksyn_wait_queue_t kwq, kwq_queue_type_t kqi, uint32_t lockseq,
int fit, uint64_t abstime, uint16_t kwe_flags,
thread_continue_t continuation, block_hint_t block_hint)
{
thread_t th = current_thread();
uthread_t uth = pthread_kern->get_bsdthread_info(th);
struct turnstile **tstore = NULL;
int res;
assert(continuation != THREAD_CONTINUE_NULL);
ksyn_waitq_element_t kwe = pthread_kern->uthread_get_uukwe(uth);
bzero(kwe, sizeof(*kwe));
kwe->kwe_count = 1;
kwe->kwe_lockseq = lockseq & PTHRW_COUNT_MASK;
kwe->kwe_state = KWE_THREAD_INWAIT;
kwe->kwe_uth = uth;
kwe->kwe_thread = th;
kwe->kwe_flags = kwe_flags;
res = ksyn_queue_insert(kwq, kqi, kwe, lockseq, fit);
if (res != 0) {
//panic("psynch_rw_wrlock: failed to enqueue\n"); // XXX ksyn_wqunlock(kwq);
return res;
}
PTHREAD_TRACE(psynch_mutex_kwqwait, kwq->kw_addr, kwq->kw_inqueue,
kwq->kw_prepost.count, kwq->kw_intr.count);
if (_kwq_use_turnstile(kwq)) {
// pthread mutexes and rwlocks both (at least sometimes) know their
// owner and can use turnstiles. Otherwise, we pass NULL as the
// tstore to the shims so they wait on the global waitq.
tstore = &kwq->kw_turnstile;
}
......
}
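Looking at the definition of _kwq_use_turnstile, the code is honest: a turnstile is used for priority inversion protection only when the type is KSYN_WQTYPE_MTX. A read-write lock has type KSYN_WQTYPE_RWLOCK, which means it does not use a turnstile and therefore cannot avoid priority inversion.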
#define KSYN_WQTYPE_MTX 0x01
#define KSYN_WQTYPE_CVAR 0x02
#define KSYN_WQTYPE_RWLOCK 0x04
#define KSYN_WQTYPE_SEMA 0x08
static inline bool
_kwq_use_turnstile(ksyn_wait_queue_t kwq)
{
// If we had writer-owner information from the
// rwlock then we could use the turnstile to push on it. For now, only
// plain mutexes use it.
return (_kwq_type(kwq) == KSYN_WQTYPE_MTX);
}
_pthread_find_owner also shows that the owner reported for a read-write lock is 0:
void
_pthread_find_owner(thread_t thread,
struct stackshot_thread_waitinfo * waitinfo)
{
ksyn_wait_queue_t kwq = _pthread_get_thread_kwq(thread);
switch (waitinfo->wait_type) {
case kThreadWaitPThreadMutex:
assert((kwq->kw_type & KSYN_WQTYPE_MASK) == KSYN_WQTYPE_MTX);
waitinfo->owner = thread_tid(kwq->kw_owner);
waitinfo->context = kwq->kw_addr;
break;
/* Owner of rwlock not stored in kernel space due to races. Punt
* and hope that the userspace address is helpful enough. */
case kThreadWaitPThreadRWLockRead:
case kThreadWaitPThreadRWLockWrite:
assert((kwq->kw_type & KSYN_WQTYPE_MASK) == KSYN_WQTYPE_RWLOCK);
waitinfo->owner = 0;
waitinfo->context = kwq->kw_addr;
break;
/* Condvars don't have owners, so just give the userspace address. */
case kThreadWaitPThreadCondVar:
assert((kwq->kw_type & KSYN_WQTYPE_MASK) == KSYN_WQTYPE_CVAR);
waitinfo->owner = 0;
waitinfo->context = kwq->kw_addr;
break;
case kThreadWaitNone:
default:
waitinfo->owner = 0;
waitinfo->context = 0;
break;
}
}
Now switch the lock to a read-write lock and check whether the theory above holds:
pthread_rwlock_init(&_rwlock, NULL);
dispatch_async(dispatch_get_global_queue(DISPATCH_QUEUE_PRIORITY_BACKGROUND, 0), ^{
    printf("begin : \n");
    printThreadPriority();
    printf("queue before lock \n");
    pthread_rwlock_rdlock(&_rwlock); // make sure the background queue gets the lock first
    printf("queue lock \n");
    printThreadPriority();
    dispatch_async(dispatch_get_main_queue(), ^{
        printf("before main lock\n");
        pthread_rwlock_wrlock(&_rwlock);
        printf("in main lock\n");
        pthread_rwlock_unlock(&_rwlock);
        printf("after main unlock\n");
    });
    sleep(10);
    printThreadPriority();
    printf("queue unlock\n");
    pthread_rwlock_unlock(&_rwlock);
    printf("queue after unlock\n");
});
begin :
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue before lock
queue lock
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
before main lock
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
queue unlock
queue after unlock
in main lock
after main unlock
As shown, a read-write lock gets no priority boost.
dispatch_sync is a familiar API, so we go straight to the test:
// The current thread is the main thread
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_BACKGROUND, 0);
_queue = dispatch_queue_create("com.demo.test", qosAttribute);
printThreadPriority();
dispatch_async(_queue, ^{
    printf("dispatch_async before dispatch_sync : \n");
    printThreadPriority();
});
dispatch_sync(_queue, ^{
    printf("dispatch_sync: \n");
    printThreadPriority();
});
dispatch_async(_queue, ^{
    printf("dispatch_async after dispatch_sync: \n");
    printThreadPriority();
});
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
dispatch_async before dispatch_sync :
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
dispatch_sync:
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
dispatch_async after dispatch_sync:
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
_queue is a low-priority queue (QOS_CLASS_BACKGROUND). The task enqueued by dispatch_sync, as well as the task enqueued by dispatch_async before it, are both raised to the higher priority 47 (the same as the main thread), while the task from the last dispatch_async runs at priority 4.
// The current thread is the main thread
dispatch_queue_attr_t qosAttribute = dispatch_queue_attr_make_with_qos_class(DISPATCH_QUEUE_SERIAL, QOS_CLASS_BACKGROUND, 0);
_queue = dispatch_queue_create("com.demo.test", qosAttribute);
printf("main thread\n");
printThreadPriority();
dispatch_block_t block = dispatch_block_create(DISPATCH_BLOCK_INHERIT_QOS_CLASS, ^{
    printf("sub thread\n");
    sleep(2);
    printThreadPriority();
});
dispatch_async(_queue, block);
dispatch_wait(block, DISPATCH_TIME_FOREVER);
_queue is a low-priority queue (QOS_CLASS_BACKGROUND). When the main thread waits with dispatch_wait, the output is as follows, and the low-priority task is raised to priority 47:
main thread
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
sub thread
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
With dispatch_wait(block, DISPATCH_TIME_FOREVER) commented out, the output is:
main thread
pth_priority: 47, pth_curpri: 47, pth_maxpriority: 63
sub thread
pth_priority: 4, pth_curpri: 4, pth_maxpriority: 63
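Note that dispatch_wait is a macro (a C11 generic selection), or otherwise an entry-point function, that accepts 3 types of argument: dispatch_block_t, dispatch_group_t and dispatch_semaphore_t. What is meant here is specifically dispatch_block_wait; only dispatch_block_wait adjusts priority to avoid priority inversion.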
intptr_t
dispatch_wait(void *object, dispatch_time_t timeout);
#if __has_extension(c_generic_selections)
#define dispatch_wait(object, timeout) \
_Generic((object), \
dispatch_block_t:dispatch_block_wait, \
dispatch_group_t:dispatch_group_wait, \
dispatch_semaphore_t:dispatch_semaphore_wait \
)((object),(timeout))
#endif
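The compile-time selection that `_Generic` performs can be tried in isolation. The following sketch uses hypothetical names (`wait_on`, `wait_on_int`, `wait_on_double`) and plain C types instead of the dispatch types, so it shows only the selection mechanism and says nothing about libdispatch itself.

```c
#include <assert.h>
#include <string.h>

/* Two type-specific entry points, standing in for the per-type wait functions. */
const char *wait_on_int(int obj)       { (void)obj; return "int"; }
const char *wait_on_double(double obj) { (void)obj; return "double"; }

/* The callee is chosen from the static type of `object` at compile time;
 * a type that is not listed produces a compilation error. */
#define wait_on(object) \
    _Generic((object), \
        int: wait_on_int, \
        double: wait_on_double \
    )(object)
```

Here `wait_on(1)` expands to a call to `wait_on_int` and `wait_on(1.5)` to a call to `wait_on_double`, which mirrors how a single `dispatch_wait` spelling resolves to three different functions with different behaviour.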
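dispatch_semaphore
My earlier understanding of dispatch_semaphore was shallow, and I often treated a binary semaphore as equivalent to a mutex. Research showed otherwise: dispatch_semaphore has no concept of QoS and does not record which thread currently holds the semaphore (the owner), so when a high-priority thread is waiting, the kernel cannot know which thread's scheduling priority (QoS) to raise. If the holder has a lower priority than other threads, the high-priority waiter may wait indefinitely. The article "Mutex vs Semaphore: What's the Difference?" compares Mutex and Semaphore in detail.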
Semaphores are for signaling (same as condition variables, events) while mutexes are for mutual exclusion. Technically, you can also use semaphores for mutual exclusion (a mutex can be thought as a binary semaphore) but you really shouldn’t.
Right, but libdispatch doesn’t have a mutex. It has semaphores and queues. So if you’re trying to use libdispatch and you don’t want the closure-based aspect of queues, you might be tempted to use a semaphore instead. Don’t do that, use os_unfair_lock or pthread_mutex (or a higher-level construct like NSLock) instead.
These are warnings: dispatch_semaphore is quite dangerous and must be used with great care.
The demo provided by Apple illustrates the problem:
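The ownership difference between a mutex and a semaphore can be observed with plain POSIX primitives. The sketch below is an analogy only and makes several assumptions: it uses an error-checking pthread mutex and an unnamed POSIX semaphore (`sem_init`), which Linux provides but macOS and iOS do not implement, where dispatch_semaphore plays the corresponding role. The function names are hypothetical.

```c
#ifndef _XOPEN_SOURCE
#define _XOPEN_SOURCE 700
#endif
#include <assert.h>
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stddef.h>

struct attempt { pthread_mutex_t *mutex; sem_t *sem; int result; };

static void *try_unlock(void *arg) {
    struct attempt *a = arg;
    a->result = pthread_mutex_unlock(a->mutex);  /* caller is not the owner */
    return NULL;
}

static void *try_post(void *arg) {
    struct attempt *a = arg;
    a->result = sem_post(a->sem);                /* no owner to check against */
    return NULL;
}

/* Lock an error-checking mutex here, let another thread try to unlock it,
 * and return the error code that thread received. */
int unlock_by_non_owner(void) {
    pthread_mutexattr_t attr;
    pthread_mutex_t mutex;
    pthread_mutexattr_init(&attr);
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_ERRORCHECK);
    pthread_mutex_init(&mutex, &attr);
    pthread_mutexattr_destroy(&attr);

    struct attempt a = { &mutex, NULL, 0 };
    pthread_t other;
    pthread_mutex_lock(&mutex);
    pthread_create(&other, NULL, try_unlock, &a);
    pthread_join(other, NULL);
    pthread_mutex_unlock(&mutex);
    pthread_mutex_destroy(&mutex);
    return a.result;
}

/* Wait on a semaphore that only another thread posts.
 * Returns 0 when both the post and the wait succeeded, -1 otherwise. */
int post_by_other_thread(void) {
    sem_t sem;
    if (sem_init(&sem, 0, 0) != 0) return -1;
    struct attempt a = { NULL, &sem, -1 };
    pthread_t other;
    pthread_create(&other, NULL, try_post, &a);
    int waited = sem_wait(&sem);
    pthread_join(other, NULL);
    sem_destroy(&sem);
    return (waited == 0 && a.result == 0) ? 0 : -1;
}
```

The mutex rejects the foreign unlock with `EPERM` because it knows which thread locked it, while the semaphore accepts a post from any thread. With no recorded owner, there is no thread whose priority a waiter could be lent to.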
__block NSString *taskName = nil;
dispatch_semaphore_t sema = dispatch_semaphore_create(0);
[self.connection.remoteObjectProxy requestCurrentTaskName:^(NSString *task) {
taskName = task;
dispatch_semaphore_signal(sema);
}];
dispatch_semaphore_wait(sema, DISPATCH_TIME_FOREVER);
return taskName;
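Suppose this code runs on the main thread, whose priority is QOS_CLASS_USER_INTERACTIVE;
the QoS of the asynchronous task is raised to QOS_CLASS_USER_INITIATED;
the main thread is blocked on the semaphore sema, while the asynchronous task responsible for signaling that semaphore runs at QOS_CLASS_USER_INITIATED, which is lower than the main thread's QOS_CLASS_USER_INTERACTIVE, so priority inversion may occur.
Notably, Clang has a static check dedicated to this pattern: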
static auto findGCDAntiPatternWithSemaphore() -> decltype(compoundStmt()) {
const char *SemaphoreBinding = "semaphore_name";
auto SemaphoreCreateM = callExpr(allOf(
callsName("dispatch_semaphore_create"),
hasArgument(0, ignoringParenCasts(integerLiteral(equals(0))))));
auto SemaphoreBindingM = anyOf(
forEachDescendant(
varDecl(hasDescendant(SemaphoreCreateM)).bind(SemaphoreBinding)),
forEachDescendant(binaryOperator(bindAssignmentToDecl(SemaphoreBinding),
hasRHS(SemaphoreCreateM))));
auto HasBlockArgumentM = hasAnyArgument(hasType(
hasCanonicalType(blockPointerType())
));
auto ArgCallsSignalM = hasAnyArgument(stmt(hasDescendant(callExpr(
allOf(
callsName("dispatch_semaphore_signal"),
equalsBoundArgDecl(0, SemaphoreBinding)
)))));
auto HasBlockAndCallsSignalM = allOf(HasBlockArgumentM, ArgCallsSignalM);
auto HasBlockCallingSignalM =
forEachDescendant(
stmt(anyOf(
callExpr(HasBlockAndCallsSignalM),
objcMessageExpr(HasBlockAndCallsSignalM)
)));
auto SemaphoreWaitM = forEachDescendant(
callExpr(
allOf(
callsName("dispatch_semaphore_wait"),
equalsBoundArgDecl(0, SemaphoreBinding)
)
).bind(WarnAtNode));
return compoundStmt(
SemaphoreBindingM, HasBlockCallingSignalM, SemaphoreWaitM);
}
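To use this check, just enable the corresponding setting in Xcode.
In addition, dispatch_group is similar to a semaphore: when enter() is called, there is no way to know in advance who will call leave(), so the system cannot know who the owner is, and likewise no priority boost takes place.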
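dispatch_semaphore left a deep impression on the author, who once wrote code like this: use a semaphore to wait synchronously on the main thread for the camera authorization result.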
__block BOOL auth = NO;
dispatch_semaphore_t semaphore = dispatch_semaphore_create(0);
[KTAuthorizeService requestAuthorizationWithType:KTPermissionsTypeCamera completionHandler:^(BOOL allow) {
auth = allow;
dispatch_semaphore_signal(semaphore);
}];
dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
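After release, this code stayed at the top of the hang list for a long time, which was baffling back then. Only after learning that semaphores cannot avoid priority inversion did it finally make sense. Problems of this kind are generally solved in one of 2 ways: use a synchronous API, or use an asynchronous callback instead of waiting on the current thread. The two snippets that follow show each approach in turn.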
BOOL auth = [KTAuthorizeService authorizationWithType:KTPermissionsTypeCamera];
// do something next
[KTAuthorizeService requestAuthorizationWithType:KTPermissionsTypeCamera completionHandler:^(BOOL allow) {
BOOL auth = allow;
// do something next via callback
}];
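As mentioned earlier, XNU uses turnstiles for priority inheritance. Here is a brief description of the mechanism. The XNU kernel contains a large number of synchronization objects (for example lck_mtx_t). To deal with priority inversion, each synchronization object needs a separate data structure that maintains a good deal of information, such as the queue of threads blocked on that object. Allocating one such structure for every synchronization object would waste a great deal of memory. XNU solves this with the turnstile mechanism, a highly space-efficient design. It rests on the observation that a thread cannot be blocked on more than one synchronization object at the same time. This lets every synchronization object keep only a pointer to a turnstile and allocate one when needed, while the turnstile holds everything required to operate on the object, such as the queue of blocked threads and a pointer to the thread that owns the object. Turnstiles are allocated dynamically from a pool whose size grows with the number of threads allocated in the system, so the total number of turnstiles is always less than or equal to the number of threads, which keeps their count under control. A turnstile is allocated by the first thread that blocks on the synchronization object; when no more threads are blocked on that object, the turnstile is released back to the pool. The turnstile data structure is as follows: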
struct turnstile {
struct waitq ts_waitq; /* waitq embedded in turnstile */
turnstile_inheritor_t ts_inheritor; /* thread/turnstile inheriting the priority (IL, WL) */
union {
struct turnstile_list ts_free_turnstiles; /* turnstile free list (IL) */
SLIST_ENTRY(turnstile) ts_free_elm; /* turnstile free list element (IL) */
};
struct priority_queue_sched_max ts_inheritor_queue; /* Queue of turnstile with us as an inheritor (WL) */
union {
struct priority_queue_entry_sched ts_inheritor_links; /* Inheritor queue links */
struct mpsc_queue_chain ts_deallocate_link; /* thread deallocate link */
};
SLIST_ENTRY(turnstile) ts_htable_link; /* linkage for turnstile in global hash table */
uintptr_t ts_proprietor; /* hash key lookup turnstile (IL) */
os_refcnt_t ts_refcount; /* reference count for turnstiles */
_Atomic uint32_t ts_type_gencount; /* gen count used for priority chaining (IL), type of turnstile (IL) */
uint32_t ts_port_ref; /* number of explicit refs from ports on send turnstile */
turnstile_update_flags_t ts_inheritor_flags; /* flags for turnstile inheritor (IL, WL) */
uint8_t ts_priority; /* priority of turnstile (WL) */
#if DEVELOPMENT || DEBUG
uint8_t ts_state; /* current state of turnstile (IL) */
queue_chain_t ts_global_elm; /* global turnstile chain */
thread_t ts_thread; /* thread the turnstile is attached to */
thread_t ts_prev_thread; /* thread the turnstile was attached before donation */
#endif
};
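The allocate-on-first-waiter, release-on-last-waiter lifecycle described above can be modelled in a few lines. This is a toy model for illustration, not XNU code: every name (`toy_turnstile`, `toy_sync_object`, `toy_pool`, `toy_block`, `toy_unblock`) is hypothetical, the pool has a fixed size, and no locking or priority logic is included.

```c
#include <assert.h>
#include <stddef.h>

#define POOL_SIZE 4   /* stands in for "at most one turnstile per thread" */

struct toy_turnstile {
    int waiters;                      /* threads blocked on the object */
    int owner_tid;                    /* thread that owns the object */
    struct toy_turnstile *next_free;  /* free-list link while unused */
};

struct toy_sync_object {
    int owner_tid;
    struct toy_turnstile *ts;         /* NULL while nobody is blocked */
};

struct toy_pool {
    struct toy_turnstile slots[POOL_SIZE];
    struct toy_turnstile *free_list;
    int in_use;
};

void toy_pool_init(struct toy_pool *p) {
    p->free_list = NULL;
    p->in_use = 0;
    for (int i = 0; i < POOL_SIZE; ++i) {
        p->slots[i].next_free = p->free_list;
        p->free_list = &p->slots[i];
    }
}

/* A thread blocks on `o`. The first waiter attaches a turnstile from the
 * pool; later waiters share it. Returns 0, or -1 if the pool is exhausted. */
int toy_block(struct toy_pool *p, struct toy_sync_object *o) {
    if (o->ts == NULL) {
        if (p->free_list == NULL) return -1;
        o->ts = p->free_list;
        p->free_list = o->ts->next_free;
        o->ts->waiters = 0;
        o->ts->owner_tid = o->owner_tid;
        p->in_use++;
    }
    o->ts->waiters++;
    return 0;
}

/* One waiter stops waiting. The last one returns the turnstile to the pool. */
void toy_unblock(struct toy_pool *p, struct toy_sync_object *o) {
    assert(o->ts != NULL && o->ts->waiters > 0);
    if (--o->ts->waiters == 0) {
        o->ts->next_free = p->free_list;
        p->free_list = o->ts;
        o->ts = NULL;
        p->in_use--;
    }
}
```

An uncontended object costs only one pointer, and the number of turnstiles in use never exceeds the number of blocked threads, which is the space argument made in the text.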
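Some priority values appeared during verification. Based on "Mac OS® X and iOS Internals": the priority values in the experiments are all Mach-level values, and all of them are user-thread values.
The Mach-level priority of NSQualityOfServiceBackground is 4;
The Mach-level priority of NSQualityOfServiceUtility is 20;
The Mach-level priority of NSQualityOfServiceDefault is 31;
The Mach-level priority of NSQualityOfServiceUserInitiated is 37;
The Mach-level priority of NSQualityOfServiceUserInteractive is 47.
This article covered the concepts behind priority inversion and the approaches to solving it, and examined in detail how several locks on the iOS platform behave. With this understanding, unnecessary priority inversions can be avoided, which in turn helps prevent hangs. The ByteDance APM team also monitors thread priorities in order to detect and prevent priority inversion.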
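The ByteDance APM platform team works on improving the performance and stability of products across the whole group. Its technology stack covers iOS/Android/Server/Web/Hybrid/PC/games/mini programs and more, and its work includes, but is not limited to, performance and stability monitoring, troubleshooting, in-depth optimization and regression prevention. In the long run the team hopes to contribute more constructive techniques for problem detection and in-depth optimization to the industry.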