Redis源码阅读之set命令
June 19, 2024
redis版本:7.0.2
函数setGenericCommand,对应文件是t_string.c,源码如下:
/* The setGenericCommand() function implements the SET operation with different
* options and variants. This function is called in order to implement the
* following commands: SET, SETEX, PSETEX, SETNX, GETSET.
* setGenericCommand()函数实现了SET操作,它有多个不同的选项和变体。调用这个函数可以实现下面几个命令:SET, SETEX, PSETEX, SETNX, GETSET.
*
* 'flags' changes the behavior of the command (NX, XX or GET, see below).
* 'flags'影响命令(NX,XX或GET,详情见下面)的操作。
*
* 'expire' represents an expire to set in form of a Redis object as passed
* by the user. It is interpreted according to the specified 'unit'.
* 'expire'代表了一个由用户传递的过期时间,它是Redis对象形式。会根据不同的单位对其进行解析。
*
* 'ok_reply' and 'abort_reply' is what the function will reply to the client
* if the operation is performed, or when it is not because of NX or
* XX flags.
* 'ok_reply' 和 'abort_reply' 是函数在操作成功或者因为NX或XX标志而没有执行操作时,会回复给客户端的内容。
*
* If ok_reply is NULL "+OK" is used.
* If abort_reply is NULL, "$-1" is used.
* 如果ok_reply为NULL,会使用"+OK"。
* 如果abort_reply为NULL,会使用"$-1"。
*
* */
#define OBJ_NO_FLAGS 0
#define OBJ_SET_NX (1 << 0) /* Set if key not exists. */
#define OBJ_SET_XX (1 << 1) /* Set if key exists. */
#define OBJ_EX (1 << 2) /* Set if time in seconds is given */
#define OBJ_PX (1 << 3) /* Set if time in ms in given */
#define OBJ_KEEPTTL (1 << 4) /* Set and keep the ttl */
#define OBJ_SET_GET (1 << 5) /* Set if want to get key before set */
#define OBJ_EXAT (1 << 6) /* Set if timestamp in second is given */
#define OBJ_PXAT (1 << 7) /* Set if timestamp in ms is given */
#define OBJ_PERSIST (1 << 8) /* Set if we need to remove the ttl */
/* Forward declaration */
static int getExpireMillisecondsOrReply(client *c, robj *expire, int flags, int unit, long long *milliseconds);
void setGenericCommand(client *c, int flags, robj *key, robj *val, robj *expire, int unit, robj *ok_reply, robj *abort_reply)
{
long long milliseconds = 0; /* initialized to avoid any harmness warning */
int found = 0;
int setkey_flags = 0;
if (expire && getExpireMillisecondsOrReply(c, expire, flags, unit, &milliseconds) != C_OK)
{
return;
}
if (flags & OBJ_SET_GET)
{
if (getGenericCommand(c) == C_ERR)
return;
}
found = (lookupKeyWrite(c->db, key) != NULL);
if ((flags & OBJ_SET_NX && found) ||
(flags & OBJ_SET_XX && !found))
{
if (!(flags & OBJ_SET_GET))
{
addReply(c, abort_reply ? abort_reply : shared.null[c->resp]);
}
return;
}
setkey_flags |= (flags & OBJ_KEEPTTL) ? SETKEY_KEEPTTL : 0;
setkey_flags |= found ? SETKEY_ALREADY_EXIST : SETKEY_DOESNT_EXIST;
setKey(c, c->db, key, val, setkey_flags);
server.dirty++;
notifyKeyspaceEvent(NOTIFY_STRING, "set", key, c->db->id);
if (expire)
{
setExpire(c, c->db, key, milliseconds);
/* Propagate as SET Key Value PXAT millisecond-timestamp if there is
* EX/PX/EXAT/PXAT flag. */
robj *milliseconds_obj = createStringObjectFromLongLong(milliseconds);
rewriteClientCommandVector(c, 5, shared.set, key, val, shared.pxat, milliseconds_obj);
decrRefCount(milliseconds_obj);
notifyKeyspaceEvent(NOTIFY_GENERIC, "expire", key, c->db->id);
}
if (!(flags & OBJ_SET_GET))
{
addReply(c, ok_reply ? ok_reply : shared.ok);
}
/* Propagate without the GET argument (Isn't needed if we had expire since in that case we completely re-written the command argv) */
if ((flags & OBJ_SET_GET) && !expire)
{
int argc = 0;
int j;
robj **argv = zmalloc((c->argc - 1) * sizeof(robj *));
for (j = 0; j < c->argc; j++)
{
char *a = c->argv[j]->ptr;
/* Skip GET which may be repeated multiple times. */
if (j >= 3 &&
(a[0] == 'g' || a[0] == 'G') &&
(a[1] == 'e' || a[1] == 'E') &&
(a[2] == 't' || a[2] == 'T') && a[3] == '\0')
continue;
argv[argc++] = c->argv[j];
incrRefCount(c->argv[j]);
}
replaceClientCommandVector(c, argc, argv);
}
}
主要逻辑为:
- getExpireMillisecondsOrReply,处理过期时间。
- 如果是setget命令,则执行getGenericCommand函数,进行一次读取操作。
- 执行lookupKeyWrite函数,如果返回结果不是NULL,则found=true,否则found=false。
- 如果是setnx命令且found=true,或这是setxx(set key value xxx,仅更新)且found=false,则直接返回。
- 执行setKey函数,写入。
- 执行notifyKeyspaceEvent,键空间更新事件通知。
- 如果设置了有效期,则执行setExpire函数。
下面再细看一下lookupKeyWrite和setKey这两个函数。
lookupKeyWrite 此函数位于文件db.c中,源码如下:
/* Lookup a key for write operations, and as a side effect, if needed, expires
* the key if its TTL is reached. It's equivalent to lookupKey() with the
* LOOKUP_WRITE flag added.
*
* Returns the linked value object if the key exists or NULL if the key
* does not exist in the specified DB. */
robj *lookupKeyWriteWithFlags(redisDb *db, robj *key, int flags)
{
return lookupKey(db, key, flags | LOOKUP_WRITE);
}
robj *lookupKeyWrite(redisDb *db, robj *key) {
return lookupKeyWriteWithFlags(db, key, LOOKUP_NONE);
}
其最终会调用lookupKey函数,传入标志LOOKUP_WRITE,在这个函数中,会进行Delete操作(如果key已过期的话)。
setKey setKey函数位于db.c中,源码如下:
/* High level Set operation. This function can be used in order to set
* a key, whatever it was existing or not, to a new object.
* 高级Set操作。这个函数可用于设置一个键,将其指向一个新的对象,不管这个键是否存在。
*
* 1) The ref count of the value object is incremented.
* 1)增加值对象的引用计数。
* 2) clients WATCHing for the destination key notified.
* 2)通知WATCHing的客户端,目标键已经被修改。
* 3) The expire time of the key is reset (the key is made persistent),
* unless 'SETKEY_KEEPTTL' is enabled in flags.
* 3)重置键的过期时间(使键变为持久的),除非在flags中启用了'SETKEY_KEEPTTL'。
* 4) The key lookup can take place outside this interface outcome will be
* delivered with 'SETKEY_ALREADY_EXIST' or 'SETKEY_DOESNT_EXIST'
* 4)键查找可以在这个接口之外进行,结果将通过'SETKEY_ALREADY_EXIST'或'SETKEY_DOESNT_EXIST'标志位传递。
*
* All the new keys in the database should be created via this interface.
* The client 'c' argument may be set to NULL if the operation is performed
* in a context where there is no clear client performing the operation.
* database中所有新keys都应该通过这个接口创建。如果操作执行的上下文中没有明确的客户端,客户端'c'参数可以设置为NULL。
*
* */
void setKey(client *c, redisDb *db, robj *key, robj *val, int flags)
{
int keyfound = 0;
if (flags & SETKEY_ALREADY_EXIST)
keyfound = 1;
else if (!(flags & SETKEY_DOESNT_EXIST))
keyfound = (lookupKeyWrite(db, key) != NULL);
if (!keyfound)
{// 新增
dbAdd(db, key, val);
}
else
{// 覆盖写
dbOverwrite(db, key, val);
}
incrRefCount(val);
if (!(flags & SETKEY_KEEPTTL))
removeExpire(db, key);
if (!(flags & SETKEY_NO_SIGNAL))
signalModifiedKey(c, db, key);
}
其主要逻辑为:
- 判断key是否存在。如果不存在,则调用lookUpKeyWrite函数再次查询。
- 如果key不存在,则调用dbAdd函数,新增key。
- 如果key存在,则调用dbOverwrite函数,覆盖写。
- 增加val的引用计数。
- 如果没有设置SETKEY_KEEPTTL,则调用removeExpire函数,移除过期时间。
- 如果没有设置SETKEY_NO_SIGNAL,则调用signalModifiedKey函数,发送信号。
我们看一下doAdd和doOverwrite两个函数: doAdd doAdd函数位于db.c中,源码如下:
/* Add the key to the DB. It's up to the caller to increment the reference
* counter of the value if needed.
* 将键添加到DB中。是否增加引用计数取决于调用者。
*
* The program is aborted if the key already exists.
* 如果键已经存在,程序会中止。
* */
void dbAdd(redisDb *db, robj *key, robj *val)
{
sds copy = sdsdup(key->ptr);
dictEntry *de = dictAddRaw(db->dict, copy, NULL);
serverAssertWithInfo(NULL, key, de != NULL);
dictSetVal(db->dict, de, val);
signalKeyAsReady(db, key, val->type);
if (server.cluster_enabled)
slotToKeyAddEntry(de, db);
notifyKeyspaceEvent(NOTIFY_NEW, "new", key, db->id);
}
逻辑总结:
- 调用sdsup函数复制key ??????why?
- 插入到全局字典中。这里需要注意一下,dictAddRaw函数的返回值是dictEntry,只是分配一个新的entry对象,并没有将value值写到entry中。
- 调用dictSetVal函数,将val写入到entry中。
- 调用signalKeyAsReady函数,发送信号。写入server.ready_keys列表,避免通过脚本重复添加相同key。
- 如果开启了集群模式,则调用slotToKeyAddEntry函数。
- 调用notifyKeyspaceEvent函数,发送键空间事件通知。
doOverwrite doOverwrite函数位于db.c中,源码如下:
/* Overwrite an existing key with a new value. Incrementing the reference
* count of the new value is up to the caller.
* 用新值覆盖写现有键。是否增加新值的引用计数取决于调用者。
* This function does not modify the expire time of the existing key.
* 这个函数不会修改现有键的过期时间。
*
* The program is aborted if the key was not already present.
* 如果键不存在,程序会中止。
* */
void dbOverwrite(redisDb *db, robj *key, robj *val)
{
dictEntry *de = dictFind(db->dict, key->ptr);
serverAssertWithInfo(NULL, key, de != NULL);
dictEntry auxentry = *de;
robj *old = dictGetVal(de);
if (server.maxmemory_policy & MAXMEMORY_FLAG_LFU)
{
val->lru = old->lru;
}
/* Although the key is not really deleted from the database, we regard
* overwrite as two steps of unlink+add, so we still need to call the unlink
* callback of the module.
* 虽然没有真的将键从数据库中删除,我们仍然将overwrite当成unlink+add两个步骤,所以我们仍然需要调用模块的unlink回调。
* */
moduleNotifyKeyUnlink(key, old, db->id);
/* We want to try to unblock any client using a blocking XREADGROUP */
if (old->type == OBJ_STREAM)
signalKeyAsReady(db, key, old->type);
dictSetVal(db->dict, de, val);
if (server.lazyfree_lazy_server_del)
{
freeObjAsync(key, old, db->id);
dictSetVal(db->dict, &auxentry, NULL);
}
dictFreeVal(db->dict, &auxentry);
}
逻辑总结:
- 调用dictFind函数,查找key。
- 调用dictEntry auxentry = *de,将de的值赋给auxentry。
- 调用dictGetVal函数,获取de的值。
- 如果淘汰策略是LFU,则将新值val的lru值设置为old的lru值。
- 调用moduleNotifyKeyUnlink函数,执行unlink,注意,这里只针对OBJ_MODULE类型。
- 如果是stream类型,则调用signalKeyAsReady函数,发送信号。
- 调用dictSetVal设置值。
- 如果开启了惰性删除,则调用freeObjAsync函数,异步释放old对象(value)。
- 调用dictFreeVal函数,释放auxentry(entry)。
备注:
- OBJ_MODULE自定义类型。