Redis源码阅读之hset命令

Redis源码阅读之hset命令

June 22, 2024
Redis
Redis
Redis版本:7.0.2

hset命令对应函数hsetCommand,位于t_hash.c文件中,源码如下:

void hsetCommand(client *c) {
    int i, created = 0;
    robj *o;

    if ((c->argc % 2) == 1) {
        addReplyErrorArity(c);
        return;
    }

    if ((o = hashTypeLookupWriteOrCreate(c,c->argv[1])) == NULL) return;
    hashTypeTryConversion(o,c->argv,2,c->argc-1);

    for (i = 2; i < c->argc; i += 2)
        created += !hashTypeSet(o,c->argv[i]->ptr,c->argv[i+1]->ptr,HASH_SET_COPY);

    /* HMSET (deprecated) and HSET return value is different. */
    char *cmdname = c->argv[0]->ptr;
    if (cmdname[1] == 's' || cmdname[1] == 'S') {
        /* HSET */
        addReplyLongLong(c, created);
    } else {
        /* HMSET */
        addReply(c, shared.ok);
    }
    signalModifiedKey(c,c->db,c->argv[1]);
    notifyKeyspaceEvent(NOTIFY_HASH,"hset",c->argv[1],c->db->id);
    server.dirty += (c->argc - 2)/2;
}

逻辑梳理:

  1. 判断参数个数是否为奇数,如果是则返回错误信息。
  2. 调用hashTypeLookupWriteOrCreate查找key,如果不存在,则创建一个新的。
  3. 执行hashTypeTryConversion函数,尝试转换hash类型。
  4. 遍历参数,执行hashTypeSet函数,设置key-value对。
  5. 根据命令类型,返回不同的结果。如果是hset,返回数量。如果是HMSET,则返回OK。
  6. 调用signalModifiedKey发送变更通知。
  7. 调用notifyKeyspaceEvent,发送键空间事件通知。Redis支持client订阅特定事件。 我们这里主要关注第2、3、4步。

hashTypeLookupWriteOrCreate-查找key #

hashTypeLookupWriteOrCreate函数位于t_hash.c文件中,源码如下:

robj *hashTypeLookupWriteOrCreate(client *c, robj *key) {
    robj *o = lookupKeyWrite(c->db,key);
    if (checkType(c,o,OBJ_HASH)) return NULL;

    if (o == NULL) {
        o = createHashObject();
        dbAdd(c->db,key,o);
    }
    return o;
}

可以看到,其主要逻辑为:

  1. 查找key。
  2. 校验类型,如果不是hash类型,直接返回。
  3. 如果不存在,则创建一个新的hash对象,并添加到db中。

接下来我们来看一下createHashObject函数,源码如下:


/******object.c*********/

robj *createHashObject(void) {
    unsigned char *zl = lpNew(0);
    robj *o = createObject(OBJ_HASH, zl);
    o->encoding = OBJ_ENCODING_LISTPACK;
    return o;
}

robj *createHashObject(void) {
    unsigned char *zl = lpNew(0);
    robj *o = createObject(OBJ_HASH, zl);
    o->encoding = OBJ_ENCODING_LISTPACK;
    return o;
}

/********listpack.c**********/

/* Create a new, empty listpack.
 * 创建一个新的空listpack。 
 * On success the new listpack is returned, otherwise an error is returned.
 * Pre-allocate at least `capacity` bytes of memory,
 * over-allocated memory can be shrunk by `lpShrinkToFit`.
 * 成功时返回新的listpack,否则返回错误。
 * 预先分配至少`capacity`字节的内存,
 * 过多分配的内存可以通过`lpShrinkToFit`来缩小。
 * 
 * */
unsigned char *lpNew(size_t capacity) {
    unsigned char *lp = lp_malloc(capacity > LP_HDR_SIZE+1 ? capacity : LP_HDR_SIZE+1);
    if (lp == NULL) return NULL;
    lpSetTotalBytes(lp,LP_HDR_SIZE+1);
    lpSetNumElements(lp,0);
    lp[LP_HDR_SIZE] = LP_EOF;
    return lp;
}

这里可以看出,新建/初始化的时候,hash类型底层存储用的是listpack,不是真正的hash类型。

hashTypeTryConversion-尝试转换hash类型 #

hashTypeTryConversion函数源码如下:

/***********t_hash.c**************/

/* Check the length of a number of objects to see if we need to convert a
 * listpack to a real hash. Note that we only check string encoded objects
 * as their string length can be queried in constant time. 
 * 检查objects的长度,看是否需要将listpack转换成真正的hash类型。要说明的是我们只检查字符串编码的对象,
 * 因为它们的字符串长度可以在常数时间内查询。
 * */
void hashTypeTryConversion(robj *o, robj **argv, int start, int end) {
    int i;
    size_t sum = 0;

    if (o->encoding != OBJ_ENCODING_LISTPACK) return;

    for (i = start; i <= end; i++) {
        if (!sdsEncodedObject(argv[i]))
            continue;
        size_t len = sdslen(argv[i]->ptr);
        if (len > server.hash_max_listpack_value) {
            hashTypeConvert(o, OBJ_ENCODING_HT);
            return;
        }
        sum += len;
    }
    if (!lpSafeToAdd(o->ptr, sum))
        hashTypeConvert(o, OBJ_ENCODING_HT);
}

主要逻辑:

  1. 判断对象的编码是否为listpack,如果不是则直接返回。
  2. 遍历参数,如果参数是sds编码的对象,且长度大于server.hash_max_listpack_value(默认值是64),则转换为真正的hash类型。

转换的详细逻辑,暂时不细究。

hashTypeSet-设置key-value对 #

hashTypeSet函数源码如下:

/**********************t_hash.c*******************************/

/* Add a new field, overwrite the old with the new value if it already exists.
 * Return 0 on insert and 1 on update.
 * 添加一个新的field,如果已经存在则用新值覆盖旧值。
 * 插入返回0,更新返回1。
 *
 * By default, the key and value SDS strings are copied if needed, so the
 * caller retains ownership of the strings passed. However this behavior
 * can be effected by passing appropriate flags (possibly bitwise OR-ed):
 *
 * HASH_SET_TAKE_FIELD -- The SDS field ownership passes to the function.
 * HASH_SET_TAKE_VALUE -- The SDS value ownership passes to the function.
 * 
 * 默认情况下,如果需要,key和value的SDS字符串会被复制,因此调用者保留了传递的字符串的所有权。
 * 但是,可以通过传递适当的标志(可能是按位OR-ed)来影响此行为:
 * 
 * HASH_SET_TAKE_FIELD -- SDS field所有权传递给函数。
 * HASH_SET_TAKE_VALUE -- SDS value所有权传递给函数。
 *
 * When the flags are used the caller does not need to release the passed
 * SDS string(s). It's up to the function to use the string to create a new
 * entry or to free the SDS string before returning to the caller.
 * 使用标志时,调用者不需要释放传递的SDS字符串。函数可以使用字符串创建新条目,或者在返回给调用者之前释放SDS字符串。
 *
 * HASH_SET_COPY corresponds to no flags passed, and means the default
 * semantics of copying the values if needed.
 * HASH_SET_COPY对应于不传递标志,表示根据需要复制值的默认语义。
 *
 */
#define HASH_SET_TAKE_FIELD (1<<0)
#define HASH_SET_TAKE_VALUE (1<<1)
#define HASH_SET_COPY 0
int hashTypeSet(robj *o, sds field, sds value, int flags) {
    int update = 0;

    /* Check if the field is too long for listpack, and convert before adding the item.
     * This is needed for HINCRBY* case since in other commands this is handled early by
     * hashTypeTryConversion, so this check will be a NOP. */
    if (o->encoding == OBJ_ENCODING_LISTPACK) {
        if (sdslen(field) > server.hash_max_listpack_value || sdslen(value) > server.hash_max_listpack_value)
            hashTypeConvert(o, OBJ_ENCODING_HT);
    }
    
    if (o->encoding == OBJ_ENCODING_LISTPACK) {
        unsigned char *zl, *fptr, *vptr;

        zl = o->ptr;
        fptr = lpFirst(zl);
        if (fptr != NULL) {
            fptr = lpFind(zl, fptr, (unsigned char*)field, sdslen(field), 1);
            if (fptr != NULL) {
                /* Grab pointer to the value (fptr points to the field) */
                vptr = lpNext(zl, fptr);
                serverAssert(vptr != NULL);
                update = 1;

                /* Replace value */
                zl = lpReplace(zl, &vptr, (unsigned char*)value, sdslen(value));
            }
        }

        if (!update) {
            /* Push new field/value pair onto the tail of the listpack */
            zl = lpAppend(zl, (unsigned char*)field, sdslen(field));
            zl = lpAppend(zl, (unsigned char*)value, sdslen(value));
        }
        o->ptr = zl;

        /* Check if the listpack needs to be converted to a hash table */
        if (hashTypeLength(o) > server.hash_max_listpack_entries)
            hashTypeConvert(o, OBJ_ENCODING_HT);
    } else if (o->encoding == OBJ_ENCODING_HT) {
        dictEntry *de = dictFind(o->ptr,field);
        if (de) {
            sdsfree(dictGetVal(de));
            if (flags & HASH_SET_TAKE_VALUE) {
                dictGetVal(de) = value;
                value = NULL;
            } else {
                dictGetVal(de) = sdsdup(value);
            }
            update = 1;
        } else {
            sds f,v;
            if (flags & HASH_SET_TAKE_FIELD) {
                f = field;
                field = NULL;
            } else {
                f = sdsdup(field);
            }
            if (flags & HASH_SET_TAKE_VALUE) {
                v = value;
                value = NULL;
            } else {
                v = sdsdup(value);
            }
            dictAdd(o->ptr,f,v);
        }
    } else {
        serverPanic("Unknown hash encoding");
    }

    /* Free SDS strings we did not referenced elsewhere if the flags
     * want this function to be responsible. */
    if (flags & HASH_SET_TAKE_FIELD && field) sdsfree(field);
    if (flags & HASH_SET_TAKE_VALUE && value) sdsfree(value);
    return update;
}

主要逻辑:

  1. 如果是listpack编码,则判断是否需要转换为真正的hash类型。
  2. 如果是listpack编码,查找/添加/替换元素。
  3. 如果是hash编码,则dictFind在hash中查找,如果存在则更新value。这里,更新是调用sdsup生成一个新的动态字符串对象,然后将val指针指向它。
  4. 如果不存在,则调用dictAdd添加。

我们着重看一下dictAdd函数:

/**********dict.c**************/

/* Add an element to the target hash table */
int dictAdd(dict *d, void *key, void *val)
{
    dictEntry *entry = dictAddRaw(d, key, NULL);

    if (!entry)
        return DICT_ERR;
    dictSetVal(d, entry, val);
    return DICT_OK;
}

/* Low level add or find:
 * This function adds the entry but instead of setting a value returns the
 * dictEntry structure to the user, that will make sure to fill the value
 * field as they wish.
 * 底层的添加或查找:
 * 此函数添加条目,但不设置值,而是将dictEntry结构返回给用户,用户将确保根据需要填充值字段。
 *
 * This function is also directly exposed to the user API to be called
 * mainly in order to store non-pointers inside the hash value, example:
 * 这个函数也直接暴露给用户API,主要是为了存储哈希值内的非指针,例如:
 * 
 * entry = dictAddRaw(dict,mykey,NULL);
 * if (entry != NULL) dictSetSignedIntegerVal(entry,1000);
 *
 * Return values:
 *
 * If key already exists NULL is returned, and "*existing" is populated
 * with the existing entry if existing is not NULL.
 * 如果key已经存在,则返回NULL,并且如果existing不为NULL,则将现有条目填充到"*existing"中。
 * 
 *
 * If key was added, the hash entry is returned to be manipulated by the caller.
 * 如果key被添加,则返回哈希条目,以便调用者操作。
 */
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
    long index;
    dictEntry *entry;
    int htidx;

    if (dictIsRehashing(d))
        _dictRehashStep(d);

    /* Get the index of the new element, or -1 if
     * the element already exists. 
     * 获取新元素的索引,如果元素已经存在,则返回-1。
     */
    if ((index = _dictKeyIndex(d, key, dictHashKey(d, key), existing)) == -1)
        return NULL;

    /* Allocate the memory and store the new entry.
     * Insert the element in top, with the assumption that in a database
     * system it is more likely that recently added entries are accessed
     * more frequently. */
    htidx = dictIsRehashing(d) ? 1 : 0;
    size_t metasize = dictMetadataSize(d);
    entry = zmalloc(sizeof(*entry) + metasize);
    if (metasize > 0)
    {
        memset(dictMetadata(entry), 0, metasize);
    }
    entry->next = d->ht_table[htidx][index];
    d->ht_table[htidx][index] = entry;
    d->ht_used[htidx]++;

    /* Set the hash entry fields. */
    dictSetKey(d, entry, key);
    return entry;
}

/* Returns the index of a free slot that can be populated with
 * a hash entry for the given 'key'.
 * If the key already exists, -1 is returned
 * and the optional output parameter may be filled.
 * 返回给定key对应的槽的下标。
 * 如果key已经存在,则返回-1,可选的输出参数也可能会被填充。
 *
 * Note that if we are in the process of rehashing the hash table, the
 * index is always returned in the context of the second (new) hash table. 
 * 需要注意的是:如果正在对hash表进行rehash,返回的下标会是hash表第二个元素的下标。
 * */
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
    unsigned long idx, table;
    dictEntry *he;
    if (existing)
        *existing = NULL;

    /* Expand the hash table if needed */
    if (_dictExpandIfNeeded(d) == DICT_ERR)
        return -1;
    for (table = 0; table <= 1; table++)
    {
        idx = hash & DICTHT_SIZE_MASK(d->ht_size_exp[table]);
        /* Search if this slot does not already contain the given key */
        he = d->ht_table[table][idx];
        while (he)
        {
            if (key == he->key || dictCompareKeys(d, key, he->key))
            {
                if (existing)
                    *existing = he;
                return -1;
            }
            he = he->next;
        }
        if (!dictIsRehashing(d))
            break;
    }
    return idx;
}

可以看到dictAdd函数主要逻辑:

  1. 调用dictAddRaw函数,获取dictEntry结构体。
  2. 调用dictSetVal设置value。

而dictAddRaw的主要逻辑为:

  1. 判断是否正在进行rehash,如果是,则执行一次_dictRehashStep。

  2. 执行_dictKeyIndex,获取key在hash_table数组中的下标index。 这里注意,调用_dictExpandIfNeeded的时候,会判断是否需要扩容。

  3. 将entry写入到ht_table中,如果正在进行rehash,则写入到ht_table[1],否则写入ht_table[0]。 此处是添加在链表头,把next指针指向原来的表头。

  4. 执行dictSetKey,设置entry的key。

需要记住的几点 #

  1. 全局字典、hash类型使用的是相同的字典类型dict。
  2. hash类型在元素数量较少时使用的是listpack类型,不是字典。