redis5.0源码浅析(三)-字典dict
前言:这个redis源码分析系列,侧重于重点和难点,比较容易理解的部分本文不会提及。强烈建议 结合《redis设计与实现》来学习。
字典节点结构体
typedef struct dictEntry {
void *key;
union {
void *val;
uint64_t u64;
int64_t s64;
double d;
} v;
struct dictEntry *next;
} dictEntry;
创建dict-dictCreate
//dictCreate只是初始化了结构体,没有为ht[0]->table,ht[1]->table 申请空间。
/* Create a new hash table */
dict *dictCreate(dictType *type,void *privDataPtr)
{
dict *d = zmalloc(sizeof(*d));
_dictInit(d,type,privDataPtr);
return d;
}
/* Initialize the hash table */
int _dictInit(dict *d, dictType *type,void *privDataPtr)
{
_dictReset(&d->ht[0]);
_dictReset(&d->ht[1]);
d->type = type;
d->privdata = privDataPtr;
d->rehashidx = -1;
d->iterators = 0;
return DICT_OK;
}
static void _dictReset(dictht *ht)
{
ht->table = NULL;
ht->size = 0;
ht->sizemask = 0;
ht->used = 0;
}
为dict结构申请内存,初始的ht[0]和ht[1] table=null,size=0,sizemask=0,used=0.即调用dictCreate时没有初始化table空间。在调用dictAdd的时候进行扩展。
redis dict hash 原来使用MurmurHash算法 ,因为MurmurHash有漏洞使用SipHash
rehash是什么时候执行的呢
通过定时任务?
每次 添加删除更新执行一次?
可知,当rehashidx=0时触发,寻找 修改 rehashidx=0的代码
修改 rehashidx=0的函数:dictExpand,调用链如图,可知主要的是 add时候 执行修改 rehashidx.
//如果hash table 不是空的,使用不同的size进行扩展
static int _dictExpandIfNeeded(dict *d)
{
/* Incremental rehashing already in progress. Return. */
//如果扩展进行中,返回成功
if (dictIsRehashing(d)) return DICT_OK;
//如果hash table 是空的,则进行初始化扩展
/* If the hash table is empty expand it to the initial size. */
if (d->ht[0].size == 0) return dictExpand(d, DICT_HT_INITIAL_SIZE);
/* If we reached the 1:1 ratio, and we are allowed to resize the hash
* table (global setting) or we should avoid it but the ratio between
* elements/buckets is over the "safe" threshold, we resize doubling
* the number of buckets. */
//如果达到1:1比例,允许扩展hash table 通过参数设置,
//hash table 不是空的,达到扩展条件的情况下进行扩展
if (d->ht[0].used >= d->ht[0].size &&
//dict_force_resize_ratio默认是5 可以通过参数设置
(dict_can_resize ||
d->ht[0].used/d->ht[0].size > dict_force_resize_ratio))
{
return dictExpand(d, d->ht[0].used*2);
}
return DICT_OK;
}
/* Expand or create the hash table */
//扩展内存 ,根据是否第一次初始化扩展判断,如果是则将ht[0]初始化,否则进行rehash准备处理.
int dictExpand(dict *d, unsigned long size)
{
/* the size is invalid if it is smaller than the number of
* elements already inside the hash table */
if (dictIsRehashing(d) || d->ht[0].used > size)
return DICT_ERR;
dictht n; /* the new hash table */
//创建 hash table 时 size=4,下一次扩展2倍
unsigned long realsize = _dictNextPower(size);
/* Rehashing to the same table size is not useful. */
//校验resize(此处情况不可能发生,除非_dictNextPower 被更换)
if (realsize == d->ht[0].size) return DICT_ERR;
/* Allocate the new hash table and initialize all pointers to NULL */
// new hash table分配内存,
n.size = realsize;
n.sizemask = realsize-1;
n.table = zcalloc(realsize*sizeof(dictEntry*));
n.used = 0;
/* Is this the first initialization? If so it's not really a rehashing
* we just set the first hash table so that it can accept keys. */
//如果是第一次初始化扩展,将ht[0]指向 new hashtable
if (d->ht[0].table == NULL) {
d->ht[0] = n;
return DICT_OK;
}
/* Prepare a second hash table for incremental rehashing */
//如果不是第一次初始化扩展,执行rehash准备处理, 将ht[1]指向 new hashtable,标识 rehash进行中.
d->ht[1] = n;
d->rehashidx = 0;
return DICT_OK;
}
//根据需要执行扩展,查找key是否存在,如果存在返回-1。
static long _dictKeyIndex(dict *d, const void *key, uint64_t hash, dictEntry **existing)
{
unsigned long idx, table;
dictEntry *he;
if (existing) *existing = NULL;
/* Expand the hash table if needed */
//如果需要执行扩展
if (_dictExpandIfNeeded(d) == DICT_ERR)
return -1;
//查找key是否存在, 遍历 ht[0]->table 和ht[1]->table
for (table = 0; table <= 1; table++) {
//计算index
idx = hash & d->ht[table].sizemask;
/* Search if this slot does not already contain the given key */
//获取 index指向的第一个地址
he = d->ht[table].table[idx];
//he指向一个链表结构,从he的头到尾查找。
while(he) {
if (key==he->key || dictCompareKeys(d, key, he->key)) {
if (existing) *existing = he;
return -1;
}
he = he->next;
}
//rehash不在进行中,则只在 ht[0]查找,需要执行break. rehash在进行中 ,ht[0] h[1] 都需要查找
if (!dictIsRehashing(d)) break;
}
return idx;
}
rehash使用通过
//添加新的key value,如果成功返回新的 dictEntry,如果key 已经存在返回NULL,并且把存在的节点地址赋值给 existing
dictEntry *dictAddRaw(dict *d, void *key, dictEntry **existing)
{
long index;
dictEntry *entry;
dictht *ht;
// rehash进行中,执行渐进式哈希,处理一个entry。
if (dictIsRehashing(d)) _dictRehashStep(d);
/* Get the index of the new element, or -1 if
* the element already exists. */
//根据需要执行扩展,检查key是否存在,existing 如果是有效的指针,找到key的时候 赋值给existing。
if ((index = _dictKeyIndex(d, key, dictHashKey(d,key), existing)) == -1)
return NULL;
/* Allocate the memory and store the new entry.
* Insert the element in top, with the assumption that in a database
* system it is more likely that recently added entries are accessed
* more frequently. */
//申请内存和插入新实例。
// rehash进行中使用ht[1],rehash不在进行中使用ht[0]
ht = dictIsRehashing(d) ? &d->ht[1] : &d->ht[0];
entry = zmalloc(sizeof(*entry));
//将新实例放在 index 链表头
entry->next = ht->table[index];
ht->table[index] = entry;
ht->used++;
/* Set the hash entry fields. */
dictSetKey(d, entry, key);
return entry;
}
//dict add方法 依赖dictAddRaw
int dictAdd(dict *d, void *key, void *val)
{
dictEntry *entry = dictAddRaw(d,key,NULL);
if (!entry) return DICT_ERR;
dictSetVal(d, entry, val);
return DICT_OK;
}
//渐进式rehash 依赖dictRehash, 参数=1, 每次处理一个槽位
static void _dictRehashStep(dict *d) {
if (d->iterators == 0) dictRehash(d,1);//???
}
//rehash真正实现 ,每次处理n个槽位(一个槽位对应一条链表,槽位有可能为null)
int dictRehash(dict *d, int n) {
int empty_visits = n*10; /* Max number of empty buckets to visit. */
if (!dictIsRehashing(d)) return 0;
//从0~(d.size-1)每次循环处理一个槽位上的链表
while(n-- && d->ht[0].used != 0) {
dictEntry *de, *nextde;
/* Note that rehashidx can't overflow as we are sure there are more
* elements because ht[0].used != 0 */
assert(d->ht[0].size > (unsigned long)d->rehashidx);
//跳过空槽位,连续空槽位的的数量超过n的10倍,返回1
while(d->ht[0].table[d->rehashidx] == NULL) {
d->rehashidx++;
if (--empty_visits == 0) return 1;
}
//获取槽位地址
de = d->ht[0].table[d->rehashidx];
/* Move all the keys in this bucket from the old to the new hash HT */
//移动当前槽位的所有key到新的 ht上
while(de) {
//逐个移动槽位链表上的entry
uint64_t h;
nextde = de->next;
/* Get the index in the new hash table */
//获取新 hashtable上的槽位 index
h = dictHashKey(d, de->key) & d->ht[1].sizemask;
//将de插入ht[1].table[h]的头部位置。 与添加不同 rehash要插入到槽位的头部。
de->next = d->ht[1].table[h];
d->ht[1].table[h] = de;
d->ht[0].used--;
d->ht[1].used++;
de = nextde;
}
//修改原来的槽位指向NULL
d->ht[0].table[d->rehashidx] = NULL;
//槽位索引加1,如果完成rehash,rehasdix=d.size-1
d->rehashidx++;
}
/* Check if we already rehashed the whole table... */
//检查是否已经完成rehash修改 标识
if (d->ht[0].used == 0) {
zfree(d->ht[0].table);
d->ht[0] = d->ht[1];
_dictReset(&d->ht[1]);
d->rehashidx = -1;
return 0;
}
/* More to rehash... */
return 1;
}
dictRehash 在添加和查询的时候都调用
调用链如下图
dict add的逻辑比较复杂 整理出下图
实现add的过程中 解决问题的思路是什么
整体思路
add的插入实例部分实现比较常规,没什么可说的,执行add dict空间可能不够,所以预先检查dict空间。如果不够则进行扩展。
由于一次执行完整个dict的rehash比较耗时,采用了rehash的方案,将rehash分散在dict添加,修改,删除中。
代码细节上的思路
第一次add 可以执行 初始化空间扩展
检查key是否存在的同时,检查空间是否足够,不够则扩展
扩展空间的方法处理了创建后初始化空间扩展,后续扩展并标识rehash进行中。
int dictReplace(dict *d, void *key, void *val)
{
dictEntry *entry, *existing, auxentry;
/* Try to add the element. If the key
* does not exists dictAdd will succeed. */
//如果不存在则写入,如果存在 使用existing获取 实例地址
entry = dictAddRaw(d,key,&existing);
if (entry) {
dictSetVal(d, entry, val);
return 1;
}
/* Set the new value and free the old one. Note that it is important
* to do that in this order, as the value may just be exactly the same
* as the previous one. In this context, think to reference counting,
* you want to increment (set), and then decrement (free), and not the
* reverse. */
//why 这么搞
auxentry = *existing;
dictSetVal(d, existing, val);
dictFreeVal(d, &auxentry);
return 0;
}
//找到下个节点
// index 是table[n] 中的索引,table[1]或table[2]称作一个槽位, iter->entry index槽位上的链表中某个节点
dictEntry *dictNext(dictIterator *iter)
{
//当iter->entry == NULL 时,第一次执行dictNext 或 index所属槽位上的节点已经轮询完毕,
//此时iter->index++; iter->entry=ht->table[iter->index] iter->entry 指向了 下个槽位的头部位置,也就是头部第一个接单的位置。
while (1) {
if (iter->entry == NULL) {
dictht *ht = &iter->d->ht[iter->table];
if (iter->index == -1 && iter->table == 0) {
//iter初始化后执行此块
if (iter->safe)
iter->d->iterators++;
else
iter->fingerprint = dictFingerprint(iter->d);
}
iter->index++;
if (iter->index >= (long) ht->size) {
//如果dict[0]已经迭代获取完毕, dict正在进行rehash,则使用ht[1]迭代获取
if (dictIsRehashing(iter->d) && iter->table == 0) {
iter->table++;
iter->index = 0;
ht = &iter->d->ht[1];
} else {
break;
}
}
//iter->entry 指向了 下个槽位的头部位置,也就是头部第一个接单的位置。
iter->entry = ht->table[iter->index];
} else {
//当前链表未耗尽,继续取出next
iter->entry = iter->nextEntry;
}
//如果iter->entry!=NULL,记录 next节点。
//如果iter->entry==NULL,执行下轮循环 查找 下个槽位的开始节点
if (iter->entry) {
/* We need to save the 'next' here, the iterator user
* may delete the entry we are returning. */
iter->nextEntry = iter->entry->next;
return iter->entry;
}
}
return NULL;
}