Redis源码阅读之set命令

Redis源码阅读之set命令

June 19, 2024
Redis, 数据库
Redis
redis版本:7.0.2

函数setGenericCommand,对应文件是t_string.c,源码如下:

/* The setGenericCommand() function implements the SET operation with different
 * options and variants. This function is called in order to implement the
 * following commands: SET, SETEX, PSETEX, SETNX, GETSET.
 * setGenericCommand()函数实现了SET操作,它有多个不同的选项和变体。调用这个函数可以实现下面几个命令:SET, SETEX, PSETEX, SETNX, GETSET.
 *
 * 'flags' changes the behavior of the command (NX, XX or GET, see below).
 * 'flags'影响命令(NX,XX或GET,详情见下面)的操作。
 *
 * 'expire' represents an expire to set in form of a Redis object as passed
 * by the user. It is interpreted according to the specified 'unit'.
 * 'expire'代表了一个由用户传递的过期时间,它是Redis对象形式。会根据不同的单位对其进行解析。
 *
 * 'ok_reply' and 'abort_reply' is what the function will reply to the client
 * if the operation is performed, or when it is not because of NX or
 * XX flags.
 * 'ok_reply' 和 'abort_reply' 是函数在操作成功或者因为NX或XX标志而没有执行操作时,会回复给客户端的内容。
 *
 * If ok_reply is NULL "+OK" is used.
 * If abort_reply is NULL, "$-1" is used.
 * 如果ok_reply为NULL,会使用"+OK"。
 * 如果abort_reply为NULL,会使用"$-1"。
 * 
 *  */

#define OBJ_NO_FLAGS 0
#define OBJ_SET_NX (1 << 0)  /* Set if key not exists. */
#define OBJ_SET_XX (1 << 1)  /* Set if key exists. */
#define OBJ_EX (1 << 2)      /* Set if time in seconds is given */
#define OBJ_PX (1 << 3)      /* Set if time in ms in given */
#define OBJ_KEEPTTL (1 << 4) /* Set and keep the ttl */
#define OBJ_SET_GET (1 << 5) /* Set if want to get key before set */
#define OBJ_EXAT (1 << 6)    /* Set if timestamp in second is given */
#define OBJ_PXAT (1 << 7)    /* Set if timestamp in ms is given */
#define OBJ_PERSIST (1 << 8) /* Set if we need to remove the ttl */

/* Forward declaration */
static int getExpireMillisecondsOrReply(client *c, robj *expire, int flags, int unit, long long *milliseconds);

void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply)
{
    long long milliseconds = 0; /* initialized to avoid any harmness warning */
    int found = 0;
    int setkey_flags = 0;

    if (expire && getExpireMillisecondsOrReply(c, expire, flags, unit, &milliseconds) != C_OK)
    {
        return;
    }

    if (flags & OBJ_SET_GET)
    {
        if (getGenericCommand(c) == C_ERR)
            return;
    }

    found = (lookupKeyWrite(c->db, key) != NULL);

    if ((flags & OBJ_SET_NX && found) ||
        (flags & OBJ_SET_XX && !found))
    {
        if (!(flags & OBJ_SET_GET))
        {
            addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
        }
        return;
    }

    setkey_flags |= (flags & OBJ_KEEPTTL) ? SETKEY_KEEPTTL : 0;
    setkey_flags |= found ? SETKEY_ALREADY_EXIST : SETKEY_DOESNT_EXIST;

    setKey(c, c->db, key, val, setkey_flags);
    server.dirty++;
    notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);

    if (expire)
    {
        setExpire(c, c->db, key, milliseconds);
        /* Propagate as SET Key Value PXAT millisecond-timestamp if there is
         * EX/PX/EXAT/PXAT flag. */
        robj *milliseconds_obj = createStringObjectFromLongLong(milliseconds);
        rewriteClientCommandVector(c, 5, shared.set, key, val, shared.pxat, milliseconds_obj);
        decrRefCount(milliseconds_obj);
        notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
    }

    if (!(flags & OBJ_SET_GET))
    {
        addReply(c, ok_reply ? ok_reply : shared.ok);
    }

    /* Propagate without the GET argument (Isn't needed if we had expire since in that case we completely re-written the command argv) */
    if ((flags & OBJ_SET_GET) && !expire)
    {
        int argc = 0;
        int j;
        robj **argv = zmalloc((c->argc - 1) * sizeof(robj *));
        for (j = 0; j < c->argc; j++)
        {
            char *a = c->argv[j]->ptr;
            /* Skip GET which may be repeated multiple times. */
            if (j >= 3 &&
                (a[0] == 'g' || a[0] == 'G') &&
                (a[1] == 'e' || a[1] == 'E') &&
                (a[2] == 't' || a[2] == 'T') && a[3] == '\0')
                continue;
            argv[argc++] = c->argv[j];
            incrRefCount(c->argv[j]);
        }
        replaceClientCommandVector(c, argc, argv);
    }
}

主要逻辑为:

  1. getExpireMillisecondsOrReply,处理过期时间。
  2. 如果是setget命令,则执行getGenericCommand函数,进行一次读取操作。
  3. 执行lookupKeyWrite函数,如果返回结果不是NULL,则found=true,否则found=false。
  4. 如果是setnx命令且found=true,或这是setxx(set key value xxx,仅更新)且found=false,则直接返回。
  5. 执行setKey函数,写入。
  6. 执行notifyKeyspaceEvent,键空间更新事件通知。
  7. 如果设置了有效期,则执行setExpire函数。

下面再细看一下lookupKeyWrite和setKey这两个函数。

lookupKeyWrite 此函数位于文件db.c中,源码如下:

/* Lookup a key for write operations, and as a side effect, if needed, expires

* the key if its TTL is reached. It's equivalent to lookupKey() with the

* LOOKUP_WRITE flag added.

*

* Returns the linked value object if the key exists or NULL if the key

* does not exist in the specified DB. */

robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags)

{

return lookupKey(db, key, flags | LOOKUP_WRITE);

}

  

robj *lookupKeyWrite(redisDb *db, robj *key) {

return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);

}

其最终会调用lookupKey函数,传入标志LOOKUP_WRITE,在这个函数中,会进行Delete操作(如果key已过期的话)。

setKey setKey函数位于db.c中,源码如下:

/* High level Set operation. This function can be used in order to set
 * a key, whatever it was existing or not, to a new object.
 * 高级Set操作。这个函数可用于设置一个键,将其指向一个新的对象,不管这个键是否存在。
 *
 * 1) The ref count of the value object is incremented.
 * 1)增加值对象的引用计数。
 * 2) clients WATCHing for the destination key notified.
 * 2)通知WATCHing的客户端,目标键已经被修改。
 * 3) The expire time of the key is reset (the key is made persistent),
 *    unless 'SETKEY_KEEPTTL' is enabled in flags.
 * 3)重置键的过期时间(使键变为持久的),除非在flags中启用了'SETKEY_KEEPTTL'。
 * 4) The key lookup can take place outside this interface outcome will be
 *    delivered with 'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST'
 * 4)键查找可以在这个接口之外进行,结果将通过'SETKEY_ALREADY_EXIST'或'SETKEY_DOESNT_EXIST'标志位传递。
 *
 * All the new keys in the database should be created via this interface.
 * The client 'c' argument may be set to NULL if the operation is performed
 * in a context where there is no clear client performing the operation. 
 * database中所有新keys都应该通过这个接口创建。如果操作执行的上下文中没有明确的客户端,客户端'c'参数可以设置为NULL。
 * 
 * */
void setKey(client *c, redisDb *db, robj *key, robj *val, int flags)
{
    int keyfound = 0;

    if (flags & SETKEY_ALREADY_EXIST)
        keyfound = 1;
    else if (!(flags & SETKEY_DOESNT_EXIST))
        keyfound = (lookupKeyWrite(db, key) != NULL);

    if (!keyfound)
    {// 新增
        dbAdd(db, key, val);
    }
    else
    {// 覆盖写
        dbOverwrite(db, key, val);
    }
    incrRefCount(val);
    if (!(flags & SETKEY_KEEPTTL))
        removeExpire(db, key);
    if (!(flags & SETKEY_NO_SIGNAL))
        signalModifiedKey(c, db, key);
}

其主要逻辑为:

  1. 判断key是否存在。如果不存在,则调用lookUpKeyWrite函数再次查询。
  2. 如果key不存在,则调用dbAdd函数,新增key。
  3. 如果key存在,则调用dbOverwrite函数,覆盖写。
  4. 增加val的引用计数。
  5. 如果没有设置SETKEY_KEEPTTL,则调用removeExpire函数,移除过期时间。
  6. 如果没有设置SETKEY_NO_SIGNAL,则调用signalModifiedKey函数,发送信号。

我们看一下doAdd和doOverwrite两个函数: doAdd doAdd函数位于db.c中,源码如下:

/* Add the key to the DB. It's up to the caller to increment the reference
 * counter of the value if needed.
 * 将键添加到DB中。是否增加引用计数取决于调用者。
 *
 * The program is aborted if the key already exists.
 * 如果键已经存在,程序会中止。
 *  */
void dbAdd(redisDb *db, robj *key, robj *val)
{
    sds copy = sdsdup(key->ptr);
    dictEntry *de = dictAddRaw(db->dict, copy, NULL);
    serverAssertWithInfo(NULL, key, de != NULL);
    dictSetVal(db->dict, de, val);
    signalKeyAsReady(db, key, val->type);
    if (server.cluster_enabled)
        slotToKeyAddEntry(de, db);
    notifyKeyspaceEvent(NOTIFY_NEW, "new", key, db->id);
}

逻辑总结:

  1. 调用sdsup函数复制key ??????why?
  2. 插入到全局字典中。这里需要注意一下,dictAddRaw函数的返回值是dictEntry,只是分配一个新的entry对象,并没有将value值写到entry中。
  3. 调用dictSetVal函数,将val写入到entry中。
  4. 调用signalKeyAsReady函数,发送信号。写入server.ready_keys列表,避免通过脚本重复添加相同key。
  5. 如果开启了集群模式,则调用slotToKeyAddEntry函数。
  6. 调用notifyKeyspaceEvent函数,发送键空间事件通知。

doOverwrite doOverwrite函数位于db.c中,源码如下:

/* Overwrite an existing key with a new value. Incrementing the reference
 * count of the new value is up to the caller.
 * 用新值覆盖写现有键。是否增加新值的引用计数取决于调用者。
 * This function does not modify the expire time of the existing key.
 * 这个函数不会修改现有键的过期时间。
 *
 * The program is aborted if the key was not already present. 
 * 如果键不存在,程序会中止。
 * */
void dbOverwrite(redisDb *db, robj *key, robj *val)
{
    dictEntry *de = dictFind(db->dict, key->ptr);

    serverAssertWithInfo(NULL, key, de != NULL);
    dictEntry auxentry = *de;
    robj *old = dictGetVal(de);
    if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU)
    {
        val->lru = old->lru;
    }
    /* Although the key is not really deleted from the database, we regard
     * overwrite as two steps of unlink+add, so we still need to call the unlink
     * callback of the module.
     * 虽然没有真的将键从数据库中删除,我们仍然将overwrite当成unlink+add两个步骤,所以我们仍然需要调用模块的unlink回调。
     *  */
    moduleNotifyKeyUnlink(key, old, db->id);
    /* We want to try to unblock any client using a blocking XREADGROUP */
    if (old->type == OBJ_STREAM)
        signalKeyAsReady(db, key, old->type);
    dictSetVal(db->dict, de, val);

    if (server.lazyfree_lazy_server_del)
    {
        freeObjAsync(key, old, db->id);
        dictSetVal(db->dict, &auxentry, NULL);
    }

    dictFreeVal(db->dict, &auxentry);
}

逻辑总结:

  1. 调用dictFind函数,查找key。
  2. 调用dictEntry auxentry = *de,将de的值赋给auxentry。
  3. 调用dictGetVal函数,获取de的值。
  4. 如果淘汰策略是LFU,则将新值val的lru值设置为old的lru值。
  5. 调用moduleNotifyKeyUnlink函数,执行unlink,注意,这里只针对OBJ_MODULE类型。
  6. 如果是stream类型,则调用signalKeyAsReady函数,发送信号。
  7. 调用dictSetVal设置值。
  8. 如果开启了惰性删除,则调用freeObjAsync函数,异步释放old对象(value)。
  9. 调用dictFreeVal函数,释放auxentry(entry)。

备注:

  • OBJ_MODULE自定义类型。