Redis源码分析之QuickList篇 quicklist 是 Redis 对外暴露的 list 数据结构的内部实现,经常被当作队列或栈使用,我们可以从常用的一些 api 上先思考一下它的结构
最常用的就是 lpush、lpop、rpush、rpop,同时它也支持 lindex 查询某元素在 list 中的索引,linsert 在指定元素旁边插入新元素。
从头、尾节点的 push、pop 来看,这就是双向链表最优秀的设计,提供头尾节点的 O(1) 复杂度操作,但普通的链表每个节点都需要一个前驱和后向指针,空间利用率低;且在链表元素过多时查询缓慢,ziplist 让每一个 entry 都存了上一个 entry 和自己的长度,用来实现双向遍历,节省了一些空间,但缺点是增加、修改元素时可能会带来更大的内存拷贝。
有没有办法进行一个时间和空间的权衡呢?quicklist 就是这样设计的:
A doubly linked list of ziplists 翻译为一个 ziplist 的双向链表。
那么每个 ziplist 存多少数据合适呢?比如我有 20 个元素,可以 4 个 ziplist,每个 ziplist 里 5 个元素;也可以 5 个 ziplist,每个 ziplist 里 4 个元素:
ziplist 的元素越多,进行操作时候需要内存拷贝的空间就越多,如果整个 quicklist 只有一个 ziplist 节点,那么其实就退化成了一个 ziplist 了;
ziplist 的元素越少,就需要更多的节点,空间利用率低,更容易产生内存碎片,如果每个 ziplist 只有一个元素,quicklist 就退化成一个普通的双向链表了。
redis 提供了一个这样的配置项:
当取正值的时候,表示按照数据项个数来限定每个 quicklist 节点上的 ziplist 长度。比如,当这个参数配置成 5的时候,表示每个 quicklist 节点的 ziplist 最多包含 5 个数据项。
当取负值的时候,表示按照占用字节数来限定每个 quicklist 节点上的 ziplist 长度。这时,它只能取 -1 到 -5 这五个值,每个值含义如下:
-5: 每个 quicklist 节点上的 ziplist 大小不能超过 64Kb;
-4: 每个 quicklist 节点上的 ziplist 大小不能超过 32Kb;
-3: 每个 quicklist 节点上的 ziplist 大小不能超过 16Kb;
-2: 每个 quicklist 节点上的 ziplist 大小不能超过 8Kb(Redis 默认值);
-1: 每个 quicklist 节点上的 ziplist 大小不能超过 4Kb。
quicklist 1 2 3 4 5 6 7 8 typedef struct quicklist { quicklistNode *head; quicklistNode *tail; unsigned long count; unsigned int len; int fill : 16 ; unsigned int compress : 16 ; } quicklist;
和我们认知里的双向链表一样:
head 头节点指针;
tail 尾节点指针;
count 整个 quicklist 中的元素数量;
len ziplist 的数量;
fill: 16bit,ziplist大小设置,存放list-max-ziplist-size参数的值;
compress: 16bit,节点压缩深度设置,存放list-compress-depth参数的值。
list-max-ziplist-size 的值在最上面已经解释过含义了,list-compress-depth是出于这样的考虑:
当列表很长的时候,最容易被访问的很可能是两端的数据,中间的数据被访问的频率比较低,所以 redis 提供了该选项选择压缩的中间数据,取值含义如下:
0: 是个特殊值,表示都不压缩。这是 Redis 的默认值;
1: 表示 quicklist 两端各有1个节点不压缩,中间的节点压缩;
2: 表示 quicklist 两端各有2个节点不压缩,中间的节点压缩;
3: 表示 quicklist 两端各有3个节点不压缩,中间的节点压缩。
注意,两个端点(头、尾)是不会压缩的。
quicklistNode quicklistNode 是每一个 quicklist 节点的数据结构
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 typedef struct quicklistNode { struct quicklistNode *prev ; struct quicklistNode *next ; unsigned char *zl; unsigned int sz; unsigned int count: 16 ; unsigned int encoding: 2 ; unsigned int container: 2 ; unsigned int recompress: 1 ; unsigned int attempted_compress: 1 ; unsigned int extra: 10 ; } quicklistNode;typedef struct quicklistLZF { unsigned int sz; char compressed[]; } quicklistLZF;
以下是某一时刻一个 quicklist 的示意图:
1 2 list -max-ziplist-size 3 list -compress-depth 2
两端各有 2 个橙黄色节点没有被压缩,对应 list-compress-depth 为 2;
第二个节点和倒数第二个节点,都是元素数量为 3 的 ziplist,对应 list-max-ziplist-size。
代码分析
创建函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 quicklist *quicklistNew (int fill, int compress) { quicklist *quicklist = quicklistCreate(); quicklistSetOptions(quicklist, fill, compress); return quicklist; } quicklist *quicklistCreate (void ) { struct quicklist *quicklist ; quicklist = zmalloc(sizeof (*quicklist)); quicklist->head = quicklist->tail = NULL ; quicklist->len = 0 ; quicklist->count = 0 ; quicklist->compress = 0 ; quicklist->fill = -2 ; return quicklist; }
创建函数其实很简单,只是按照默认值初始化,并把配置项的值放入到 quicklist 对应的 compress、fill 字段上
其他操作复杂的原因在于,比如插入操作过程中要判断是否达到了配置的值,达到的话需要创建新的 node 节点(包括 ziplist );删除操作如果 ziplist 为空了,那么对应的头部或尾部节点也要删除,还可能涉及到里面节点的解压缩问题……
在指定节点插入(lpush、rpush 最终都会调用到这个函数)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 REDIS_STATIC void __quicklistInsertNode(quicklist *quicklist, quicklistNode *old_node, quicklistNode *new_node, int after) { if (after) { new_node->prev = old_node; if (old_node) { new_node->next = old_node->next; if (old_node->next) old_node->next->prev = new_node; old_node->next = new_node; } if (quicklist->tail == old_node) quicklist->tail = new_node; } else { new_node->next = old_node; if (old_node) { new_node->prev = old_node->prev; if (old_node->prev) old_node->prev->next = new_node; old_node->prev = new_node; } if (quicklist->head == old_node) quicklist->head = new_node; } if (quicklist->len == 0 ) { quicklist->head = quicklist->tail = new_node; } if (old_node) quicklistCompress(quicklist, old_node); quicklist->len++; }
push 操作
在调用这个函数前,会根据操作是 lpush 还是 rpush 确定 where 是头还是尾
1 2 3 4 5 6 7 void quicklistPush (quicklist *quicklist, void *value, const size_t sz, int where) { if (where == QUICKLIST_HEAD) { quicklistPushHead(quicklist, value, sz); } else if (where == QUICKLIST_TAIL) { quicklistPushTail(quicklist, value, sz); } }
头插
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 int quicklistPushHead (quicklist *quicklist, void *value, size_t sz) { quicklistNode *orig_head = quicklist->head; if (likely( _quicklistNodeAllowInsert(quicklist->head, quicklist->fill, sz))) { quicklist->head->zl = ziplistPush(quicklist->head->zl, value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(quicklist->head); } else { quicklistNode *node = quicklistCreateNode(); node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_HEAD); quicklistNodeUpdateSz(node); _quicklistInsertNodeBefore(quicklist, quicklist->head, node); } quicklist->count++; quicklist->head->count++; return (orig_head != quicklist->head); } REDIS_STATIC void _quicklistInsertNodeBefore(quicklist *quicklist, quicklistNode *old_node, quicklistNode *new_node) { __quicklistInsertNode(quicklist, old_node, new_node, 0 ); }
尾插
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 int quicklistPushTail (quicklist *quicklist, void *value, size_t sz) { quicklistNode *orig_tail = quicklist->tail; if (likely( _quicklistNodeAllowInsert(quicklist->tail, quicklist->fill, sz))) { quicklist->tail->zl = ziplistPush(quicklist->tail->zl, value, sz, ZIPLIST_TAIL); quicklistNodeUpdateSz(quicklist->tail); } else { quicklistNode *node = quicklistCreateNode(); node->zl = ziplistPush(ziplistNew(), value, sz, ZIPLIST_TAIL); quicklistNodeUpdateSz(node); _quicklistInsertNodeAfter(quicklist, quicklist->tail, node); } quicklist->count++; quicklist->tail->count++; return (orig_tail != quicklist->tail); } REDIS_STATIC void _quicklistInsertNodeAfter(quicklist *quicklist, quicklistNode *old_node, quicklistNode *new_node) { __quicklistInsertNode(quicklist, old_node, new_node, 1 ); }
lpush 和 rpush 的逻辑几乎是一样的,而对其进行压缩,是在 __quicklistInsertNode 函数的 quicklistCompress 函数中……