Redis 跳跃表相关源码阅读笔记，源码文件 server.h & t_zset.c

## 1. 小结

• 最大层数为 ZSKIPLIST_MAXLEVEL
• 跳跃表中不允许相同的 ele，允许相同的 score，按 <score, ele> pair 进行排序
• 增加 backward 指针，可反向遍历跳跃表

## 2. 跳跃表相关数据结构

Redis 中跳跃表大致基于论文Skip Lists: A Probabilistic Alternative to Balanced Trees实现，但有三处改动：

• 允许相同的 score
• 比较不仅仅是基于 score 的大小
• 增加 backward 指针，用于反向遍历

 1 2 3 4 5  typedef struct zskiplist { struct zskiplistNode *header, *tail; unsigned long length; int level; } zskiplist; 
• headertail 分别指向跳跃表的头尾节点
• length 记录该跳跃表中节点个数
• level 记录该跳跃表中除头节点外其他节点的最大层数，其上限为 ZSKIPLIST_MAXLEVEL

 1 2 3 4 5 6 7 8 9  typedef struct zskiplistNode { sds ele; double score; struct zskiplistNode *backward; struct zskiplistLevel { struct zskiplistNode *forward; unsigned long span; } level[]; } zskiplistNode; 
• ele: member 的地址，不允许相同的 member
• backward: 指向前一节点
• level[]: 每个节点含有复数层的遍历通道，每一层包含在该层下一节点的地址 forward，以及此时的步长 span，当 forwardNULL 时，span 等于 0

  1 2 3 4 5 6 7 8 9 10 11  /* Struct to hold a inclusive/exclusive range spec by score comparison. */ typedef struct { double min, max; int minex, maxex; /* are min or max exclusive? */ } zrangespec; /* Struct to hold an inclusive/exclusive range spec by lexicographic comparison. */ typedef struct { sds min, max; /* May be set to shared.(minstring|maxstring) */ int minex, maxex; /* are min or max exclusive? */ } zlexrangespec; 

## 3. 跳跃表 APIs

### 3.1. 构造函数

 1 2 3 4 5 6  /* Create a skiplist node with the specified number of levels. * The SDS string 'ele' is referenced by the node after the call. */ zskiplistNode *zslCreateNode(int level, double score, sds ele); /* Create a new skiplist. */ zskiplist *zslCreate(void); 

### 3.2. 析构函数

 1 2 3 4 5 6 7  /* Free the specified skiplist node. The referenced SDS string representation * of the element is freed too, unless node->ele is set to NULL before calling * this function. */ void zslFreeNode(zskiplistNode *node); /* Free a whole skiplist. */ void zslFree(zskiplist *zsl); 

### 3.3. 插入节点

zslRandomLevel

 1 2 3 4 5  /* Returns a random level for the new skiplist node we are going to create. * The return value of this function is between 1 and ZSKIPLIST_MAXLEVEL * (both inclusive), with a powerlaw-alike distribution where higher * levels are less likely to be returned. */ int zslRandomLevel(void); 

$$P(i) = \begin{cases} (1-p)p^{i-1}, & i \ne ml \cr 1 - \Sigma_{i=1}^{ml - 1}P(i), & i = ml \cr \end{cases}$$

zslInsert

 1 2 3 4  /* Insert a new node in the skiplist. Assumes the element does not already * exist (up to the caller to enforce that). The skiplist takes ownership * of the passed SDS string 'ele'. */ zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele); 

1. 按照 level 从大到小寻找每一层中最靠近($<$)新节点的目标节点并记录其地址 (该目标节点 score 小于新节点 socre 或 score 相同时 ele 小于新节点 ele)

2. 调用 zslRandomLevel 给新节点生成一个随机的 level 值，若该 level 值大于 zsl->level，则将为 zsl->levellevel 每层最靠近新节点的目标节点初始化为头节点，随后更新 zsl->level

3. 根据 levelscoreele 值调用 zslCreateNode 创建新节点，并将其插入跳跃表中，更新新节点以及每层中最靠近新节点的目标节点的 level 数组的内容:

• 新节点 level 值大于之前 zsl->level 时，每层目标节点的 forwardspan 都需更新
• 新节点 level 值小于之前 zsl->level 时，0 -> level 层目标节点的 forwardspan 都需更新，而 level -> zsl->level 层目标节点只需更新 span
4. 更新新节点和其后一节点(level = 0)的 backward 指针

5. 更新 zsl->length

• 尽可能搭乘快速列车，也即要在该层找到新节点的左极限
• 不要坐过站，也即每次在该层新节点的左极限时跳转至下层，寻找更接近新节点的跳跃表节点，直至最底层

### 3.4. Score range APIs

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18  /* Populate the rangespec according to the objects min and max. * * Parse the min-max interval. If one of the values is prefixed * by the "(" character, it's considered "open". For instance * ZRANGEBYSCORE zset (1.5 (2.5 will match min < x < max * ZRANGEBYSCORE zset 1.5 2.5 will instead match min <= x <= max */ static int zslParseRange(robj *min, robj *max, zrangespec *spec); /* Returns if there is a part of the zset is in range. */ int zslIsInRange(zskiplist *zsl, zrangespec *range); /* Find the first node that is contained in the specified range. * Returns NULL when no element is contained in the range. */ zskiplistNode *zslFirstInRange(zskiplist *zsl, zrangespec *range); /* Find the last node that is contained in the specified range. * Returns NULL when no element is contained in the range. */ zskiplistNode *zslLastInRange(zskiplist *zsl, zrangespec *range); 

### 3.5. Lexicographic range APIs

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36  /* Parse max or min argument of ZRANGEBYLEX. * (foo means foo (open interval) * [foo means foo (closed interval) * - means the min string possible * + means the max string possible * * If the string is valid the *dest pointer is set to the redis object * that will be used for the comparison, and ex will be set to 0 or 1 * respectively if the item is exclusive or inclusive. C_OK will be * returned. * * If the string is not a valid range C_ERR is returned, and the value * of *dest and *ex is undefined. */ int zslParseLexRangeItem(robj *item, sds *dest, int *ex); /* Free a lex range structure, must be called only after zelParseLexRange() * populated the structure with success (C_OK returned). */ void zslFreeLexRange(zlexrangespec *spec); /* Populate the lex rangespec according to the objects min and max. * * Return C_OK on success. On error C_ERR is returned. * When OK is returned the structure must be freed with zslFreeLexRange(), * otherwise no release is needed. */ int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec); /* Returns if there is a part of the zset is in the lex range. */ int zslIsInLexRange(zskiplist *zsl, zlexrangespec *range); /* Find the first node that is contained in the specified lex range. * Returns NULL when no element is contained in the range. */ zskiplistNode *zslFirstInLexRange(zskiplist *zsl, zlexrangespec *range); /* Find the last node that is contained in the specified range. * Returns NULL when no element is contained in the range. */ zskiplistNode *zslLastInLexRange(zskiplist *zsl, zlexrangespec *range); 

### 3.6. 删除节点

zslDeleteNode

 1 2 3  /* Internal function used by zslDelete, zslDeleteRangeByScore and * zslDeleteRangeByRank. */ void zslDeleteNode(zskiplist *zsl, zskiplistNode *x, zskiplistNode **update); 

zslDelete

 1 2 3 4 5 6 7 8 9  /* Delete an element with matching score/element from the skiplist. * The function returns 1 if the node was found and deleted, otherwise * 0 is returned. * * If 'node' is NULL the deleted node is freed by zslFreeNode(), otherwise * it is not freed (but just unlinked) and *node is set to the node pointer, * so that it is possible for the caller to reuse the node (including the * referenced SDS string at node->ele). */ int zslDelete(zskiplist *zsl, double score, sds ele, zskiplistNode **node); 

• 该特定节点存在时，删除成功，返回 1，不存在时，删除失败，返回 0
• 传入的 node 不为 NULL 时，保存被删除节点的地址，node 为空时直接释放被删除节点所占内存
• 该函数会先寻找距离被删除节点最靠近的目标节点数组，然后调用 zslDeleteNode 执行删除操作

zslDeleteRangeBy…

  1 2 3 4 5 6 7 8 9 10 11 12 13  /* Delete all the elements with score between min and max from the skiplist. * Min and max are inclusive, so a score >= min || score <= max is deleted. * Note that this function takes the reference to the hash table view of the * sorted set, in order to remove the elements from the hash table too. */ unsigned long zslDeleteRangeByScore(zskiplist *zsl, zrangespec *range, dict *dict); /* Delete all the elements with ele between min and max from the skiplist. */ unsigned long zslDeleteRangeByLex(zskiplist *zsl, zlexrangespec *range, dict *dict); /* Delete all the elements with rank between start and end from the skiplist. * Start and end are inclusive. Note that start and end need to be 1-based */ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned int end, dict *dict); 

  1 2 3 4 5 6 7 8 9 10 11 12  /* Update the score of an elmenent inside the sorted set skiplist. * Note that the element must exist and must match 'score'. * This function does not update the score in the hash table side, the * caller should take care of it. * * Note that this function attempts to just update the node, in case after * the score update, the node would be exactly at the same position. * Otherwise the skiplist is modified by removing and re-adding a new * element, which is more costly. * * The function returns the updated element skiplist node pointer. */ zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore); 
• 调用方需确保根据 cursorceele 匹配的节点必须存在在跳跃表中
• 如果 newscore 不改变节点次序，该函数会就地更新指定节点的 score 值，否则会先从跳跃表中删除原节点，再根据 elenewsocre 重新插入新节点
 1 2 3 4 5 6 7 8  /* Find the rank for an element by both score and key. * Returns 0 when the element cannot be found, rank otherwise. * Note that the rank is 1-based due to the span of zsl->header to the * first element. */ unsigned long zslGetRank(zskiplist *zsl, double score, sds ele); /* Finds an element by its rank. The rank argument needs to be 1-based. */ zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank);