初识Redis

目录

各命令及底层实现

远程 redis 服务上执行命令:

字符串(String)

哈希(Hash)

列表(List)

集合(Set)

有序集合

发布订阅

redis和memcache的区别

穿透、缓存雪崩

使用redis实现分布式锁

并发竞争问题

持久化的几种方式、优缺点、怎么实现

缓存失败策略

redis集群高可用原理

redis缓存分片(分区)

redis的数据淘汰策略


各命令及底层实现

远程 redis 服务上执行命令:

redis-cli -h host -p port -a password

字符串(String)

字符串数据类型的相关命令用于管理 redis 字符串值

SET key value

设置指定 key 的值

GET key

获取指定 key 的值

GETRANGE kay start end

返回 key 中字符串值的子字符

GETSET key value

将给定 key 的值设为 value ,并返回 key 的旧值(old value)。

哈希(Hash)

Redis hash 是一个string类型的field和value的映射表,hash特别适合用于存储对象。

Redis 中每个 hash 可以存储 232 - 1 键值对(40多亿)。     

HSET key field value

将哈希表 key 中的字段 field 的值设为 value 。

HGET key field

获取存储在哈希表中指定字段的值。

HDELETE key field1[field2]

删除一个或多个哈希表字段

列表(List)

Redis列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)

一个列表最多可以包含 232 - 1 个元素 (4294967295, 每个列表超过40亿个元素)。

LPUSH key value1[value2]

将一个或多个值插入到列表头部

LPUSHX key value

将一个值插入到已存在的列表头部

LPOP key

移出并获取列表的第一个元素

LLEN key

获取列表长度

 

集合(Set)

Redis 的 Set 是 String 类型的无序集合。集合成员是唯一的,这就意味着集合中不能出现重复的数据。

Redis 中集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是 O(1)。

集合中最大的成员数为 232 - 1 (4294967295, 每个集合可存储40多亿个成员)。

SADD key member1 [member2]

向集合添加一个或多个成员

SMEMBERS key

返回集合中的所有成员

SISMEMBER key member

判断 member 元素是否是集合 key 的成员

SPOP key

移除并返回集合中的一个随机元素

有序集合

Redis 有序集合和集合一样也是string类型元素的集合,且不允许重复的成员。

不同的是每个元素都会关联一个double类型的分数。redis正是通过分数来为集合中的成员进行从小到大的排序。

有序集合的成员是唯一的,但分数(score)却可以重复。

集合是通过哈希表实现的,所以添加,删除,查找的复杂度都是O(1)。 集合中最大的成员数为 232 - 1 (4294967295, 每个集合可存储40多亿个成员)。

ZADD key score1 member1 [score2 member2]

向有序集合添加一个或多个成员,或者更新已存在成员的分数

 

ZCARD key

获取有序集合的成员数

ZCOUNT key min max

计算在有序集合中指定区间分数的成员数

发布订阅

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

Redis 客户端可以订阅任意数量的频道。

频道Channel1,以及订阅这三个频道的客户端—— client2 、 client5 和 client1 之间的关系:

初识Redis

当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:

初识Redis

SUBSCRIBE channel [channel ...]

订阅给定的一个或多个频道的信息。

PSUBSCRIBE pattern [pattern ...]

订阅一个或多个符合给定模式的频道。

UNSUBSCRIBE [channel [channel ...]]

退订给定的频道。

PUNSUBSCRIBE [pattern [pattern ...]]

退订所有给定模式的频道。

PUBLISH channel message

将信息发送到指定的频道。

PUBSUB subcommand [argument [argument ...]]

查看订阅与发布系统状态。

redis和memcache的区别

memcache是分布式高速缓存系统。

共同点:都是内存数据库

区别:

memcache:

  1. 可以利用多核优势,单实例吞吐量极高,可以达到几十万QPS,适用于大数据量
  2. 只支持简单的key/value数据结构,redis支持丰富的数据类型
  3. 无法进行持久化,数据不能备份,只能用于缓存,重启后数据全部丢失

redis:

  1. 支持多种数据结构,如string、list、set、zset、dict等
  2. 单线程请求,所有命令串行执行,并发情况下不需要考虑数据一致性
  3. 支持持久化,可以是使用AOF及RDB数据持久化到磁盘,从而进行数据的备份或数据恢复等操作,防止数据的丢失。
  4. 支持通过Replication进行数据复制,通过master-slave机制,可以实现实时进行数据的同步复制,支持多级复制和增量复制。
  5. 支持pub/sub消息订阅机制,可以用来进行消息订阅与通知
  6. 支持简单的事务,使用场景很少,还不成熟

穿透、缓存雪崩

穿透

描述:

   指在redis中查询一个一定不存在的数据时,需要从数据库中查询。查询到数据不存在时则不写入缓存,这将导致每次在redis中获取不存在的数据时都会去数据库中查询,这将导致缓存穿透。

解决方法:

  1. 使用布隆过滤,将所有可能存在的数据哈希到一个足够大的bitmap中,一个一定不存在的数据就会被拦截,这样就可以避免对数据库的查询压力。
  2. 如果在数据库查询的数据为空,不管何种原因造成的,仍然将空结果进行缓存,只是设置的过期时间很短,最长不超过5分钟。

缓存雪崩

描述:

   Redis的缓存集中在一段时间内失效,发生大量的穿透,将所有的查询落到数据库中,造成了缓存雪崩。

解决方法:

   1、缓存失效后,通过加锁或队列来控制数据库写缓存的线程数量。如一个key只允许一个线程查询数据和写缓存,其他线程等待。

2、可以通过缓存reload机制,预先去更新缓存,再即将发生大并发访问前手动触发加载缓存。

3、不同的key设置不同的过期失效时间,让缓存失效时间点尽量均匀。

4、做好二级缓存或者双缓存策略。A1为原始缓存,A2为拷贝缓存,A1失效时,可以访问A2,A1缓存失效时间设置为短期,A2设置为长期。

 

使用redis实现分布式锁

分布式锁一般有三种实现方式:1. 数据库乐观锁;2. 基于Redis的分布式锁;3. 基于ZooKeeper的分布式锁。

1、SETNX是set if not exist的缩写,如果不存在就返回保存value并返回1,如果存在就返回0。将key设置值为value,如果key不存在,这种情况下等同SET命令。 当key存在时,什么也不做。

2、GETSET其实就是两个指令GET和SET,首先会GET到当前key的值并返回,然后在设置当前Key为要设置Value。

public class RedisLock {
   
private static final String LOCK_SUCCESS = "OK";
   
private static final String SET_IF_NOT_EXIST = "NX";
   
private static final String SET_WITH_EXPIRE_TIME = "PX";
   
private final JedisPool jedisPool;

   
public RedisLock(JedisPool jedisPool) {
       
this.jedisPool = jedisPool;
    }

   
/**
     *
尝试获取分布式锁
     * @param
jedis Redis客户端
     * @param
lockKey
     * @param
requestId 请求标识
     * @param
expireTime 超期时间
     * @return 是否获取成功
     */
   
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId,
SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

       
if (LOCK_SUCCESS.equals(result)) {
           
return true;
        }
       
return false;

    }

   
/**
     *
加锁
     * @param
locaName  锁的key
     * @param
acquireTimeout  获取超时时间
     * @param
timeout   锁的超时时间
     * @return 锁标识
     */
   
public String lockWithTimeout(String locaName,
                                 
long acquireTimeout, long timeout) {
        Jedis conn =
null;
        String retIdentifier =
null;
       
try {
           
// 获取连接
           
conn = jedisPool.getResource();
           
// 随机生成一个value
           
String identifier = UUID.randomUUID().toString();
           
// 锁名,即key值
           
String lockKey = "lock:" + locaName;
           
// 超时时间,上锁后超过此时间则自动释放锁
           
int lockExpire = (int)(timeout / 1000);

           
// 获取锁的超时时间,超过这个时间则放弃获取锁
           
long end = System.currentTimeMillis() + acquireTimeout;
           
while (System.currentTimeMillis() < end) {
               
if (conn.setnx(lockKey, identifier) == 1) {
                    conn.expire(lockKey, lockExpire);
                   
// 返回value值,用于释放锁时间确认
                   
retIdentifier = identifier;
                   
return retIdentifier;
                }
                
// 返回-1代表key没有设置超时时间,为key设置一个超时时间
               
if (conn.ttl(lockKey) == -1) {
                    conn.expire(lockKey, lockExpire);
                }

               
try {
                    Thread.sleep(
10);
                }
catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
catch (JedisException e) {
            e.printStackTrace();
        }
finally {
           
if (conn != null) {
                conn.close();
            }
        }
       
return retIdentifier;
    }

   
/**
     *
释放锁
     * @param
lockName 锁的key
     * @param
identifier    释放锁的标识
     * @return
    
*/
   
public boolean releaseLock(String lockName, String identifier) {
        Jedis conn =
null;
        String lockKey =
"lock:" + lockName;
       
boolean retFlag = false;
       
try {
            conn =
jedisPool.getResource();
           
while (true) {
               
// 监视lock,准备开始事务
               
conn.watch(lockKey);
               
// 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
               
if (identifier.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    List<Object> results = transaction.exec();
                   
if (results == null) {
                        
continue;
                    }
                    retFlag =
true;
                }
                conn.unwatch();
               
break;
            }
        }
catch (JedisException e) {
            e.printStackTrace();
        }
finally {
           
if (conn != null) {
                conn.close();
            }
        }
       
return retFlag;
    }
}

 

并发竞争问题

Redis的并发竞争问题,主要是发生在并发写竞争。

  1. 可以使用独占锁的方式,类似操作系统的mutex机制。
  2. 使用乐观锁的方式进行解决(成本较低,非阻塞,性能较高)。

watch stock

get stock $stock

$stock = $stock - 10

multi

set stock $stock

exec

watch这里表示监控该key值,后面的事务是有条件的执行,如果从watch的exec语句执行时,watch的key对应的value值被修改了,则事务不会执行。

WATCH key [key ...]

监视一个(或多个) key ,如果在事务执行之前这个(或这些) key 被其他命令所改动,那么事务将被打断。

UNWATCH

取消 WATCH 命令对所有 key 的监视。

MULTI

标记一个事务块的开始。

EXEC

执行所有事务块内的命令。

DISCARD

取消事务,放弃执行事务块内的所有命令。

 

持久化的几种方式、优缺点、怎么实现

  1. RDB持久化:将Reids在内存中的数据库记录定时dump到磁盘上的RDB持久化。

配置:

通过配置文件来修改Redis服务器dump快照的频率,在打开6379.conf文件之后,搜索save

save 900 1  #在900秒(15分钟)之后,如果至少有1个key发生变化,则dump内存快照。

save 300 10 #在300秒(5分钟)之后,如果至少有10个key发生变化,则dump内存快照。

save 60 10000 #在60秒(1分钟)之后,如果至少有10000个key发生变化,则dump内存快照。

 

  1. AOF(append only file)持久化:原理是将Reids的操作日志以追加的方式写入文件。

配置:

在Redis的配置文件中存在三种同步方式,它们分别是:

appendfsync always  #每次有数据修改发生时都会写入AOF文件。

appendfsync everysec  #每秒钟同步一次,该策略为AOF的缺省策略。

appendfsync no     #从不同步。高效但是数据不会被持久化。

 

缓存失败策略

穿透、缓存雪崩

redis集群高可用原理

两种方式:

  1. 主备方式

一台主机、一台或多台备机,在正常情况下主机对外提供服务,并把数据同步到备机,当主机宕机后,备机立刻开始服务。

优点:对客户端无影响

缺点:绝大数情况下备机没有使用。一直空闲浪费。

  1. 主从方式

主从之间进行数据同步。 当Master宕机后,通过选举算法(Paxos、Raft)从slave中选举出新Master继续对外提供服务,主机恢复后以slave的身份重新加入。

优点:另一个目的是进行读写分离,这是当单机读写压力过高的一种通用型解决方案。 其主机的角色只提供写操作或少量的读,把多余读请求通过负载均衡算法分流到单个或多个slave服务器上。

缺点:主机宕机后,Slave虽然被选举成新Master了,但对外提供的IP服务地址却发生变化了,意味着会影响到客户端。 解决这种情况需要一些额外的工作,在当主机地址发生变化后及时通知到客户端,客户端收到新地址后,使用新地址继续发送新请求

数据同步方式:

1、异步

2、同步

redis缓存分片(分区)

分区是分割数据到多个Redis实例的处理过程,因此每个实例只保存key的一个子集。

优点

缺点

  1. 利用多台计算机内存的和值,允许我们构造更大的数据库。
  2. 通过多核和多台计算机,允许我们扩展计算能力;通过多台计算机和网络适配器,允许我们扩展网络带宽。
  1. 涉及多个key的操作通常是不被支持的。举例来说,当两个set映射到不同的redis实例上时,你就不能对这两个set执行交集操作。
  2. 涉及多个key的redis事务不能使用。
  3. 当使用分区时,数据处理较为复杂,比如你需要处理多个rdb/aof文件,并且从多个实例和主机备份持久化文件。
  4. 增加或删除容量也比较复杂。redis集群大多数支持在运行时增加、删除节点的透明数据平衡的能力,但是类似于客户端分区、代理等其他系统则不支持这项特性。然而,一种叫做presharding的技术对此是有帮助的。

分区类型:

  1. 范围分区

最简单的分区方式是按范围分区,映射一定范围的对象到特定的Redis实例

比如,ID从0到10000的用户会保存到实例R0,ID从10001到 20000的用户会保存到R1,以此类推。

这种方式是可行的,并且在实际中使用,不足就是要有一个区间范围到实例的映射表。这个表要被管理,同时还需要各 种对象的映射表,通常对Redis来说并非是好的方法。

  1. 哈希分区

用一个hash函数将key转换为一个数字,比如使用crc32 hash函数。对key foobar执行crc32(foobar)会输出类似93024922的整数。

对这个整数取模,将其转化为0-3之间的数字,就可以将这个整数映射到4个Redis实例中的一个了。93024922 % 4 = 2,就是说key foobar应该被存到R2实例中。注意:取模操作是取除的余数,通常在多种编程语言中用%操作符实现。

redis的数据淘汰策略

redis内存的数据集大小到一定大小时,会实施数据淘汰策略。Redis有6种数据淘汰策略:

  1. volatile-lru:从已设置过期时间的数据集(server.db[i].expires)中挑选最近最少使用的数据淘汰。
  2. volatile-ttl:从已设置过期时间的数据集(server.db[i].expires)中挑选将要过期的数据淘汰
  3. volatile-random:从已设置过期时间的数据集(server.db[i].expires)中任意选择数据淘汰。
  4. allkeys-lru:从数据集(server.db[i].dict)中挑选最近最少使用的数据淘汰。
  5. allkeys-random:从数据集(server.db[i].dict)中任意选择数据淘汰。
  6. no-enviction(驱逐):禁止驱逐数据。