跳錶可以達到和紅黑樹一樣的時間複雜度 O(logN),且實現簡單,Redis 中的有序集合物件的底層資料結構就使用了跳錶。其作者威廉·普評價:跳躍連結串列是在很多應用中有可能替代平衡樹的一種資料結構。本篇文章將對跳錶的實現及在Redis中的應用進行學習。**
跳錶,即跳躍連結串列(Skip List),是基於並聯的連結串列資料結構,操作效率可以達到O(logN),對並行友好,跳錶的示意圖如下所示。
跳錶的特點,可以概括如下。
•跳錶是多層(level)連結串列結構;
•跳錶中的每一層都是一個有序連結串列,並且按照元素升序(預設)排列;
•跳錶中的元素會在哪一層出現是隨機決定的,但是隻要元素出現在了第 k 層,那麼 k 層以下的連結串列也會出現這個元素;
•跳錶的底層的連結串列包含所有元素;
•跳錶頭節點和尾節點不儲存元素,且頭節點和尾節點的層數就是跳錶的最大層數;
•跳錶中的節點包含兩個指標,一個指標指向同層連結串列的後一節點,一個指標指向下層連結串列的同元素節點。
以上圖中的跳錶為例,如果要查詢元素 71,那麼查詢流程如下圖所示。
從頂層連結串列的頭節點開始查詢,查詢到元素71的節點時,一共遍歷了4個節點,但是如果按照傳統連結串列的方式(即從跳錶的底層連結串列的頭節點開始向後查詢),那麼就需要遍歷7個節點,所以跳錶以空間換時間,縮短了操作跳錶所需要花費的時間。跳躍列表的演演算法有同平衡樹一樣的漸進的預期時間邊界,並且更簡單、更快速和使用更少的空間。這種資料結構是由William Pugh(音譯為威廉·普)發明的,最早出現於他在1990年發表的論文《Skip Lists: A Probabilistic Alternative to Balanced Trees》。 谷歌上找到一篇作者關於跳錶的論文,感興趣強烈建議下載閱讀:
跳錶在動態查詢過程中使用了一種非嚴格的平衡機制來讓插入和刪除都更加便利和快捷,這種非嚴格平衡是基於概率的,而不是平衡樹的嚴格平衡。說到非嚴格平衡,首先想到的是紅黑樹RbTree,它同樣採用非嚴格平衡來避免像AVL那樣調整樹的結構,這裡就不展開講紅黑樹了,看來跳錶也是類似的路子,但是是基於概率實現的。
已知跳錶中的節點,需要有指向當前層連結串列後一節點的指標,和指向下層連結串列的同元素節點的指標,所以跳錶中的節點,定義如下。
public class SkiplistNode {
public int data;
public SkiplistNode next;
public SkiplistNode down;
public int level;
public SkiplistNode(int data, int level) {
this.data = data;
this.level = level;
}
上述是跳錶中的節點的最簡單的定義方式,儲存的元素 data 為整數,節點之間進行比較時直接比較元素 data 的大小。
跳錶初始化時,將每一層連結串列的頭尾節點建立出來並使用集合將頭尾節點進行儲存,頭尾節點的層數隨機指定,且頭尾節點的層數就代表當前跳錶的層數。初始化後,跳錶結構如下所示。
跳錶初始化的相關程式碼如下所示。
public LinkedList<SkiplistNode> headNodes;
public LinkedList<SkiplistNode> tailNodes;
public int curLevel;
public Random random;
public Skiplist() {
random = new Random();
//headNodes用於儲存每一層的頭節點
headNodes = new LinkedList<>();
//tailNodes用於儲存每一層的尾節點
tailNodes = new LinkedList<>();
//初始化跳錶時,跳錶的層數隨機指定
curLevel = getRandomLevel();
//指定了跳錶的初始的隨機層數後,就需要將每一層的頭節點和尾節點建立出來並構建好關係
SkiplistNode head = new SkiplistNode(Integer.MIN_VALUE, 0);
SkiplistNode tail = new SkiplistNode(Integer.MAX_VALUE, 0);
for (int i = 0; i <= curLevel; i++) {
head.next = tail;
headNodes.addFirst(head);
tailNodes.addFirst(tail);
SkiplistNode headNew = new SkiplistNode(Integer.MIN_VALUE, head.level + 1);
SkiplistNode tailNew = new SkiplistNode(Integer.MAX_VALUE, tail.level + 1);
headNew.down = head;
tailNew.down = tail;
head = headNew;
tail = tailNew;
}
}
每一個元素新增到跳錶中時,首先需要隨機指定這個元素在跳錶中的層數,如果隨機指定的層數大於了跳錶的層數,則在將元素新增到跳錶中之前,還需要擴大跳錶的層數,而擴大跳錶的層數就是將頭尾節點的層數擴大。下面給出需要擴大跳錶層數的一次新增的過程。
初始狀態時,跳錶的層數為 2,如下圖所示。
現在要往跳錶中新增元素 120,並且隨機指定的層數為 3,大於了當前跳錶的層數 2,此時需要先擴大跳錶的層數,2如 下圖所示。
現在要往跳錶中新增元素 120,並且隨機指定的層數為 3,大於了當前跳錶的層數 2,此時需要先擴大跳錶的層數,2如 下圖所示。
將元素 120 插入到跳錶中時,從頂層開始,逐層向下插入,如下圖所示。
跳錶的新增方法的程式碼如下所示。
public void add(int num) {
//獲取本次新增的值的層數
int level = getRandomLevel();
//如果本次新增的值的層數大於當前跳錶的層數
//則需要在新增當前值前先將跳錶層數擴充
if (level > curLevel) {
expanLevel(level - curLevel);
}
//curNode表示num值在當前層對應的節點
SkiplistNode curNode = new SkiplistNode(num, level);
//preNode表示curNode在當前層的前一個節點
SkiplistNode preNode = headNodes.get(curLevel - level);
for (int i = 0; i <= level; i++) {
//從當前層的head節點開始向後遍歷,直到找到一個preNode
//使得preNode.data < num <= preNode.next.data
while (preNode.next.data < num) {
preNode = preNode.next;
}
//將curNode插入到preNode和preNode.next中間
curNode.next = preNode.next;
preNode.next = curNode;
//如果當前並不是0層,則繼續向下層新增節點
if (curNode.level > 0) {
SkiplistNode downNode = new SkiplistNode(num, curNode.level - 1);
//curNode指向下一層的節點
curNode.down = downNode;
//curNode向下移動一層
curNode = downNode;
}
//preNode向下移動一層
preNode = preNode.down;
}
}
private void expanLevel(int expanCount) {
SkiplistNode head = headNodes.getFirst();
SkiplistNode tail = tailNodes.getFirst();
for (int i = 0; i < expanCount; i++) {
SkiplistNode headNew = new SkiplistNode(Integer.MIN_VALUE, head.level + 1);
SkiplistNode tailNew = new SkiplistNode(Integer.MAX_VALUE, tail.level + 1);
headNew.down = head;
tailNew.down = tail;
head = headNew;
tail = tailNew;
headNodes.addFirst(head);
tailNodes.addFirst(tail);
}
}
在跳錶中搜尋一個元素時,需要從頂層開始,逐層向下搜尋。搜尋時遵循如下規則。
•目標值大於當前節點的後一節點值時,繼續在本層連結串列上向後搜尋;
•目標值大於當前節點值,小於當前節點的後一節點值時,向下移動一層,從下層連結串列的同節點位置向後搜尋;
•目標值等於當前節點值,搜尋結束。
•下圖是一個搜尋過程的示意圖。
•跳錶的搜尋的程式碼如下所示。
public boolean search(int target) {
//從頂層開始尋找,curNode表示當前遍歷到的節點
SkiplistNode curNode = headNodes.getFirst();
while (curNode != null) {
if (curNode.next.data == target) {
//找到了目標值對應的節點,此時返回true
return true;
} else if (curNode.next.data > target) {
//curNode的後一節點值大於target
//說明目標節點在curNode和curNode.next之間
//此時需要向下層尋找
curNode = curNode.down;
} else {
//curNode的後一節點值小於target
//說明目標節點在curNode的後一節點的後面
//此時在本層繼續向後尋找
curNode = curNode.next;
}
}
return false;
}
當在跳錶中需要刪除某一個元素時,則需要將這個元素在所有層的節點都刪除,具體的刪除規則如下所示。
•首先按照跳錶的搜尋的方式,搜尋待刪除節點,如果能夠搜尋到,此時搜尋到的待刪除節點位於該節點層數的最高層;
•從待刪除節點的最高層往下,將每一層的待刪除節點都刪除掉,刪除方式就是讓待刪除節點的前一節點直接指向待刪除節點的後一節點。
•下圖是一個刪除過程的示意圖。
•跳錶的刪除的程式碼如下所示。
public boolean erase(int num) {
//刪除節點的遍歷過程與尋找節點的遍歷過程是相同的
//不過在刪除節點時如果找到目標節點,則需要執行節點刪除的操作
SkiplistNode curNode = headNodes.getFirst();
while (curNode != null) {
if (curNode.next.data == num) {
//preDeleteNode表示待刪除節點的前一節點
SkiplistNode preDeleteNode = curNode;
while (true) {
//刪除當前層的待刪除節點,就是讓待刪除節點的前一節點指向待刪除節點的後一節點
preDeleteNode.next = curNode.next.next;
//當前層刪除完後,需要繼續刪除下一層的待刪除節點
//這裡讓preDeleteNode向下移動一層
//向下移動一層後,preDeleteNode就不一定是待刪除節點的前一節點了
preDeleteNode = preDeleteNode.down;
//如果preDeleteNode為null,說明已經將底層的待刪除節點刪除了
//此時就結束刪除流程,並返回true
if (preDeleteNode == null) {
return true;
}
//preDeleteNode向下移動一層後,需要繼續從當前位置向後遍歷
//直到找到一個preDeleteNode,使得preDeleteNode.next的值等於目標值
//此時preDeleteNode就又變成了待刪除節點的前一節點
while (preDeleteNode.next.data != num) {
preDeleteNode = preDeleteNode.next;
}
}
} else if (curNode.next.data > num) {
curNode = curNode.down;
} else {
curNode = curNode.next;
}
}
return false;
}
跳錶完整程式碼如下所示。
public class Skiplist {
public LinkedList<SkiplistNode> headNodes;
public LinkedList<SkiplistNode> tailNodes;
public int curLevel;
public Random random;
public Skiplist() {
random = new Random();
//headNodes用於儲存每一層的頭節點
headNodes = new LinkedList<>();
//tailNodes用於儲存每一層的尾節點
tailNodes = new LinkedList<>();
//初始化跳錶時,跳錶的層數隨機指定
curLevel = getRandomLevel();
//指定了跳錶的初始的隨機層數後,就需要將每一層的頭節點和尾節點建立出來並構建好關係
SkiplistNode head = new SkiplistNode(Integer.MIN_VALUE, 0);
SkiplistNode tail = new SkiplistNode(Integer.MAX_VALUE, 0);
for (int i = 0; i <= curLevel; i++) {
head.next = tail;
headNodes.addFirst(head);
tailNodes.addFirst(tail);
SkiplistNode headNew = new SkiplistNode(Integer.MIN_VALUE, head.level + 1);
SkiplistNode tailNew = new SkiplistNode(Integer.MAX_VALUE, tail.level + 1);
headNew.down = head;
tailNew.down = tail;
head = headNew;
tail = tailNew;
}
}
public boolean search(int target) {
//從頂層開始尋找,curNode表示當前遍歷到的節點
SkiplistNode curNode = headNodes.getFirst();
while (curNode != null) {
if (curNode.next.data == target) {
//找到了目標值對應的節點,此時返回true
return true;
} else if (curNode.next.data > target) {
//curNode的後一節點值大於target
//說明目標節點在curNode和curNode.next之間
//此時需要向下層尋找
curNode = curNode.down;
} else {
//curNode的後一節點值小於target
//說明目標節點在curNode的後一節點的後面
//此時在本層繼續向後尋找
curNode = curNode.next;
}
}
return false;
}
public void add(int num) {
//獲取本次新增的值的層數
int level = getRandomLevel();
//如果本次新增的值的層數大於當前跳錶的層數
//則需要在新增當前值前先將跳錶層數擴充
if (level > curLevel) {
expanLevel(level - curLevel);
}
//curNode表示num值在當前層對應的節點
SkiplistNode curNode = new SkiplistNode(num, level);
//preNode表示curNode在當前層的前一個節點
SkiplistNode preNode = headNodes.get(curLevel - level);
for (int i = 0; i <= level; i++) {
//從當前層的head節點開始向後遍歷,直到找到一個preNode
//使得preNode.data < num <= preNode.next.data
while (preNode.next.data < num) {
preNode = preNode.next;
}
//將curNode插入到preNode和preNode.next中間
curNode.next = preNode.next;
preNode.next = curNode;
//如果當前並不是0層,則繼續向下層新增節點
if (curNode.level > 0) {
SkiplistNode downNode = new SkiplistNode(num, curNode.level - 1);
//curNode指向下一層的節點
curNode.down = downNode;
//curNode向下移動一層
curNode = downNode;
}
//preNode向下移動一層
preNode = preNode.down;
}
}
public boolean erase(int num) {
//刪除節點的遍歷過程與尋找節點的遍歷過程是相同的
//不過在刪除節點時如果找到目標節點,則需要執行節點刪除的操作
SkiplistNode curNode = headNodes.getFirst();
while (curNode != null) {
if (curNode.next.data == num) {
//preDeleteNode表示待刪除節點的前一節點
SkiplistNode preDeleteNode = curNode;
while (true) {
//刪除當前層的待刪除節點,就是讓待刪除節點的前一節點指向待刪除節點的後一節點
preDeleteNode.next = curNode.next.next;
//當前層刪除完後,需要繼續刪除下一層的待刪除節點
//這裡讓preDeleteNode向下移動一層
//向下移動一層後,preDeleteNode就不一定是待刪除節點的前一節點了
preDeleteNode = preDeleteNode.down;
//如果preDeleteNode為null,說明已經將底層的待刪除節點刪除了
//此時就結束刪除流程,並返回true
if (preDeleteNode == null) {
return true;
}
//preDeleteNode向下移動一層後,需要繼續從當前位置向後遍歷
//直到找到一個preDeleteNode,使得preDeleteNode.next的值等於目標值
//此時preDeleteNode就又變成了待刪除節點的前一節點
while (preDeleteNode.next.data != num) {
preDeleteNode = preDeleteNode.next;
}
}
} else if (curNode.next.data > num) {
curNode = curNode.down;
} else {
curNode = curNode.next;
}
}
return false;
}
private void expanLevel(int expanCount) {
SkiplistNode head = headNodes.getFirst();
SkiplistNode tail = tailNodes.getFirst();
for (int i = 0; i < expanCount; i++) {
SkiplistNode headNew = new SkiplistNode(Integer.MIN_VALUE, head.level + 1);
SkiplistNode tailNew = new SkiplistNode(Integer.MAX_VALUE, tail.level + 1);
headNew.down = head;
tailNew.down = tail;
head = headNew;
tail = tailNew;
headNodes.addFirst(head);
tailNodes.addFirst(tail);
}
}
private int getRandomLevel() {
int level = 0;
while (random.nextInt(2) > 1) {
level++;
}
return level;
}
}
ZSet結構同時包含一個字典和一個跳躍表,跳躍表按score從小到大儲存所有集合元素。字典儲存著從member到score的對映。這兩種結構通過指標共用相同元素的member和score,不會浪費額外記憶體。
typedef struct zset {
dict *dict;
zskiplist *zsl;
} zset;
ZSet中的字典和跳錶佈局:
隨機層數的實現原理:
跳錶是一個概率型的資料結構,元素的插入層數是隨機指定的。Willam Pugh在論文中描述了它的計算過程如下:指定節點最大層數 MaxLevel,指定概率 p, 預設層數 lvl 為1;
生成一個0~1的亂數r,若r<p,且lvl<MaxLevel ,則lvl ++;
重複第 2 步,直至生成的r >p 為止,此時的 lvl 就是要插入的層數。
論文中生成隨機層數的偽碼:
在Redis中對跳錶的實現基本上也是遵循這個思想的,只不過有微小差異,看下Redis關於跳錶層數的隨機原始碼src/z_set.c:
/* Returns a random level for the new skiplist node we are going to create.
* The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL
* (both inclusive), with a powerlaw-alike distribution where higher
* levels are less likely to be returned. */
int zslRandomLevel(void) {
int level = 1;
while ((random()&0xFFFF) < (ZSKIPLIST_P * 0xFFFF))
level += 1;
return (level<ZSKIPLIST_MAXLEVEL) ? level : ZSKIPLIST_MAXLEVEL;
}
其中兩個宏的定義在redis.h中:
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^32 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
可以看到while中的:
(random()&0xFFFF) < (ZSKIPLIST_P*0xFFFF)
第一眼看到這個公式,因為涉及位運算有些詫異,需要研究一下Antirez為什麼使用位運算來這麼寫?
最開始的猜測是random()返回的是浮點數[0-1],於是乎線上找了個浮點數轉二進位制的工具,輸入0.5看了下結果:
可以看到0.5的32bit轉換16進位制結果為0x3f000000,如果與0xFFFF做與運算結果還是0,不符合預期。
實際應用時對於隨機層數的實現並不統一,重要的是亂數的生成,在LevelDB中對跳錶層數的生成程式碼是這樣的:
template <typename Key, typename Value>
int SkipList<Key, Value>::randomLevel() {
static const unsigned int kBranching = 4;
int height = 1;
while (height < kMaxLevel && ((::Next(rnd_) % kBranching) == 0)) {
height++;
}
assert(height > 0);
assert(height <= kMaxLevel);
return height;
}
uint32_t Next( uint32_t& seed) {
seed = seed & 0x7fffffffu;
if (seed == 0 || seed == 2147483647L) {
seed = 1;
}
static const uint32_t M = 2147483647L;
static const uint64_t A = 16807;
uint64_t product = seed * A;
seed = static_cast<uint32_t>((product >> 31) + (product & M));
if (seed > M) {
seed -= M;
}
return seed;
}
可以看到leveldb使用亂數與kBranching取模,如果值為0就增加一層,這樣雖然沒有使用浮點數,但是也實現了概率平衡。
我們很容易看出,產生越高的節點層數出現概率越低,無論如何層數總是滿足冪次定律越大的數出現的概率越小。
如果某件事的發生頻率和它的某個屬性成冪關係,那麼這個頻率就可以稱之為符合冪次定律。
冪次定律的表現是少數幾個事件的發生頻率佔了整個發生頻率的大部分, 而其餘的大多數事件只佔整個發生頻率的一個小部分。
冪次定律應用到跳錶的隨機層數來說就是大部分的節點層數都是黃色部分,只有少數是綠色部分,並且概率很低。
定量的分析如下:
•節點層數至少為1,大於1的節點層數滿足一個概率分佈。
•節點層數恰好等於1的概率為p^0(1-p)
•節點層數恰好等於2的概率為p^1(1-p)
•節點層數恰好等於3的概率為p^2(1-p)
•節點層數恰好等於4的概率為p^3(1-p)
依次遞推節點層數恰好等於K的概率為p^(k-1)(1-p)
因此如果我們要求節點的平均層數,那麼也就轉換成了求概率分佈的期望問題了:
表中P為概率,V為對應取值,給出了所有取值和概率的可能,因此就可以求這個概率分佈的期望了。方括號裡面的式子其實就是高一年級學的等比數列,常用技巧錯位相減求和,從中可以看到結點層數的期望值與1-p成反比。對於Redis而言,當p=0.25時結點層數的期望是1.33。
跳錶的時間複雜度與AVL樹和紅黑樹相同,可以達到O(logN),但是AVL樹要維持高度的平衡,紅黑樹要維持高度的近似平衡,這都會導致插入或者刪除節點時的一些時間開銷,所以跳錶相較於AVL樹和紅黑樹來說,省去了維持高度的平衡的時間開銷,但是相應的也付出了更多的空間來儲存多個層的節點,所以跳錶是用空間換時間的資料結構。以Redis中底層的資料結構zset作為典型應用來展開,進一步看到跳躍連結串列的實際應用。