Redis源码阅读之zadd
June 23, 2024
Redis版本:7.0.2
有序集合的定义 #
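有序集合(zset)有两种不同的编码类型:
- 当元素数量较少且每个成员(member)的长度较短时,使用listpack编码。对应的阈值由配置项zset-max-listpack-entries和zset-max-listpack-value控制(源码中为server.zset_max_listpack_entries和server.zset_max_listpack_value)。
- 否则,使用跳表编码。这种情况下,zset同时使用了跳表+哈希表:哈希表用于根据成员快速查找其分数、判断成员是否存在;跳表用于按分数(分数相同时按成员)对元素进行排序。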
使用跳表编码时,源码定义如下:
/* Sorted set with the skiplist encoding. The dict maps each member to a
 * pointer to its score (stored inside the skiplist node), while the skiplist
 * keeps the same members ordered by (score, member). */
typedef struct zset {
    dict *dict;      /* member -> &node->score, for O(1) member lookup */
    zskiplist *zsl;  /* ordered view of the same members */
} zset;
源码入口-zaddCommand #
zadd命令的处理函数为zaddCommand,它直接调用zaddGenericCommand(ZINCRBY命令也复用了这个函数)。源码为:
/* ZADD key [options] score member [score member ...]
 * Thin entry point: all parsing and work is done by zaddGenericCommand(),
 * started without any pre-set ZADD_IN_* flag. */
void zaddCommand(client *c) {
    const int initial_flags = ZADD_IN_NONE;
    zaddGenericCommand(c, initial_flags);
}
/* This generic command implements both ZADD and ZINCRBY.
 *
 * 'flags' holds the ZADD_IN_* flags preset by the caller (zaddCommand passes
 * ZADD_IN_NONE; the ZINCRBY caller presumably presets ZADD_IN_INCR -- not
 * visible here). Options parsed from argv (NX, XX, INCR, GT, LT) are OR-ed
 * into it; CH is tracked separately in the local 'ch'.
 *
 * Expected arguments: argv[0] is the command, argv[1] the key, then any
 * options, then one or more "score member" pairs. Every score is parsed
 * before the sorted set is touched, so a syntax error leaves it unchanged.
 *
 * Reply: with INCR, the resulting score, or a null reply when nothing was
 * processed (e.g. because of NX/XX/GT/LT); otherwise the number of added
 * members, or added+updated when CH is given.
 */
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements, ch = 0;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0;      /* Number of new elements added. */
    int updated = 0;    /* Number of elements with updated score. */
    int processed = 0;  /* Number of elements processed, may remain zero with
                           options like XX. */

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. Parsing stops at the
     * first argument that is not a recognized option. */
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;
        else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;
        else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;
        else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;
        else break;
        scoreidx++;
    }

    /* Turn options into simple to check vars. */
    int incr = (flags & ZADD_IN_INCR) != 0;
    int nx = (flags & ZADD_IN_NX) != 0;
    int xx = (flags & ZADD_IN_XX) != 0;
    int gt = (flags & ZADD_IN_GT) != 0;
    int lt = (flags & ZADD_IN_LT) != 0;

    /* After the options, we expect to have an even, non-zero number of args,
     * since we expect one or more score-element pairs. */
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReplyErrorObject(c,shared.syntaxerr);
        return;
    }
    elements /= 2; /* Now this holds the number of score-element pairs. */

    /* Check for incompatible options. */
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }
    if ((gt && nx) || (lt && nx) || (gt && lt)) {
        addReplyError(c,
            "GT, LT, and/or NX options at the same time are not compatible");
        return;
    }
    /* Note that XX is compatible with either GT or LT */

    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. On failure the error reply is
     * emitted by getDoubleFromObjectOrReply(); we only free 'scores'. */
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        /* The initial encoding only considers the configured limits and the
         * length of the first member; zsetAdd() converts the encoding later
         * if the remaining members exceed the listpack limits. */
        if (server.zset_max_listpack_entries == 0 ||
            server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetListpackObject();
        }
        dbAdd(c->db,key,zobj);
    }

    for (j = 0; j < elements; j++) {
        score = scores[j];
        /* Seeded with the input score: zsetAdd() does not write 'newscore'
         * on its no-op paths (NX/XX/GT/LT), and copying it uninitialized
         * into 'score' below would read an indeterminate value. */
        double newscore = score;
        int retflags = 0;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
        if (retval == 0) {
            /* zsetAdd() fails only when the resulting score is NaN. */
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_OUT_ADDED) added++;
        if (retflags & ZADD_OUT_UPDATED) updated++;
        if (!(retflags & ZADD_OUT_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReplyNull(c);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}
逻辑为:
- 解析选项。
- 判断元素个数。
- 解析分数。
- 查找键并判断键的类型。如果键存在但不是有序集合,checkType会向客户端返回类型错误,随后跳转到cleanup,释放之前分配的scores数组。
- 如果键不存在,则创建有序集合(如果指定了XX选项,则直接回复客户端,不做任何修改)。这里需要注意一下,如果server.zset_max_listpack_entries为0,或者第一个成员的长度超过server.zset_max_listpack_value,则调用createZsetObject函数创建有序集合对象,这种情况下会使用dict+跳表存储;否则调用createZsetListpackObject函数创建有序集合对象,使用listpack存储。
- 循环添加,调用zsetAdd将元素添加到有序集合中。
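- 更新server.dirty(自上次持久化以来的修改计数):这里增加的是added+updated,即新增成员与分数被更新的成员数量之和。
- 回复客户端:使用INCR时返回新的分数(没有处理任何成员时返回null);否则返回新增的成员数量(指定CH时返回新增+更新的成员数量)。
- 在cleanup中释放scores数组;如果有成员被新增或更新,则调用signalModifiedKey并发送zadd/zincr键空间通知。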
创建zset-createZsetObject #
创建zset对象的源码如下:
/* Create an empty sorted set object using the skiplist encoding: a zset
 * struct pairing a fresh dict (zsetDictType) with a fresh skiplist, wrapped
 * in an OBJ_ZSET object whose encoding is OBJ_ENCODING_SKIPLIST. */
robj *createZsetObject(void) {
    zset *set = zmalloc(sizeof(*set));

    set->dict = dictCreate(&zsetDictType);
    set->zsl = zslCreate();

    robj *obj = createObject(OBJ_ZSET, set);
    obj->encoding = OBJ_ENCODING_SKIPLIST;
    return obj;
}
可以看到:
- zs->dict = dictCreate(&zsetDictType)创建一个字典。
- zs->zsl = zslCreate()创建一个跳表。
- 创建一个OBJ_ZSET类型的对象,设置编码类型为OBJ_ENCODING_SKIPLIST。
向有序集合中添加元素-zsetAdd #
源码如下:
/* Add a new element or update the score of an existing element in a sorted
 * set, regardless of its encoding.
 *
 * The set of flags change the command behavior.
 *
 * The input flags (in_flags) are the following:
 *
 * ZADD_IN_INCR: Increment the current element score by 'score' instead of
 *               updating the current element score. If the element does not
 *               exist, we assume 0 as previous score.
 * ZADD_IN_NX:   Perform the operation only if the element does not exist.
 * ZADD_IN_XX:   Perform the operation only if the element already exist.
 * ZADD_IN_GT:   Perform the operation on existing elements only if the new
 *               score is greater than the current score.
 * ZADD_IN_LT:   Perform the operation on existing elements only if the new
 *               score is less than the current score.
 *
 * If 'newscore' is not NULL, '*newscore' receives the element's resulting
 * score whenever the element is added, or is an existing element that passed
 * the NX/GT/LT checks (even if its score did not change). It is NOT written
 * on the no-op paths (ZADD_OUT_NOP), so callers must not read it then.
 *
 * The output flags (*out_flags) are the following:
 *
 * ZADD_OUT_NAN:     The resulting score is not a number.
 * ZADD_OUT_ADDED:   The element was added (not present before the call).
 * ZADD_OUT_UPDATED: The element score was updated.
 * ZADD_OUT_NOP:     No operation was performed because of NX, XX, GT or LT.
 *
 * Return value:
 *
 * The function returns 1 on success, and sets the appropriate flags
 * ADDED or UPDATED to signal what happened during the operation (note that
 * none could be set if we re-added an element using the same score it used
 * to have, or in the case a zero increment is used).
 *
 * The function returns 0 on error, currently only when the increment
 * produces a NAN condition, or when the 'score' value is NAN since the
 * start.
 *
 * The command as a side effect of adding a new element may convert the sorted
 * set internal encoding from listpack to hashtable+skiplist.
 *
 * Memory management of 'ele':
 *
 * The function does not take ownership of the 'ele' SDS string, but copies
 * it if needed. */
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
    /* Turn options into simple to check vars. */
    int incr = (in_flags & ZADD_IN_INCR) != 0;
    int nx = (in_flags & ZADD_IN_NX) != 0;
    int xx = (in_flags & ZADD_IN_XX) != 0;
    int gt = (in_flags & ZADD_IN_GT) != 0;
    int lt = (in_flags & ZADD_IN_LT) != 0;
    *out_flags = 0; /* We'll return our response flags. */
    double curscore;

    /* NaN as input is an error regardless of all the other parameters. */
    if (isnan(score)) {
        *out_flags = ZADD_OUT_NAN;
        return 0;
    }

    /* Update the sorted set according to its encoding. */
    if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
        unsigned char *eptr;

        /* When the member is found, zzlFind() also fills 'curscore',
         * which the code below reads. */
        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changed. */
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            if (zzlLength(zobj->ptr)+1 > server.zset_max_listpack_entries ||
                sdslen(ele) > server.zset_max_listpack_value ||
                !lpSafeToAdd(zobj->ptr, sdslen(ele)))
            {
                /* Too big for a listpack: convert, then fall through to the
                 * skiplist branch below, which performs the insertion. */
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            } else {
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                if (newscore) *newscore = score;
                *out_flags |= ZADD_OUT_ADDED;
                return 1;
            }
        } else {
            /* XX and the element does not exist: nothing to do. */
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    }

    /* Note that the above block handling listpack would have either returned or
     * converted the key to skiplist. */
    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;

        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            /* The dict value is a pointer to the score held by the
             * member's skiplist node. */
            curscore = *(double*)dictGetVal(de);

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changes. */
            if (score != curscore) {
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                /* Note that we did not removed the original element from
                 * the hash table representing the sorted set, so we just
                 * update the score. zslUpdateScore() may return a different
                 * node than before, so the dict value must be re-pointed at
                 * the returned node's score. */
                dictGetVal(de) = &znode->score; /* Update score ptr. */
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* The caller keeps ownership of 'ele', so duplicate it. The copy
             * is handed to zslInsert() (which takes ownership of it) and is
             * also used as the dict key. */
            ele = sdsdup(ele);
            znode = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *out_flags |= ZADD_OUT_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } else {
            /* XX and the element does not exist: nothing to do. */
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}
逻辑如下:
- 如果score是NaN,则返回。
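- 如果编码是listpack,则查找成员。如果成员存在,先处理NX、INCR、GT/LT等选项;若分数发生变化,则删除该成员,再按新分数重新插入。
- 如果成员不存在且未指定XX,则添加。如果添加后成员数量会超过阈值、成员字符串长度大于阈值,或者listpack无法安全容纳该成员,则先将编码转换为跳表,再交由后面的跳表分支完成插入。
- 如果编码是跳表,则调用dictFind查找成员。如果分数不同,则调用zslUpdateScore更新跳表中成员的分数。由于哈希表中保存的值是指向跳表节点score字段的指针,而zslUpdateScore可能返回一个新节点,所以需要通过dictGetVal(de) = &znode->score让该指针指向返回节点的score。
- 如果成员不存在且xx为0(xx=1时,只有成员存在时才操作),则先用sdsdup复制一份成员字符串,再向跳表中插入成员,并向dict中添加成员。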
接下来,我们着重看一下在使用跳表时,更新已有成员分数和添加新成员相关源码。
更新已有成员的分数(跳表编码) #
再次重申一遍,使用跳表编码时,成员的分数同时关联着哈希表和跳表。不过哈希表中保存的并不是分数的副本,而是指向跳表节点中score字段的指针:zslUpdateScore只负责更新跳表;如果节点被重新创建,调用方还需要让哈希表中的指针指向新节点的score。
跳表编码下,更新已有成员在跳表的分数源码如下:
/* Update the score of an element inside the sorted set skiplist.
 * Note that the element must exist and must match 'score' (the 'curscore'
 * argument).
 * This function does not update the score in the hash table side, the
 * caller should take care of it.
 *
 * Note that this function attempts to just update the node, in case after
 * the score update, the node would be exactly at the same position.
 * Otherwise the skiplist is modified by removing the element and re-adding
 * it as a new node, which is more costly.
 *
 * The function returns the updated element skiplist node pointer. This may
 * be a different node than the one holding the element before the call; in
 * that case the old node has been freed. */
zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    int i;

    /* We need to seek to element to update to start: this is useful anyway,
     * we'll have to update or remove it. update[i] ends up as the last node
     * on level i ordered before (curscore, ele). */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        while (x->level[i].forward &&
                (x->level[i].forward->score < curscore ||
                    (x->level[i].forward->score == curscore &&
                     sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            x = x->level[i].forward;
        }
        update[i] = x;
    }

    /* Jump to our element: note that this function assumes that the
     * element with the matching score exists. */
    x = x->level[0].forward;
    serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);

    /* If the node, after the score update, would be still exactly
     * at the same position, we can just update the score without
     * actually removing and re-inserting the element in the skiplist.
     * The comparisons are strict, so a tie with a neighbour's score takes
     * the delete/insert path below, where ordering also considers 'ele'. */
    if ((x->backward == NULL || x->backward->score < newscore) &&
        (x->level[0].forward == NULL || x->level[0].forward->score > newscore))
    {
        x->score = newscore;
        return x;
    }

    /* No way to reuse the old node: we need to remove and insert a new
     * one at a different place. */
    zslDeleteNode(zsl, x, update);
    zskiplistNode *newnode = zslInsert(zsl,newscore,x->ele);
    /* We reused the old node x->ele SDS string, free the node now
     * since zslInsert created a new one. x->ele is cleared first, presumably
     * so that zslFreeNode() does not release the string now owned by
     * newnode. */
    x->ele = NULL;
    zslFreeNode(x);
    return newnode;
}
逻辑为:
- 查找元素。
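- 如果分数更新后,元素还在相同的位置(前一个节点的分数严格小于新分数,且后一个节点的分数严格大于新分数),则直接修改节点的分数并返回该节点。
- 否则,调用zslDeleteNode在跳表中删除原节点。
- 调用zslInsert按新分数插入节点,并复用原节点的ele字符串。
- 将原节点的ele置为NULL后释放原节点,返回新节点。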
添加新成员(跳表编码) #
跳表编码下,添加新成员,有两个操作:
- 在跳表中插入新节点-zslInsert。
- 在哈希表中添加新成员-dictAdd。
zslInsert源码定义如下:
/*************************t_zset.c***************************/
/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'. Returns the newly created node. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    /* Walk down from the highest level. update[i] ends up as the last node
     * on level i ordered before (score, ele): a smaller score, or an equal
     * score with a smaller member. rank[i] is the rank of update[i], i.e. the
     * sum of the spans crossed to reach it from the header. */
    x = zsl->header;
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    level = zslRandomLevel();
    /* If the new node is taller than the skiplist, raise the skiplist level.
     * On each new level the header is the predecessor (rank 0), and its span
     * is set to the current length, as if it reached past the whole list. */
    if (level > zsl->level) {
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        /* Splice x in right after update[i] on level i. */
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here.
         * x will sit right after update[0], so the distance from update[i]
         * to x is (rank[0] - rank[i]) + 1; x takes over the remainder of the
         * span update[i] used to cover. */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels: they now jump over one more
     * node (x) */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    /* Fix the level-0 back links. The header is never a backward target,
     * so a node inserted first gets NULL; a node inserted last becomes the
     * tail. */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}
其主要逻辑为:
- 查找插入的位置:
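- 使用update记录要插入的位置。update[i]表示第i层中排在(score, ele)之前的最后一个节点(即分数更小,或分数相同但成员更小的最后一个节点),用于后续的插入操作。
- rank[i]表示update[i]的排名,即从头节点走到update[i]所经过的跨度之和。
- 生成随机level。如果生成的新level大于当前跳表的层数,则将跳表的层数提升为新level,并把新增各层的update[i]设为头节点、rank[i]设为0、头节点在该层的span设为跳表当前长度。
- 创建新节点x。
- 插入新节点x。我们上面说过,我们用update[i]记录了第i层排在新节点之前的最后一个节点。插入时,我们需要依据update[i]执行插入。同时,我们需要更新相关span值:在新节点拥有的层上,重新计算update[i]和x的span;在更高的层上,只需将update[i]的span加1。
- 最后设置x的backward指针(如果前驱是头节点则为NULL),必要时更新跳表的tail,并将length加1。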
跳表的定义 #
源码如下:
/************************server.h************************/
/* ZSETs use a specialized version of Skiplists */
/* Skiplist node: one member together with its score. A node is linked
 * forward on every level it takes part in, and backward on level 0 only. */
typedef struct zskiplistNode {
    sds ele;                           /* the member, as an SDS string */
    double score;                      /* sort key; ties are ordered by 'ele' */
    struct zskiplistNode *backward;    /* previous node on level 0 (NULL for the first node) */
    struct zskiplistLevel {
        struct zskiplistNode *forward; /* next node on this level */
        unsigned long span;            /* positions jumped by 'forward'; summed to get ranks */
    } level[];                         /* flexible array, one entry per level of this node */
} zskiplistNode;
/* Skiplist descriptor. */
typedef struct zskiplist {
    struct zskiplistNode *header, *tail; /* header node (holds no member); last node */
    unsigned long length;                /* number of member nodes */
    int level;                           /* highest level currently in use */
} zskiplist;
可以看到:
- zskiplist定义了跳表,其属性为:
- level:当前跳表内,层数最大的那个节点的层数。
- length:当前跳表的节点数量(不包括头节点)。
- header:跳表的表头节点。它本身不保存成员,只作为各层链表的起点。
- tail:跳表尾节点。
- zskiplistNode定义了跳表节点的结构,包括成员ele(成员值)、score(分数)、backward(后退指针)和level(层)。其属性为:
- ele:成员值,是一个SDS字符串。
- score:节点的分数值。跳表按分数从小到大排序,分数相同时按ele排序。
- backward:后退/后向指针,指向当前节点在第0层的前一个节点(第一个节点的backward为NULL)。用于从表尾向表头遍历时使用。
- level:层数组(柔性数组成员),标记节点的各层。每个zskiplistLevel有两个成员:forward,前进指针,指向该层的下一个节点;span,跨度,即在该层沿forward从当前节点到达下一个节点所跨越的位置数,累加span即可得到节点的排名(rank)。