Redsi自定义客户端分片实现
今天设计一个基于客户端的分片缓存方案...比如如果业务量不是很大的话,这种情况下是可以自己实现分片逻辑的,这样出了问题,也比较好定位嘛,接下来我们就分析并实现一种这样的方案:
主要包括缓存分片和读写分离以及失效转移,三个功能
缓存分片:
& 随机分片
& HASH一致性分片
读写分离:
& 随机分片
& 轮询分片
首先看一下项目整体结构:
代码整体不复杂也不多:看看核心接口
& 分片接口
package com.redis.shard.strategy; /** * @author 18011618 * 基于分片策略接口 * 默认实现机制基于关键字的hash取模 */ public interface ShardingStrategy { /** * 基于hash算法 * @param key 输入的字符串 * @param nodeCount 节点数量 * @return */ public <T> int shard(T key, int nodeCount); }
& 读写分离接口
package com.redis.shard.strategy; /** * @author 18011618 * 用来提供读写分离策略的接口 * 基于轮询 * 基于随机 */ public interface SelectShardStrategy { /** * 读写分离机制 * @param nodeCount * @return */ public int selectShard(int nodeCount); }
看看具体实现:
package com.redis.shard.strategy.impl; import com.redis.shard.strategy.ShardingStrategy; /** * @author 18011618 * 实现具体的hash分片 */ public class HashShardingStrategy implements ShardingStrategy { @Override public <T> int shard(T key, int nodeCount) { return key.hashCode() % nodeCount; } }
随机选择:
package com.redis.shard.strategy.impl; import java.util.Date; import java.util.Random; import com.redis.shard.strategy.SelectShardStrategy; /** * @author 18011618 * 基于随机实现策略 */ public class RandomSelectStrategy implements SelectShardStrategy{ private Random random = new Random(new Date().getTime()); @Override public int selectShard(int nodeCount) { return random.nextInt(nodeCount); } }
轮询策略:
package com.redis.shard.strategy.impl; import java.util.concurrent.atomic.AtomicLong; import com.redis.shard.strategy.SelectShardStrategy; /** * 基于轮询的策略 * @author 18011618 */ public class RoundRobinSelectStrategy implements SelectShardStrategy { private AtomicLong atomic = new AtomicLong(0); @Override public int selectShard(int nodeCount) { long value = atomic.incrementAndGet(); if(value == Long.MAX_VALUE){ atomic.set(0); } return (int)value % nodeCount; } }
接下来看看分片节点信息的封装:
package com.redis.shard; import java.util.ArrayList; import java.util.List; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; import redis.clients.jedis.JedisPool; import com.redis.shard.strategy.SelectShardStrategy; import com.redis.shard.strategy.impl.RandomSelectStrategy; import com.redis.shard.strategy.impl.RoundRobinSelectStrategy; /** * @author 18011618 * 封装客户端缓存分片的节点信息 */ public class RedisShardNode { //191.168.1.1:6350,191.168.1.2:6350 public static final String NODE_SEPARATOR = ","; //191.168.1.1:6350 public static final String HOST_PORT_SEPARATOR = ":"; //主节点 private JedisPool master; //从节点 private List<JedisPool> slaves; //读写分离机制 如果开启了读写分离会启用该策略 public SelectShardStrategy selectStrategy; //指定默认读写策略为轮询 public RedisShardNode( JedisPool master, List<JedisPool> slaves){ this.master = master; this.slaves = slaves; this.selectStrategy = new RandomSelectStrategy(); } //根据用户传递的读写策略 public RedisShardNode(JedisPool master,List<JedisPool>slaves,SelectShardStrategy selectShardStrategy){ this.master = master; this.slaves = slaves; this.selectStrategy = selectShardStrategy; } //通过参数实例化+指定默认策略 public RedisShardNode(String masterConnStr, List<String> slavesConnStrs){ String[] masterHostPortArray = masterConnStr.split(HOST_PORT_SEPARATOR); //设置master this.master = new JedisPool(new GenericObjectPoolConfig(), masterHostPortArray[0], Integer.valueOf(masterHostPortArray[1])); //设置slaves this.slaves = new ArrayList<JedisPool>(); for (String slaveConnStr : slavesConnStrs) { String[] slaveHostPortArray = slaveConnStr .split(HOST_PORT_SEPARATOR); this.slaves.add(new JedisPool(new GenericObjectPoolConfig(), slaveHostPortArray[0], Integer .valueOf(slaveHostPortArray[1]))); } this.selectStrategy = new RoundRobinSelectStrategy(); } //实例化参数+传递策略 public RedisShardNode(String masterConnStr, List<String> slavesConnStrs,SelectShardStrategy selectShardStrategy){ this(masterConnStr,slavesConnStrs); this.selectStrategy = selectShardStrategy; } public JedisPool getMaster() { return master; } public void setMaster(JedisPool master) { this.master = master; } public List<JedisPool> getSlaves() { return slaves; } public void setSlaves(List<JedisPool> slaves) { this.slaves = slaves; } //获取一个从节点 public JedisPool getSlaveRedisShardNode() { int nodeIndex = selectStrategy.selectShard(slaves.size()); return slaves.get(nodeIndex); } }
最后看看客户端分片实现机制:因为要基于JEDISs所以需要基础Jedis
package com.redis.shard; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.StringUtils; import redis.clients.jedis.Jedis; import com.redis.shard.strategy.ShardingStrategy; import com.redis.shard.strategy.impl.HashShardingStrategy; /** * @author 18011618 * 封装对外的redis分片客户端使用方法 */ public class RedisShardClient extends Jedis{ protected final Logger log = LoggerFactory.getLogger(RedisShardClient.class); //存储分片节点 private List<RedisShardNode> redisShardNodes = new ArrayList<RedisShardNode>(); //分片策略 private ShardingStrategy shardingStategy = new HashShardingStrategy(); //是否开启读写 private boolean readWriteSeparate = false; //连接字符串 private List<String> nodeConnStrs; //一定要有这个函数 否则反射的时候会报错 public RedisShardClient(){ } //传递集群连接字符串 public RedisShardClient(List<String> nodeConnStrs){ if (StringUtils.isEmpty(nodeConnStrs)) { log.error("The nodeConnStrs {} for Redic is invalid.", nodeConnStrs); throw new IllegalArgumentException( "The nodeConnStrs for Redic is invalid."); } this.nodeConnStrs = nodeConnStrs; //调用初始化方法 init(); } /** * 初始化分片节点 */ public void init(){ for(String nodeStr:nodeConnStrs){ this.addRedisNode(nodeStr); } } private RedisShardClient addRedisNode(String masterConnStr, List<String> slaveConnStrs){ redisShardNodes.add(new RedisShardNode(masterConnStr, slaveConnStrs)); return this; } public RedisShardClient addRedisNode(String nodeConnStr) { String[] nodes = nodeConnStr.split(RedisShardNode.NODE_SEPARATOR); return addRedisNode(nodes[0], Arrays.asList(Arrays.copyOf(nodes, 1))); } /** * read方法 往主节点 */ public <T> Jedis getRead(T key){ int nodeIndex = shardingStategy.shard(key, redisShardNodes.size()); RedisShardNode node = redisShardNodes.get(nodeIndex); //如果开启了读写分离 if(!readWriteSeparate) return node.getMaster().getResource();//从主节点 //随机选择一个从节点进行读取 return node.getSlaveRedisShardNode().getResource(); } /** * write方法 */ public <T> Jedis getWrite(T key){ int nodeIndex = shardingStategy.shard(key, redisShardNodes.size()); RedisShardNode node = redisShardNodes.get(nodeIndex); return node.getMaster().getResource(); } /** * 重写get方法 */ @Override public String get(String key) { Jedis jedis = getRead(key); String result = jedis.get(key); jedis.close(); return result; } /** * 重写set方法 */ @Override public String set(String key, String value) { Jedis jedis =getWrite(key); String result = jedis.set(key, value); jedis.close(); return result; } }
最后看一下配置文件:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <bean id="redisShardClient" class="com.redis.shard.RedisShardClient" init-method="init"> <!--配置读写分离开关 --> <property name="readWriteSeparate" value="${redis.readWrite}"/> <property name="nodeConnStrs"> <!-- 配置多个节点 --> <list> <value>${reids.node1}</value> <!-- <value>${reids.node2}</value> --> </list> </property> </bean> </beans>
OK,再写一个测试类:
import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import com.redis.shard.RedisShardClient; @SuppressWarnings("all") public class RediShardTest { public static void main(String[] args) { String path ="/spring/spring_redis_shard_client_test.xml"; ApplicationContext context = new ClassPathXmlApplicationContext(); RedisShardClient redisShardClient = (RedisShardClient)context.getBean("redisShardClient"); redisShardClient.set("uname", "jhp"); String uname = redisShardClient.get("uname"); System.out.println("uname:"+uname); } }
对应的配置文件:
具体配置文件内容:
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd"> <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"> <property name="locations" value="classpath:spring/redic.properties"/> </bean> <import resource="classpath:spring/spring_redis_client_shard.xml"/> </beans>
以及属性配置文件:
reids.node1=localhost:6379,localhost:6379 redis.readWrite=true
到此一个简单的基于客户端分片功能就实现了,大家可以增加更加复杂的策略完成更多的功能,比如HASH一致性,这个在dubbo的
cluster模块中loadbalance是有实现,可以去参考