一致性哈希算法简单的模拟实现
源代码
#!/usr/bin/python
#-*- coding: utf-8 -*-
import platform
import os
import json
import collections
import math
import re
import string
import types
import random
# Size of the hash ring: every hash function in this module returns a
# slot index in [0, divisor).
divisor = 2**10


def ip_hash_func(ip):
    """Hash a dotted-quad IPv4 string onto the ring.

    Each octet is converted to hex digits, the digits are concatenated
    into one integer, and the result is reduced modulo ``divisor``.

    Raises:
        Exception: if ``ip`` is not a string of the form ``a.b.c.d``.
    """
    # Check the type BEFORE handing the value to re.match: the original
    # tested the regex first, so a non-string raised TypeError inside
    # re.match instead of the intended "ip format error".  (The original
    # also had an unreachable `return` after the raise — removed.)
    if not isinstance(ip, str) or \
            re.match(r'[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}',
                     ip, re.I) is None:
        raise Exception("ip format error")
    var_hex = '0x'
    for word in ip.split('.'):
        # hex() keeps only significant digits (no zero padding), which
        # matches the original accumulation scheme exactly.
        var_hex = var_hex + hex(int(word))[2:]
    # int() replaces the Python-2-only long(); it auto-promotes as needed.
    return int(var_hex, 16) % divisor
def cluster_hash_func(vnode_key):
    """Hash a 'real_ip:virtual_ip' node key onto the ring.

    Both halves of the key are hashed with ip_hash_func and the sum is
    reduced modulo ``divisor``.  A malformed key is reported on stdout
    and the function returns None (best-effort, like the original).
    """
    default_vr_separator = ':'
    try:
        key_list = vnode_key.split(default_vr_separator)
        rnode_hash = ip_hash_func(key_list[0])
        vnode_hash = ip_hash_func(key_list[1])
        return (rnode_hash + vnode_hash) % divisor
    # `except Exception as err` replaces the Python-2-only comma syntax;
    # the empty else/finally clauses of the original were dead clutter.
    except Exception as err:
        print('hash error:\n' + str(err))
def resource_hash_func(key):
    """Hash a resource-name string onto the ring.

    The hex codes of the characters are concatenated into one integer,
    which is reduced modulo ``divisor`` (same unpadded accumulation
    scheme as ip_hash_func).
    """
    var_hex = '0x'
    # Iterate the string directly instead of map(lambda ...) + index loop.
    for ch in key:
        var_hex = var_hex + hex(ord(ch))[2:]
    # int() replaces the Python-2-only long(); it auto-promotes as needed.
    return int(var_hex, 16) % divisor
def generate_virtual_node(node_ip, avg_virtual_node_count):
    """Generate ``avg_virtual_node_count`` distinct virtual IPs for a real node.

    Enough trailing dotted-quad words (8 bits each) to encode the count
    are replaced with random values, while the leading words are kept as
    a shared prefix.  Every generated suffix word differs from the real
    node's word at the same position, so a virtual IP never collides
    with the real node's IP.
    """
    # Number of trailing ip words needed to encode the vnode count.
    suffix_len = int(math.ceil(len(bin(avg_virtual_node_count)[2:]) / 8.0))
    ip_word_list = node_ip.split('.')
    prefix_len = len(ip_word_list) - suffix_len
    prefix_list = ip_word_list[0:prefix_len]
    real_node_suffix_list = ip_word_list[prefix_len:]
    virtual_node_ip_list = []
    while len(virtual_node_ip_list) < avg_virtual_node_count:
        virtual_node_suffix_list = []
        while len(virtual_node_suffix_list) < suffix_len:
            rand = str(int(round(random.uniform(0, 255))))
            pos = len(virtual_node_suffix_list)
            # Compare against the real node's word at the SAME position;
            # the original sliced [pos-1:pos] for pos > 0, i.e. compared
            # against the PREVIOUS position (off by one), so for suffix
            # lengths > 1 a vnode could duplicate the real node's IP.
            if rand != real_node_suffix_list[pos]:
                virtual_node_suffix_list.append(rand)
        ip = '.'.join(prefix_list) + '.' + '.'.join(virtual_node_suffix_list)
        if ip not in virtual_node_ip_list:
            virtual_node_ip_list.append(ip)
    return virtual_node_ip_list
def generate_real_node(count):
    """Return ``count`` distinct random dotted-quad IPv4 address strings."""
    words_per_ip = 4
    ip_list = []
    while len(ip_list) < count:
        # Draw four random octets and join them into a candidate address.
        candidate = '.'.join(
            str(int(round(random.uniform(0, 255))))
            for _ in range(words_per_ip))
        # Retry on the (unlikely) duplicate so the result stays unique.
        if candidate not in ip_list:
            ip_list.append(candidate)
    return ip_list
def generate_vnode_key(rnode_ip, vnode_ip_list):
    """Combine a real-node IP with each virtual IP into 'real:virtual' keys.

    Returns a list of strings, one per entry of ``vnode_ip_list``.
    """
    # A list comprehension keeps the Python-2 list return type: under
    # Python 3 the original map() would return a lazy iterator, breaking
    # callers that extend/iterate the result more than once.
    return [rnode_ip + ':' + vnode_ip for vnode_ip in vnode_ip_list]
def generate_resource(count):
    """Return ``count`` distinct random resource names (4-10 characters)."""
    alphabet = ("0123456789abcdefghijklmnopqrstuvwxyz"
                "ABCDEFGHIJKLMNOPQRSTUVWXYZ_+-#$%@*")
    resource_list = []
    while len(resource_list) < count:
        # Pick a length in [4, 10], then draw that many random characters.
        name_len = int(math.ceil(random.uniform(4, 10)))
        name = ''.join(random.choice(alphabet) for _ in range(name_len))
        # Retry on duplicates so every resource name is unique.
        if name not in resource_list:
            resource_list.append(name)
    return resource_list
class ConsistentHash:
    """Simple simulation of consistent hashing with virtual nodes.

    Real nodes (random IPs) and resources (random names) are hashed
    onto a ring of ``divisor + 1`` slots; each slot holds a list
    (separate chaining) so colliding entries coexist.  Resources are
    cached on the first node found clockwise from their slot, subject
    to a per-node capacity.
    """

    # Default simulation parameters; set_config() overrides them
    # per instance.  vnode total = avg_vnode_count * rnode_count.
    default_avg_vnode_count = 1   # virtual nodes per real node
    default_rnode_count = 10      # number of real nodes
    default_resource_count = 20   # number of resources to place
    default_node_capacity = 3     # max resources cached per node

    def __init__(self):
        self._rebuild()

    def _rebuild(self):
        """(Re)create all ring state from the current configuration."""
        self.cluster_cycle = self.init_cycle(divisor + 1)
        self.resource_cycle = self.init_cycle(divisor + 1)
        self.result_cycle = self.init_cycle(divisor + 1)
        # Reset before regenerating: the original set_config() skipped
        # this and leaked keys from the previous configuration.
        self.vnode_key_list = []
        self.rnode_list = generate_real_node(self.default_rnode_count)
        self.resource_list = generate_resource(self.default_resource_count)
        self.__generate_batch_vnode_key()

    def init_cycle(self, size):
        """Return a list of ``size`` independent empty lists.

        ([[]] * size would alias one list across every slot.)
        """
        return [[] for _ in range(size)]

    def set_config(self, rnode_count, resource_count, avg_vnode_count,
                   capacity):
        """Reconfigure the simulation and regenerate nodes/resources."""
        self.default_rnode_count = rnode_count
        self.default_resource_count = resource_count
        self.default_avg_vnode_count = avg_vnode_count
        self.default_node_capacity = capacity
        self._rebuild()

    def __generate_batch_vnode_key(self):
        """Append 'real:virtual' keys for every real node to vnode_key_list."""
        for rnode in self.rnode_list:
            vnode_list = generate_virtual_node(rnode,
                                               self.default_avg_vnode_count)
            self.vnode_key_list.extend(generate_vnode_key(rnode, vnode_list))

    def map_resource(self):
        """Place every resource name on the resource ring."""
        for resource in self.resource_list:
            self.resource_cycle[resource_hash_func(resource)].append(resource)

    def map_cluster(self):
        """Place every vnode key on the cluster ring with an empty cache."""
        for key in self.vnode_key_list:
            hash_value = cluster_hash_func(key)
            # Each entry is {vnode_key: [cached resource names]}.
            self.cluster_cycle[hash_value].append({key: []})

    def cache_resource(self):
        """Assign every resource to a node by walking the ring clockwise.

        Resources in a slot go to the first non-empty cluster slot at or
        after it.  Within a slot, a node that reaches
        ``default_node_capacity`` hands the remaining resources to the
        next node in the chain; the last node accepts the overflow.
        Returns "error" if the cluster ring is completely empty.
        """
        for hash_value in range(len(self.resource_cycle)):
            resource_list = self.resource_cycle[hash_value]
            if len(resource_list) <= 0:
                continue
            cluster_cyc_size = len(self.cluster_cycle)
            node_list = self.cluster_cycle[hash_value]
            if len(node_list) <= 0:
                # Walk clockwise to the first slot that holds nodes.
                next_hash_value = (hash_value + 1) % cluster_cyc_size
                while True:
                    if next_hash_value == hash_value:
                        # Full lap: no node anywhere on the ring.
                        return "error"
                    node_list = self.cluster_cycle[next_hash_value]
                    if len(node_list) <= 0:
                        next_hash_value = (next_hash_value + 1) \
                            % cluster_cyc_size
                    else:
                        break
            for i in range(len(node_list)):
                get_next_node = False
                dic = node_list[i]
                keys = dic.keys()
                for key in keys:
                    # Only entries keyed 'real_ip:virtual_ip' are nodes.
                    if re.match(r'([0-9]{1,3}\.){3,3}[0-9]{1,3}:'
                                r'([0-9]{1,3}\.){3,3}[0-9]{1,3}',
                                key) is None:
                        continue
                    if len(dic[key]) >= self.default_node_capacity:
                        if i + 1 < len(node_list):
                            get_next_node = True
                        else:
                            # Last node in the chain takes the overflow.
                            for resource in resource_list:
                                if resource not in dic[key]:
                                    dic[key].append(resource)
                    else:
                        for j in range(len(resource_list)):
                            resource = resource_list[j]
                            if resource not in dic[key]:
                                if (len(dic[key]) < self.default_node_capacity
                                        or i + 1 >= len(node_list)):
                                    dic[key].append(resource)
                                else:
                                    # Node filled up mid-list: pass the
                                    # remainder to the next node.
                                    resource_list = resource_list[j:]
                                    get_next_node = True
                                    break
                    # Each dict carries a single node key; stop after it.
                    break
                if not get_next_node:
                    break

    def print_result(self):
        """Clear the terminal, then print ring distribution and cache state."""
        if platform.system() == 'Windows':
            os.system('cls')
        else:
            # Linux and anything else fall back to `clear`.
            os.system('clear')
        cache_result = ''
        distribution = {}
        for i in range(len(self.cluster_cycle)):
            hash_entity = self.cluster_cycle[i]
            if len(hash_entity) <= 0:
                continue
            cr = str(i) + ': '
            comment = 'node keys:'
            for dic in hash_entity:
                for key in dic:
                    cr = cr + key + ': ' + str(dic[key]) + ';'
                # list() keeps the Python-2 "['...']" rendering, where
                # Python 3 would print "dict_keys([...])".
                comment = comment + str(list(dic.keys()))
            cr = cr + ',\n'
            cache_result = cache_result + cr
            distribution[i] = comment + '; '
        for i in range(len(self.resource_cycle)):
            resource_list = self.resource_cycle[i]
            if len(resource_list) <= 0:
                continue
            comment = 'resource names:' + str(resource_list)
            if i not in distribution:
                distribution[i] = ''
            distribution[i] = distribution[i] + comment
        # Sort by slot index so the JSON dump reads in ring order.
        ordered = collections.OrderedDict(
            sorted(distribution.items(), key=lambda item: item[0]))
        distribution_result = json.dumps(ordered, indent=4)
        print('hash distribution in ring:\n' + distribution_result)
        print('---------------------------------------------------')
        print('cache result is:\n' + cache_result)

    def update_cache(self):
        """Reset the rings and redo the full mapping/caching pass."""
        self.cluster_cycle = self.init_cycle(divisor + 1)
        self.resource_cycle = self.init_cycle(divisor + 1)
        self.result_cycle = self.init_cycle(divisor + 1)
        self.map_resource()
        self.map_cluster()
        self.cache_resource()

    def get_default_emulation_result(self):
        """Run the whole simulation once and print the result."""
        self.map_resource()
        self.map_cluster()
        self.cache_resource()
        self.print_result()

    def reprint_cache(self):
        """Empty every node's cache, then redistribute and reprint.

        Useful after manually removing node entries from cluster_cycle
        (e.g. to observe how resources migrate).
        """
        for node_list in self.cluster_cycle:
            if len(node_list) <= 0:
                continue
            for dic in node_list:
                for key in dic:
                    dic[key] = []
        self.cache_resource()
        self.print_result()

    def reemulate(self):
        """Regenerate all nodes/resources and run the simulation again."""
        self.__init__()
        self.get_default_emulation_result()

    def __node_cache_strategy(self, node_chain, resource_chain):
        # Placeholder for a pluggable cache-placement strategy.
        pass
def main(argv):
    # Entry-point placeholder: the simulation is driven interactively
    # (see the usage notes at the end of this document), so nothing is
    # implemented here yet.
    pass
使用链地址法预防冲突
导入Python模块:import consistent_hash
创建一个实例:ch = consistent_hash.ConsistentHash()
执行函数:ch.reemulate()
得到结果
进行哈希映射后资源及主机节点在环上的分布情况
最后主机上资源的缓存情况
删除一个hash值为919的主机节点
重新查看结果:输入ch.reprint_cache()