Redis源码阅读之zadd

Redis源码阅读之zadd

June 23, 2024
Redis
Redis
Redis版本:7.0.2

有序集合的定义 #

有序集合(zset)有两种不同的编码类型:

  • 当元素数量较少且每个成员(元素)的长度都较短时,使用listpack类型。
  • 否则,使用跳表编码。这种情况下,zset使用了跳表+哈希表。使用哈希表可以快速查找元素,判断元素是否存在。使用跳表可以对元素进行排序。

使用跳表编码时,源码定义如下:

/* Sorted set in skiplist encoding. Every member is indexed twice: once in a
 * hash table and once in a skiplist. */
typedef struct zset
{
    dict *dict;     /* Hash table keyed by member; zsetAdd() stores a pointer
                     * to the skiplist node's score as the value. */
    zskiplist *zsl; /* Skiplist keeping the members ordered by score. */
} zset;

源码入口-zaddCommand #

zadd命令为zaddCommand,源码为:

/* ZADD entry point: delegates to zaddGenericCommand() with no input flag
 * preset (ZADD_IN_NONE); the options are parsed from the arguments there. */
void zaddCommand(client *c) {
    zaddGenericCommand(c,ZADD_IN_NONE);
}

/* This generic command implements both ZADD and ZINCRBY.
 *
 * 'c' is the client whose argv holds: command name, key, optional options
 * (NX, XX, CH, INCR, GT, LT, case-insensitive) and then one or more
 * score-element pairs.
 * 'flags' is a set of ZADD_IN_* bits preset by the caller; the options found
 * in argv are OR-ed into it.
 *
 * Reply:
 *  - with INCR: the resulting score, or a null reply when the element was
 *    not processed (e.g. because of NX/XX/GT/LT, or XX on a missing key);
 *  - otherwise: the number of added elements, or added+updated with CH.
 *
 * All scores are parsed before the sorted set is touched, so a malformed
 * score never leaves the set partially modified. */
void zaddGenericCommand(client *c, int flags) {
    static char *nanerr = "resulting score is not a number (NaN)";
    robj *key = c->argv[1];
    robj *zobj;
    sds ele;
    double score = 0, *scores = NULL;
    int j, elements, ch = 0;
    int scoreidx = 0;
    /* The following vars are used in order to track what the command actually
     * did during the execution, to reply to the client and to trigger the
     * notification of keyspace change. */
    int added = 0;      /* Number of new elements added. */
    int updated = 0;    /* Number of elements with updated score. */
    int processed = 0;  /* Number of elements processed, may remain zero with
                           options like XX. */

    /* Parse options. At the end 'scoreidx' is set to the argument position
     * of the score of the first score-element pair. */
    scoreidx = 2;
    while(scoreidx < c->argc) {
        char *opt = c->argv[scoreidx]->ptr;
        if (!strcasecmp(opt,"nx")) flags |= ZADD_IN_NX;
        else if (!strcasecmp(opt,"xx")) flags |= ZADD_IN_XX;
        else if (!strcasecmp(opt,"ch")) ch = 1; /* Return num of elements added or updated. */
        else if (!strcasecmp(opt,"incr")) flags |= ZADD_IN_INCR;
        else if (!strcasecmp(opt,"gt")) flags |= ZADD_IN_GT;
        else if (!strcasecmp(opt,"lt")) flags |= ZADD_IN_LT;
        else break;
        scoreidx++;
    }

    /* Turn options into simple to check vars. */
    int incr = (flags & ZADD_IN_INCR) != 0;
    int nx = (flags & ZADD_IN_NX) != 0;
    int xx = (flags & ZADD_IN_XX) != 0;
    int gt = (flags & ZADD_IN_GT) != 0;
    int lt = (flags & ZADD_IN_LT) != 0;

    /* After the options, we expect to have an even number of args, since
     * we expect any number of score-element pairs. Zero pairs is a syntax
     * error as well. */
    elements = c->argc-scoreidx;
    if (elements % 2 || !elements) {
        addReplyErrorObject(c,shared.syntaxerr);
        return;
    }

    /* Now this holds the number of score-element pairs. */
    elements /= 2;

    /* Check for incompatible options. */
    if (nx && xx) {
        addReplyError(c,
            "XX and NX options at the same time are not compatible");
        return;
    }

    if ((gt && nx) || (lt && nx) || (gt && lt)) {
        addReplyError(c,
            "GT, LT, and/or NX options at the same time are not compatible");
        return;
    }
    /* Note that XX is compatible with either GT or LT */

    if (incr && elements > 1) {
        addReplyError(c,
            "INCR option supports a single increment-element pair");
        return;
    }

    /* Start parsing all the scores, we need to emit any syntax error
     * before executing additions to the sorted set, as the command should
     * either execute fully or nothing at all. */
    scores = zmalloc(sizeof(double)*elements);
    for (j = 0; j < elements; j++) {
        if (getDoubleFromObjectOrReply(c,c->argv[scoreidx+j*2],&scores[j],NULL)
            != C_OK) goto cleanup;
    }

    /* Lookup the key and create the sorted set if does not exist. */
    zobj = lookupKeyWrite(c->db,key);
    if (checkType(c,zobj,OBJ_ZSET)) goto cleanup;
    if (zobj == NULL) {
        if (xx) goto reply_to_client; /* No key + XX option: nothing to do. */
        /* Pick the initial encoding from the first element only: skiplist
         * when listpack is disabled (max entries == 0) or the first element
         * is longer than the listpack value limit, listpack otherwise. */
        if (server.zset_max_listpack_entries == 0 ||
            server.zset_max_listpack_value < sdslen(c->argv[scoreidx+1]->ptr))
        {
            zobj = createZsetObject();
        } else {
            zobj = createZsetListpackObject();
        }
        dbAdd(c->db,key,zobj);
    }

    for (j = 0; j < elements; j++) {
        int retflags = 0;
        score = scores[j];
        /* Seed with the requested score: zsetAdd() leaves '*newscore'
         * untouched whenever it reports ZADD_OUT_NOP, and the value is
         * copied back into 'score' below, so it must never be read while
         * still indeterminate. */
        double newscore = score;

        ele = c->argv[scoreidx+1+j*2]->ptr;
        int retval = zsetAdd(zobj, score, ele, flags, &retflags, &newscore);
        if (retval == 0) {
            addReplyError(c,nanerr);
            goto cleanup;
        }
        if (retflags & ZADD_OUT_ADDED) added++;
        if (retflags & ZADD_OUT_UPDATED) updated++;
        if (!(retflags & ZADD_OUT_NOP)) processed++;
        score = newscore;
    }
    server.dirty += (added+updated);

reply_to_client:
    if (incr) { /* ZINCRBY or INCR option. */
        if (processed)
            addReplyDouble(c,score);
        else
            addReplyNull(c);
    } else { /* ZADD. */
        addReplyLongLong(c,ch ? added+updated : added);
    }

cleanup:
    /* Reached on success and on every error after 'scores' was allocated. */
    zfree(scores);
    if (added || updated) {
        signalModifiedKey(c,c->db,key);
        notifyKeyspaceEvent(NOTIFY_ZSET,
            incr ? "zincr" : "zadd", key, c->db->id);
    }
}

逻辑为:

  1. 解析选项。
  2. 判断元素个数。
  3. 解析分数。
  4. 查找键,判断键的类型,如果不是有序集合,则释放之前创建的一些变量。
  5. 如果不存在,则创建有序集合。这里需要注意一下,如果server.zset_max_listpack_entries为0或者元素键长度过长,则调用createZsetObject函数创建有序集合对象,这种情况下会使用dict+跳表存储;否则调用createZsetListpackObject函数创建有序集合对象,使用listpack存储。
  6. 循环添加,调用zsetAdd将元素添加到有序集合中。
  7. 更新server.dirty:增加added+updated,即本次新增的成员数与分数被更新的成员数之和。

创建zset-createZsetObject #

创建zset对象的源码如下:

/* Build an empty sorted set object in skiplist encoding: a fresh zset holding
 * a new dict and a new skiplist, wrapped in an OBJ_ZSET object. */
robj *createZsetObject(void) {
    zset *set = zmalloc(sizeof(*set));

    set->dict = dictCreate(&zsetDictType);
    set->zsl = zslCreate();

    robj *obj = createObject(OBJ_ZSET,set);
    obj->encoding = OBJ_ENCODING_SKIPLIST;
    return obj;
}

可以看到:

  1. zs->dict = dictCreate(&zsetDictType)创建一个字典。
  2. zs->zsl = zslCreate()创建一个跳表。
  3. 创建一个OBJ_ZSET类型的对象,设置编码类型为OBJ_ENCODING_SKIPLIST。

向有序集合中添加元素-zsetAdd #

源码如下:

/* Add a new element or update the score of an existing element in a sorted
 * set, regardless of its encoding.
 *
 * The set of flags change the command behavior.
 *
 * Input flags ('in_flags'):
 *
 * ZADD_IN_INCR: Increment the current element score by 'score' instead of
 *               updating the current element score. If the element does not
 *               exist, we assume 0 as previous score.
 * ZADD_IN_NX:   Perform the operation only if the element does not exist.
 * ZADD_IN_XX:   Perform the operation only if the element already exist.
 * ZADD_IN_GT:   Perform the operation on existing elements only if the new
 *               score is greater than the current score.
 * ZADD_IN_LT:   Perform the operation on existing elements only if the new
 *               score is less than the current score.
 *
 * On every successful path that is not a NOP, the resulting score of the
 * element is stored in '*newscore' if 'newscore' is not NULL. On NOP and on
 * error '*newscore' is left untouched.
 *
 * Output flags ('*out_flags', reset to 0 on entry):
 *
 * ZADD_OUT_NAN:     The resulting score is not a number.
 * ZADD_OUT_ADDED:   The element was added (not present before the call).
 * ZADD_OUT_UPDATED: The element score was updated.
 * ZADD_OUT_NOP:     No operation was performed because of NX, XX, GT or LT.
 *
 * Return value:
 *
 * The function returns 1 on success, and sets the appropriate flags
 * ADDED or UPDATED to signal what happened during the operation (note that
 * none could be set if we re-added an element using the same score it used
 * to have, or in the case a zero increment is used).
 *
 * The function returns 0 on error, currently only when the increment
 * produces a NAN condition, or when the 'score' value is NAN since the
 * start.
 *
 * The command as a side effect of adding a new element may convert the sorted
 * set internal encoding from listpack to hashtable+skiplist.
 *
 * Memory management of 'ele':
 *
 * The function does not take ownership of the 'ele' SDS string, but copies
 * it if needed. */
int zsetAdd(robj *zobj, double score, sds ele, int in_flags, int *out_flags, double *newscore) {
    /* Turn options into simple to check vars. */
    int incr = (in_flags & ZADD_IN_INCR) != 0;
    int nx = (in_flags & ZADD_IN_NX) != 0;
    int xx = (in_flags & ZADD_IN_XX) != 0;
    int gt = (in_flags & ZADD_IN_GT) != 0;
    int lt = (in_flags & ZADD_IN_LT) != 0;
    *out_flags = 0; /* We'll return our response flags. */
    double curscore;

    /* NaN as input is an error regardless of all the other parameters. */
    if (isnan(score)) {
        *out_flags = ZADD_OUT_NAN;
        return 0;
    }

    /* Update the sorted set according to its encoding. */
    if (zobj->encoding == OBJ_ENCODING_LISTPACK) {// Listpack encoding.
        unsigned char *eptr;

        if ((eptr = zzlFind(zobj->ptr,ele,&curscore)) != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changed. */
            if (score != curscore) {
                zobj->ptr = zzlDelete(zobj->ptr,eptr);
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* check if the element is too large or the list
             * becomes too long *before* executing zzlInsert. */
            if (zzlLength(zobj->ptr)+1 > server.zset_max_listpack_entries ||
                sdslen(ele) > server.zset_max_listpack_value ||
                !lpSafeToAdd(zobj->ptr, sdslen(ele)))
            {
                /* No return here on purpose: after the conversion the new
                 * element is added by the skiplist branch below. */
                zsetConvert(zobj,OBJ_ENCODING_SKIPLIST);
            } else {
                zobj->ptr = zzlInsert(zobj->ptr,ele,score);
                if (newscore) *newscore = score;
                *out_flags |= ZADD_OUT_ADDED;
                return 1;
            }
        } else {
            /* XX and the element is missing: nothing to do. */
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    }

    /* Note that the above block handling listpack would have either returned or
     * converted the key to skiplist. */
    if (zobj->encoding == OBJ_ENCODING_SKIPLIST) {// Skiplist encoding.
        zset *zs = zobj->ptr;
        zskiplistNode *znode;
        dictEntry *de;

        /* The hash table answers "does the member exist?" and gives access
         * to its current score. */
        de = dictFind(zs->dict,ele);
        if (de != NULL) {
            /* NX? Return, same element already exists. */
            if (nx) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            curscore = *(double*)dictGetVal(de);

            /* Prepare the score for the increment if needed. */
            if (incr) {
                score += curscore;
                if (isnan(score)) {
                    *out_flags |= ZADD_OUT_NAN;
                    return 0;
                }
            }

            /* GT/LT? Only update if score is greater/less than current. */
            if ((lt && score >= curscore) || (gt && score <= curscore)) {
                *out_flags |= ZADD_OUT_NOP;
                return 1;
            }

            if (newscore) *newscore = score;

            /* Remove and re-insert when score changes. */
            if (score != curscore) {
                znode = zslUpdateScore(zs->zsl,curscore,ele,score);
                /* Note that we did not removed the original element from
                 * the hash table representing the sorted set, so we just
                 * update the score. */
                dictGetVal(de) = &znode->score; /* Update score ptr. */
                *out_flags |= ZADD_OUT_UPDATED;
            }
            return 1;
        } else if (!xx) {
            /* The caller keeps ownership of 'ele', so insert a private copy.
             * The same copy is used as skiplist element and as dict key, and
             * the dict value points at the score stored in the new node. */
            ele = sdsdup(ele);
            znode = zslInsert(zs->zsl,score,ele);
            serverAssert(dictAdd(zs->dict,ele,&znode->score) == DICT_OK);
            *out_flags |= ZADD_OUT_ADDED;
            if (newscore) *newscore = score;
            return 1;
        } else {
            /* XX and the element is missing: nothing to do. */
            *out_flags |= ZADD_OUT_NOP;
            return 1;
        }
    } else {
        serverPanic("Unknown sorted set encoding");
    }
    return 0; /* Never reached. */
}

逻辑如下:

  1. 如果score是NaN,则返回。
  2. 如果编码是listpack,则查找成员。如果存在且score变化,则先删除成员,再重新插入成员。
  3. 如果不存在,则添加。当成员数量过多或成员字符串长度大于阈值时,转换集合编码。
  4. 如果编码是跳表,则调用dictFind查找成员。如果成员存在且分数不同,则调用zslUpdateScore更新跳表中成员的分数,再通过dictGetVal(de)把哈希表中该成员的值改为指向跳表节点中新分数的指针。
  5. 如果成员不存在且xx为0(xx=1时,只有成员存在时才操作),则向跳表中插入成员,并向dict中添加成员。

接下来,我们着重看一下在使用跳表时,更新已有成员分数和添加新成员相关源码。

更新已有成员的分数(跳表编码) #

再次重申一遍,使用跳表编码时,需要更新成员在哈希表中的分数和跳表中的分数。

跳表编码下,更新已有成员在跳表的分数源码如下:

/* Change the score of an element that is already stored in the skiplist.
 *
 * The element must exist with score 'curscore' (this is asserted). Only the
 * skiplist is touched: the score referenced from the hash table side is the
 * caller's responsibility.
 *
 * When the new score keeps the node strictly between its current neighbours,
 * the score is overwritten in place. Otherwise the node is unlinked and a new
 * node is inserted for the same SDS string, which is the more expensive path.
 *
 * Returns the node that holds the element after the update. */
zskiplistNode *zslUpdateScore(zskiplist *zsl, double curscore, sds ele, double newscore) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL];
    zskiplistNode *x = zsl->header;

    /* Walk down from the top level, remembering on each level the last node
     * that sorts before (curscore, ele). These predecessors are needed if
     * the node has to be unlinked later. */
    for (int lvl = zsl->level-1; lvl >= 0; lvl--) {
        for (;;) {
            zskiplistNode *candidate = x->level[lvl].forward;
            if (candidate == NULL) break;

            int sorts_before = candidate->score < curscore ||
                (candidate->score == curscore &&
                 sdscmp(candidate->ele,ele) < 0);
            if (!sorts_before) break;

            x = candidate;
        }
        update[lvl] = x;
    }

    /* The next node on level 0 must be the element we were asked to update. */
    x = x->level[0].forward;
    serverAssert(x && curscore == x->score && sdscmp(x->ele,ele) == 0);

    /* Fast path: the new score still fits strictly between both neighbours,
     * so the node keeps its position and only the score changes. */
    zskiplistNode *pred = x->backward;
    zskiplistNode *succ = x->level[0].forward;
    int after_pred = (pred == NULL || pred->score < newscore);
    int before_succ = (succ == NULL || succ->score > newscore);
    if (after_pred && before_succ) {
        x->score = newscore;
        return x;
    }

    /* Slow path: unlink the old node and insert a new one that takes over
     * the same SDS string. */
    zslDeleteNode(zsl, x, update);
    zskiplistNode *replacement = zslInsert(zsl,newscore,x->ele);

    /* The string now belongs to the new node: detach it from the old node
     * before freeing that node. */
    x->ele = NULL;
    zslFreeNode(x);
    return replacement;
}

逻辑为:

  1. 查找元素。
  2. 如果分数更新后,元素还在相同的位置,则直接更新分数并返回。
  3. 在跳表中删除节点。
  4. 在跳表中插入节点。
  5. 释放原节点。

添加新成员(跳表编码) #

跳表编码下,添加新成员,有两个操作:

  1. 在跳表中插入新节点-zslInsert。
  2. 在哈希表中添加新成员-dictAdd。

zslInsert源码定义如下:

/*************************t_zset.c***************************/

/* Insert a new node in the skiplist. Assumes the element does not already
 * exist (up to the caller to enforce that). The skiplist takes ownership
 * of the passed SDS string 'ele'.
 *
 * 'score' must not be NaN (asserted). Returns the newly created node. */
zskiplistNode *zslInsert(zskiplist *zsl, double score, sds ele) {
    zskiplistNode *update[ZSKIPLIST_MAXLEVEL], *x;
    unsigned long rank[ZSKIPLIST_MAXLEVEL];
    int i, level;

    serverAssert(!isnan(score));
    x = zsl->header;
    /* Search from the top level down. For each level i:
     *  - update[i] is the last node that sorts before (score, ele);
     *  - rank[i] is the sum of the spans crossed from the header to reach
     *    update[i]. */
    for (i = zsl->level-1; i >= 0; i--) {
        /* store rank that is crossed to reach the insert position */
        rank[i] = i == (zsl->level-1) ? 0 : rank[i+1];
        while (x->level[i].forward &&
                (x->level[i].forward->score < score ||
                    (x->level[i].forward->score == score &&
                    sdscmp(x->level[i].forward->ele,ele) < 0)))
        {
            rank[i] += x->level[i].span;
            x = x->level[i].forward;
        }
        update[i] = x;
    }
    /* we assume the element is not already inside, since we allow duplicated
     * scores, reinserting the same element should never happen since the
     * caller of zslInsert() should test in the hash table if the element is
     * already inside or not. */
    level = zslRandomLevel();
    if (level > zsl->level) {
        /* The new node is taller than the list: on each new level the
         * predecessor is the header, with a span covering the whole list,
         * and the list level is raised to the new level. */
        for (i = zsl->level; i < level; i++) {
            rank[i] = 0;
            update[i] = zsl->header;
            update[i]->level[i].span = zsl->length;
        }
        zsl->level = level;
    }
    x = zslCreateNode(level,score,ele);
    for (i = 0; i < level; i++) {
        /* Link x right after update[i] on level i. */
        x->level[i].forward = update[i]->level[i].forward;
        update[i]->level[i].forward = x;

        /* update span covered by update[i] as x is inserted here.
         * (rank[0] - rank[i]) is the rank distance between update[i] and
         * update[0], the node right before the insertion point: x takes the
         * part of the old span beyond that point, update[i] keeps the
         * distance up to x. */
        x->level[i].span = update[i]->level[i].span - (rank[0] - rank[i]);
        update[i]->level[i].span = (rank[0] - rank[i]) + 1;
    }

    /* increment span for untouched levels */
    for (i = level; i < zsl->level; i++) {
        update[i]->level[i].span++;
    }

    /* Fix the backward links: x has no backward node when it directly
     * follows the header, and becomes the tail when nothing follows it. */
    x->backward = (update[0] == zsl->header) ? NULL : update[0];
    if (x->level[0].forward)
        x->level[0].forward->backward = x;
    else
        zsl->tail = x;
    zsl->length++;
    return x;
}

其主要逻辑为:

  • 查找插入的位置:
  • 使用update记录要插入的位置。update[i]表示当前层最后一个小于score的节点。用于后续的插入操作。
  • rank[i]表示第i层,从头节点到当前节点的总跨度。
  • 生成随机level。如果新level大于当前跳表的层数,则对新增的每一层:把rank[i]置为0、update[i]指向头节点、头节点在该层的span设为跳表长度,最后把跳表的层数更新为新level。
  • 创建新节点x。
  • 插入新节点x。我们上面说过,我们用update[i]记录了第i层小于score的最后一个节点。插入时,我们需要依据update[i]执行插入。同时,我们需要更新相关span值。

跳表的定义 #

源码如下:

/************************server.h************************/

/* ZSETs use a specialized version of Skiplists */

/* Skiplist node. */
typedef struct zskiplistNode
{
    sds ele;                        /* Member string (SDS). */
    double score;                   /* Score; nodes are ordered by score, then by 'ele'. */
    struct zskiplistNode *backward; /* Previous node on level 0; zslInsert() sets it
                                     * to NULL for a node right after the header. */
    struct zskiplistLevel
    {
        struct zskiplistNode *forward; /* Next node on this level, NULL at the end. */
        unsigned long span;            /* Rank distance covered by 'forward', as
                                        * maintained by zslInsert(). */
    } level[];                      /* One entry per level of this node
                                     * (flexible array member). */
} zskiplistNode;

/* Skiplist. */
typedef struct zskiplist
{
    struct zskiplistNode *header, *tail; /* header: node every search starts from;
                                          * tail: last node on level 0. */
    unsigned long length;                /* Node count; zslInsert() adds one per
                                          * inserted node. */
    int level;                           /* Highest level currently in use;
                                          * raised by zslInsert() when a taller
                                          * node is created. */
} zskiplist;

可以看到:

  1. zskiplist定义了跳表,其属性为:
  • level:当前跳表内,层数最大的那个节点的层数。
  • length:当前跳表的节点数量。
  • header:跳表的表头。
  • tail:跳表尾节点。
  2. zskiplistNode定义了跳表节点的结构,包括成员ele(成员值)、score(分数)、backward(后退指针)和level(层)。其属性为:
  • ele:成员值,类型为SDS字符串。
  • score:节点的分数值。
  • backward:后退/后向指针,指向当前节点的前一个节点。用于从表尾向表头遍历时使用。
  • level:层数组,标记节点的各层。每个zskiplistLevel有两个成员:forward,前进指针;span,跨度,即从当前节点到下一个节点的跨度。