redis5.0源码浅析(三)-字典dict

前言:这个redis源码分析系列,侧重于重点和难点,比较容易理解的部分本文不会提及。强烈建议 结合《redis设计与实现》来学习。

字典节点结构体

typedef struct dictEntry {

void *key;

union {

void *val;

uint64_t u64;

int64_t s64;

double d;

} v;

struct dictEntry *next;

} dictEntry;

创建dict-dictCreate

//dictCreate只是初始化了结构体,没有为ht[0]->table,ht[1]->table 申请空间。

/* Create a new hash table */

dict *dictCreate(dictType *type,void *privDataPtr)

{

dict *d = zmalloc(sizeof(*d));

 

_dictInit(d,type,privDataPtr);

return d;

}

 

/* Initialize the hash table */

int _dictInit(dict *d, dictType *type,void *privDataPtr)

{

_dictReset(&d->ht[0]);

_dictReset(&d->ht[1]);

d->type = type;

d->privdata = privDataPtr;

d->rehashidx = -1;

d->iterators = 0;

return DICT_OK;

}

static void _dictReset(dictht *ht)

{

ht->table = NULL;

ht->size = 0;

ht->sizemask = 0;

ht->used = 0;

}

为dict结构申请内存,初始的ht[0]和ht[1] table=null,size=0,sizemask=0,used=0.即调用dictCreate时没有初始化table空间。在调用dictAdd的时候进行扩展。

 

 

redis dict hash 原来使用MurmurHash算法 ,因为MurmurHash有漏洞使用SipHash

 

 

rehash是什么时候执行的呢

通过定时任务?

每次 添加删除更新执行一次?

可知,当rehashidx=0时触发,寻找 修改 rehashidx=0的代码

修改 rehashidx=0的函数:dictExpand,调用链如图,可知主要的是 add时候 执行修改 rehashidx.

redis5.0源码浅析(三)-字典dict

 

//如果hash table 不是空的,使用不同的size进行扩展

static int _dictExpandIfNeeded(dict *d)

{

/* Incremental rehashing already in progress. Return. */

//如果扩展进行中,返回成功

if (dictIsRehashing(d)) return DICT_OK;

//如果hash table 是空的,则进行初始化扩展

/* If the hash table is empty expand it to the initial size. */

if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);

 

/* If we reached the 1:1 ratio, and we are allowed to resize the hash

* table (global setting) or we should avoid it but the ratio between

* elements/buckets is over the "safe" threshold, we resize doubling

* the number of buckets. */

//如果达到1:1比例,允许扩展hash table 通过参数设置,

//hash table 不是空的,达到扩展条件的情况下进行扩展

if (d->ht[0].used >= d->ht[0].size &&

//dict_force_resize_ratio默认是5 可以通过参数设置

(dict_can_resize ||

d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))

{

return dictExpand(d, d->ht[0].used*2);

}

return DICT_OK;

}

 

 

/* Expand or create the hash table */

//扩展内存 ,根据是否第一次初始化扩展判断,如果是则将ht[0]初始化,否则进行rehash准备处理.

int dictExpand(dict *d, unsigned long size)

{

/* the size is invalid if it is smaller than the number of

* elements already inside the hash table */

if (dictIsRehashing(d) || d->ht[0].used > size)

return DICT_ERR;

 

dictht n; /* the new hash table */

//创建 hash table 时 size=4,下一次扩展2倍

unsigned long realsize = _dictNextPower(size);

 

/* Rehashing to the same table size is not useful. */

//校验resize(此处情况不可能发生,除非_dictNextPower 被更换)

if (realsize == d->ht[0].size) return DICT_ERR;

 

/* Allocate the new hash table and initialize all pointers to NULL */

// new hash table分配内存,

n.size = realsize;

n.sizemask = realsize-1;

n.table = zcalloc(realsize*sizeof(dictEntry*));

n.used = 0;

 

/* Is this the first initialization? If so it's not really a rehashing

* we just set the first hash table so that it can accept keys. */

//如果是第一次初始化扩展,将ht[0]指向 new hashtable

if (d->ht[0].table == NULL) {

d->ht[0] = n;

return DICT_OK;

}

 

/* Prepare a second hash table for incremental rehashing */

//如果不是第一次初始化扩展,执行rehash准备处理, 将ht[1]指向 new hashtable,标识 rehash进行中.

d->ht[1] = n;

d->rehashidx = 0;

return DICT_OK;

}

//根据需要执行扩展,查找key是否存在,如果存在返回-1。

static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)

{

unsigned long idx, table;

dictEntry *he;

if (existing) *existing = NULL;

 

/* Expand the hash table if needed */

//如果需要执行扩展

if (_dictExpandIfNeeded(d) == DICT_ERR)

return -1;

//查找key是否存在, 遍历 ht[0]->table 和ht[1]->table

for (table = 0; table <= 1; table++) {

//计算index

idx = hash & d->ht[table].sizemask;

/* Search if this slot does not already contain the given key */

//获取 index指向的第一个地址

he = d->ht[table].table[idx];

//he指向一个链表结构,从he的头到尾查找。

while(he) {

if (key==he->key || dictCompareKeys(d, key, he->key)) {

if (existing) *existing = he;

return -1;

}

he = he->next;

}

//rehash不在进行中,则只在 ht[0]查找,需要执行break. rehash在进行中 ,ht[0] h[1] 都需要查找

if (!dictIsRehashing(d)) break;

}

return idx;

}

 

rehash使用通过

//添加新的key value,如果成功返回新的 dictEntry,如果key 已经存在返回NULL,并且把存在的节点地址赋值给 existing

dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)

{

long index;

dictEntry *entry;

dictht *ht;

// rehash进行中,执行渐进式哈希,处理一个entry。

if (dictIsRehashing(d)) _dictRehashStep(d);

 

/* Get the index of the new element, or -1 if

* the element already exists. */

//根据需要执行扩展,检查key是否存在,existing 如果是有效的指针,找到key的时候 赋值给existing。

if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)

return NULL;

 

/* Allocate the memory and store the new entry.

* Insert the element in top, with the assumption that in a database

* system it is more likely that recently added entries are accessed

* more frequently. */

 

//申请内存和插入新实例。

// rehash进行中使用ht[1],rehash不在进行中使用ht[0]

ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];

entry = zmalloc(sizeof(*entry));

//将新实例放在 index 链表头

entry->next = ht->table[index];

ht->table[index] = entry;

ht->used++;

 

/* Set the hash entry fields. */

dictSetKey(d, entry, key);

return entry;

}

//dict add方法 依赖dictAddRaw

int dictAdd(dict *d, void *key, void *val)

{

dictEntry *entry = dictAddRaw(d,key,NULL);

 

if (!entry) return DICT_ERR;

dictSetVal(d, entry, val);

return DICT_OK;

}

 

 

 

 

//渐进式rehash 依赖dictRehash, 参数=1, 每次处理一个槽位

static void _dictRehashStep(dict *d) {

if (d->iterators == 0) dictRehash(d,1);//???

}

//rehash真正实现 ,每次处理n个槽位(一个槽位对应一条链表,槽位有可能为null)

int dictRehash(dict *d, int n) {

int empty_visits = n*10; /* Max number of empty buckets to visit. */

if (!dictIsRehashing(d)) return 0;

//从0~(d.size-1)每次循环处理一个槽位上的链表

while(n-- && d->ht[0].used != 0) {

dictEntry *de, *nextde;

 

/* Note that rehashidx can't overflow as we are sure there are more

* elements because ht[0].used != 0 */

assert(d->ht[0].size > (unsigned long)d->rehashidx);

//跳过空槽位,连续空槽位的的数量超过n的10倍,返回1

while(d->ht[0].table[d->rehashidx] == NULL) {

d->rehashidx++;

if (--empty_visits == 0) return 1;

}

//获取槽位地址

de = d->ht[0].table[d->rehashidx];

/* Move all the keys in this bucket from the old to the new hash HT */

//移动当前槽位的所有key到新的 ht上

while(de) {

//逐个移动槽位链表上的entry

uint64_t h;

 

nextde = de->next;

/* Get the index in the new hash table */

//获取新 hashtable上的槽位 index

h = dictHashKey(d, de->key) & d->ht[1].sizemask;

//将de插入ht[1].table[h]的头部位置。 与添加不同 rehash要插入到槽位的头部。

de->next = d->ht[1].table[h];

d->ht[1].table[h] = de;

d->ht[0].used--;

d->ht[1].used++;

de = nextde;

}

//修改原来的槽位指向NULL

d->ht[0].table[d->rehashidx] = NULL;

//槽位索引加1,如果完成rehash,rehasdix=d.size-1

d->rehashidx++;

}

 

/* Check if we already rehashed the whole table... */

//检查是否已经完成rehash修改 标识

if (d->ht[0].used == 0) {

zfree(d->ht[0].table);

d->ht[0] = d->ht[1];

_dictReset(&d->ht[1]);

d->rehashidx = -1;

return 0;

}

 

/* More to rehash... */

return 1;

}

 

 

dictRehash 在添加和查询的时候都调用

调用链如下图

redis5.0源码浅析(三)-字典dict

 

 

dict add的逻辑比较复杂 整理出下图

redis5.0源码浅析(三)-字典dict

 

实现add的过程中 解决问题的思路是什么

 

整体思路

add的插入实例部分实现比较常规,没什么可说的,执行add dict空间可能不够,所以预先检查dict空间。如果不够则进行扩展。

由于一次执行完整个dict的rehash比较耗时,采用了rehash的方案,将rehash分散在dict添加,修改,删除中。

 

代码细节上的思路

第一次add 可以执行 初始化空间扩展

检查key是否存在的同时,检查空间是否足够,不够则扩展

扩展空间的方法处理了创建后初始化空间扩展,后续扩展并标识rehash进行中。

int dictReplace(dict *d, void *key, void *val)

{

dictEntry *entry, *existing, auxentry;

 

/* Try to add the element. If the key

* does not exists dictAdd will succeed. */

//如果不存在则写入,如果存在 使用existing获取 实例地址

entry = dictAddRaw(d,key,&existing);

if (entry) {

dictSetVal(d, entry, val);

return 1;

}

 

/* Set the new value and free the old one. Note that it is important

* to do that in this order, as the value may just be exactly the same

* as the previous one. In this context, think to reference counting,

* you want to increment (set), and then decrement (free), and not the

* reverse. */

//why 这么搞

auxentry = *existing;

dictSetVal(d, existing, val);

dictFreeVal(d, &auxentry);

return 0;

}

//找到下个节点

// index 是table[n] 中的索引,table[1]或table[2]称作一个槽位, iter->entry index槽位上的链表中某个节点

dictEntry *dictNext(dictIterator *iter)

{

//当iter->entry == NULL 时,第一次执行dictNext 或 index所属槽位上的节点已经轮询完毕,

//此时iter->index++; iter->entry=ht->table[iter->index] iter->entry 指向了 下个槽位的头部位置,也就是头部第一个接单的位置。

while (1) {

if (iter->entry == NULL) {

dictht *ht = &iter->d->ht[iter->table];

 

if (iter->index == -1 && iter->table == 0) {

//iter初始化后执行此块

if (iter->safe)

iter->d->iterators++;

else

iter->fingerprint = dictFingerprint(iter->d);

}

iter->index++;

if (iter->index >= (long) ht->size) {

//如果dict[0]已经迭代获取完毕, dict正在进行rehash,则使用ht[1]迭代获取

if (dictIsRehashing(iter->d) && iter->table == 0) {

iter->table++;

iter->index = 0;

ht = &iter->d->ht[1];

} else {

break;

}

}

//iter->entry 指向了 下个槽位的头部位置,也就是头部第一个接单的位置。

iter->entry = ht->table[iter->index];

} else {

//当前链表未耗尽,继续取出next

iter->entry = iter->nextEntry;

}

//如果iter->entry!=NULL,记录 next节点。

//如果iter->entry==NULL,执行下轮循环 查找 下个槽位的开始节点

if (iter->entry) {

/* We need to save the 'next' here, the iterator user

* may delete the entry we are returning. */

iter->nextEntry = iter->entry->next;

return iter->entry;

}

}

return NULL;

}