秒杀业务:使用redis处理分布式锁的问题
分布式锁会在高并发的业务被使用到:
一、 分布式锁的处理一般可以有两种处理方式:
1.利用zookepeer的数据结构以及特性来处理分布式锁。
zookeeper可以创建临时有序的数据节点,同时每个数据节点可以对比其小的数据节点进行监控。只有当比自己点 排序更小的数据节点被删除之后才会,zk才会处理下一个节点。利用此特性,将高并发的访问写入到zk上的不同的有序 的节点。这时候各个请求就会有序的执行,只有在***比自己小的请求执行完毕之后才会执行,可以实现一个分锁。
2.利用redis的可以对每一个key可以执行一个设置锁的操作,以及该锁的过期时间可以实现。但是在实现过程中会出现一些问题,今天我就整理自己的一些见解以及使用代码来完成使用redis处理分布式锁。
二、使用reids处理分布式锁的实战
1.原生api处理分布式锁(效果不好,适用于并发访问低的网站,这里不再演示)
关键字:setNx expire
2.使用redisTemplate模板操作(效果较好,适用访问量较大的公司)
<1>.第一步:pom文件中引入相关依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency>
<2>.第二步:applicaiton.properties中配置redis的相关链接配置
redis.hostName=127.0.0.1 redis.port=6379 redis.password=https://blog.hhui.top # 连接超时时间 redis.timeout=10000 #最大空闲数 redis.maxIdle=300 #控制一个pool可分配多少个jedis实例,用来替换上面的redis.maxActive,如果是jedis 2.4以后用该属性 redis.maxTotal=1000 #最大建立连接等待时间。如果超过此时间将接到异常。设为-1表示无限制。 redis.maxWaitMillis=1000 #连接的最小空闲时间 默认1800000毫秒(30分钟) redis.minEvictableIdleTimeMillis=300000 #每次释放连接的最大数目,默认3 redis.numTestsPerEvictionRun=1024 #逐出扫描的时间间隔(毫秒) 如果为负数,则不运行逐出线程, 默认-1 redis.timeBetweenEvictionRunsMillis=30000 #是否在从池中取出连接前进行检验,如果检验失败,则从池中去除连接并尝试取出另一个 redis.testOnBorrow=true #在空闲时检查有效性, 默认false redis.testWhileIdle=true
<3>.第三步:分布式锁代码实现
package com.example.demo.redis; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.UUID; import java.util.concurrent.TimeUnit; /** * 商品操作类 */ @RestController @RequestMapping("/redis") public class ProductController { @Autowired private StringRedisTemplate redisTemplate; @RequestMapping("/product_count") public String getProduct(){ /** * 定义秒杀商品的redis中的key 以及 一个UUID 作为锁的value */ String productName = "productId"; String locKeyName = "lockKey"; String value = UUID.randomUUID().toString(); try{ /** * 设置锁,以及锁的过期时间,最好使用四个参数的setifAbsent保证原子性 */ Boolean aBoolean = redisTemplate.opsForValue().setIfAbsent(locKeyName,value,30, TimeUnit.SECONDS); if(!aBoolean){ return "此时锁被别人占领"; } /** * 减库存操作 */ int product_count = Integer.valueOf(redisTemplate.opsForValue().get(productName)); if(product_count > 0){ int count = product_count - 1; redisTemplate.opsForValue().set(productName,String.valueOf(count)); System.out.println("扣除库存成功!当前库存为" + count); }else{ System.out.println("商品已售空!"); } }finally { /** * 操作完毕,释放当前线程的锁 */ String concurrentLock = redisTemplate.opsForValue().get(locKeyName); if(value.equals(concurrentLock)){ redisTemplate.delete("product_001"); } } return "mission completed"; } }
总结:使用 redisTemplate做分布式锁的注意事项。
注意事项:
1.在使用redisTemplate的setIfAbsent API 的时候需要注意的是一定要使用key、value、long、timeUnit这参数 都有的方法。因为这样才能够保证get锁和设置锁的过期时间保持原子性。否则在这两个操作之间可能会出现错误。
2.在使用redisTemplate做分布式锁的时候需要使用try +finally 来释放锁,因为在程序执行的过程中可能会出现一 些状况,所以必须得保证每一个线程执行完毕之后释放锁。
3.在释放锁的时候必须加上判断保证释放的是自己的锁,否则有可能会释放到后面线程的锁导致锁的永久失效。
举例说明:
4.任务执行的时间问题还是无法解决,需要在主线程中new一个新的线程来增加lockKey的存活时间。知道主线程完 成时才将此线程关闭,才能够保证锁被别的线程获取同时也能够保证自己的任务执行完成。
有兴趣的小伙伴可以知己动手尝试着手写一下。
三、使用reids + Redisson处理分布式锁的实战
1.加入依赖:
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.7.4</version> </dependency>
2.将redisson注入spring容器:
@Bean // 创建好的redisson必须加上@Bean才能注入到spring容器中
public Redisson redisson(){
/**
* 使用redisson的config创建一个redisson的客户端并连接
* 并使用@Bean注解将其初始化到spring容器
*/
Config config = new Config();
config.useSingleServer().setAddress("127.0.0.1:6379").setDatabase(0);
/**
* 注意类型转换
*/
return (Redisson) Redisson.create(config);
}
注意:redisson的创建方式是使用redisson的config 来设置地址和数据库。最后使用create()有参构造创建。
3.代码实战:
package com.example.demo.redis; import org.redisson.Redisson; import org.redisson.api.RLock; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.util.concurrent.TimeUnit; @RestController @RequestMapping("/redisson") public class RedissonController { @Autowired private Redisson redisson; @Autowired private StringRedisTemplate redisTemplate; @RequestMapping("/product") public String getProduct(){ String productName = "productId"; String locKey = "lockKey"; /** * 使用redisson获取锁并判断其是否获取锁 */ RLock lock = redisson.getLock(locKey); boolean locked = lock.isLocked(); if(locked){ return "此时锁被别的线程使用!"; } /** * 设置锁的过期时间: 锁的默认过期时间为30s * * 注:redission的lock方法的底层是一个rentreelock,同时它的底层也自动加上了一个线程 * 作为定时器来增加锁的过期时间,知道业务执行完毕之后才会关闭调该线程。保证在业务执行完成再释放锁! * */ lock.lock(30, TimeUnit.SECONDS); // 解决了延长锁的持有之间的问题 /** * 业务处理 */ Integer count = Integer.valueOf(redisTemplate.opsForValue().get(productName)); if(count > 0){ int x = count - 1; redisTemplate.opsForValue().set(productName,String.valueOf(x)); System.out.println("操作成功,库存数量减一"); }else{ System.out.println("库存不足,请稍后操作!"); } lock.unlock(); // 注意:lock必须手动释放!!! return "mission completed"; } }
注意:Lock lock = redision.getLock("name"); 获取到的是一个可重入锁:renetreeLock
lock.lock(),方法中已经实现了加入定时增加的key的存活时间的线程,能够保证资源不被浪费。保证在当前线程执行完毕之后才释放锁。
rentreelock必须释放锁!!!
这就已经用redis实现了分布式锁!