基于redis实现分布式锁案例

基于redis实现分布式锁案例

这是我的案例流程图。浏览器----->zuul--->feignclient(两个)---->service(两个实例)。

由于多个服务实例。所以在并发情况下。传统的线程并发解决方案不能实现。原因:传统synychnozed锁或者lock都是基于同一个

jvm内部的。因为共享变量存在方法区(线程共享区)。所以多线程间是共享该变量实现线程上下文的安全切换。达到线程安全。

但是在分布式情况下。多实例。那么就是多jvm.多进程之间。进程是具有隔离性的。变量互相不见不可共享。那么这个时候redis

提供了解决方案。redis是单线程的。同一时刻只能有一个线程运行。那么可以利用这个特性。用到redis的几个命令:

conn.setnx(lockKey, identifier) == 1.这个nx的意思就是key存在的情况下不执行。key不存在在执行。
conn.expire(lockKey, lockExpire);设置key的超时时间。超过时间。自动释放掉锁
jedis.decr(key);redis自减

大概的原理是这样:  多进程并发执行。同一个时刻只有一个进程。首先去获取锁。如果key已经存在。说明当前有客户端正在使用该锁。那么就等待。设置超过多长时间获取不到锁就不获取了。如果key并存在则获取到锁。并且为锁设置超时时间。时间到自动释放该锁(避免产生死锁)。执行业务逻辑。执行完手动释放锁。等待其他进程拿到锁。那么我们要用redis来存储数据。保证每个进程都是从redis中拿到同一份数据。大概的原理就是这样。细节还有很多隐藏的bug.以后慢慢深入研究就补上。我的代码如下:

基本实现分布式锁功能。但是有个问题就是redis链接对象无法释放。找了半天找不到问题。明天继续找问题。

zuul:
  routes:
    microservice-consumer-movie-feign:
      path: /api/feign/**
      stripPrefix: false


#路由配置
package com.itmuch.cloud.microserviceconsumermoviefeign.FeginClient;


import com.itmuch.cloud.microserviceconsumermoviefeign.entity.User;
import org.springframework.cloud.netflix.feign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestParam;

/**
 * : 描述信息
 *
 * @author liyy
 * @date 2018-07-28 14:59
 */
@FeignClient(value = "microservice-provider-user",path = "/userController"/*,fallback = UserFeignFallBack.class*/)
public interface UserFeignClient{

    @GetMapping(value = "/{id}")
    public User findOne(@PathVariable("id") Long id);

    @GetMapping(value = "/moreInstance")
    public int moreInstance();

}


#feginclient客户端调用
package com.itmuch.cloud.microserviceconsumermoviefeign.controller;

import com.itmuch.cloud.microserviceconsumermoviefeign.FeginClient.UserFeignClient;
import com.itmuch.cloud.microserviceconsumermoviefeign.entity.User;
import com.netflix.hystrix.HystrixCircuitBreaker;
import com.netflix.hystrix.HystrixCommandKey;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.context.request.RequestContextListener;

import java.util.concurrent.*;

/**
 * : 描述信息
 *
 * @author liyy
 * @date 2018-07-28 15:13
 */
@RestController
@RequestMapping(value="/api")
public class FeignController {
    public static Logger logger = LoggerFactory.getLogger(FeignController.class);

    @Autowired
    private UserFeignClient feignClient;

    public static ExecutorService threadPool = Executors.newFixedThreadPool(3);//固定线程数量的线程池

    private RequestContextListener requestContestListener;

    @GetMapping(value="/feign/{id}")
    public User findOne(@PathVariable Long id){
        System.out.println("id:"+id);
        Long startTime  = System.currentTimeMillis();
        User u = feignClient.findOne(id);
        Long endTime  = System.currentTimeMillis();
        System.out.println("调用业务返回时间:"+(endTime - startTime));
        HystrixCircuitBreaker breaker = HystrixCircuitBreaker.Factory.getInstance(HystrixCommandKey.Factory.asKey("UserFeignClient#findOne()"));
        logger.info("断路器的状态:{}",breaker);
        return u;

    }

    @GetMapping("/feign/consumer")
    public String dc() throws ExecutionException, InterruptedException {
        logger.info("result:");
        final int[] count = {0};
        while(true){
            Future<Integer> future = threadPool.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(1000);
//                    String taskResult = String.valueOf(restTemplate.getForObject("http://eureka-client/moreInstance", String.class));
                    int taskResult = feignClient.moreInstance();
                    count[0]++;
                    System.out.println("消费票数:"+count[0]);
                    return taskResult;
                }
            });
            try {
                System.out.println("剩余票数:"+future.get());
                if(future!=null && future.get()==0){
                    System.out.println("所有任务执行完毕,共卖掉了"+ count[0] +"张票");
                    threadPool.shutdown();
                    break;
                }else if(future!=null &&future.get() == 9999){
                    System.out.println("userService异常。熔断");
                    /*threadPool.shutdown();
                    break;*/
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        return String.valueOf(count.length);
    }
}

#并发启用三个线程去执行任务
@GetMapping(value = "/moreInstance")
    @ResponseBody
    public int moreInstance() {
        int temp = 0;
        //先获取锁
        String identifier = distributedLock.lockWithTimeout("resource", 5000, 1000);
        if(identifier!=null){
            //当前线程拿到了锁,然后再执行业务逻辑,从redis中
            int value = Integer.parseInt(distributedLock.getValueKey(KEY));
            temp = value;
            System.out.println("获取redis锁,当前剩余票数:"+value+"张");
            if(value>0){
                //执行减减操作
                Long i = distributedLock.decr(KEY);
                System.out.println(Thread.currentThread().getName()+"消费一张票");
                //释放锁
                distributedLock.releaseLock("resource", identifier);
                System.out.println("释放redis锁,当前剩余票数:"+Integer.parseInt(distributedLock.getValueKey(KEY))+"张");
            }else{
                System.out.println("票买完了");
                temp = 0;
            }
        }
        return temp;

    }

#业务端代码
package com.itmuch.cloud.microserviceprovideruser.config;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.Transaction;
import redis.clients.jedis.exceptions.JedisException;

import java.util.List;
import java.util.UUID;

/**
 * @author liyy
 * @description:分布式锁
 * @date 2019-04-02 16:27
 * @program spring-cloud
 */
@Component
@Slf4j
public class DistributedLock {


    @Autowired
    private JedisPool jedisPool;

    public static  Jedis conn = null;


    /**
     * 加锁
     * @param lockName       锁的key
     * @param acquireTimeout 获取超时时间 2s
     * @param timeout        锁的超时时间 3s
     * @return 锁标识
     */
    public String lockWithTimeout(String lockName, long acquireTimeout, long timeout) {
//        Jedis conn = null;
        String retIdentifier = null;
        try {
            // 获取连接
            conn = jedisPool.getResource();
            // 随机生成一个value
            String identifier = UUID.randomUUID().toString();
            System.out.println(Thread.currentThread().getName()+"获取到value的值identifier:"+identifier);
            // 锁名,即key值
            String lockKey = "lock:" + lockName;
            // 超时时间,上锁后超过此时间则自动释放锁
            int lockExpire = (int) (timeout / 1000);//秒。传入毫秒

            // 获取锁的超时时间,超过这个时间则放弃获取锁
            long end = System.currentTimeMillis() + acquireTimeout;
            while (System.currentTimeMillis() < end) {
                if (conn.setnx(lockKey, identifier) == 1) {//加锁成功
                    System.out.println(Thread.currentThread().getName()+"加锁成功并设置锁的超时时间,key:"+"lock:" + lockName+",value:"+identifier);
                    conn.expire(lockKey, lockExpire);
                    // 返回value值,用于释放锁时间确认
                    retIdentifier = identifier;
                    return retIdentifier;
                }
                // 返回-1代表key没有设置超时时间,为key设置一个超时时间
                if (conn.ttl(lockKey) == -1) {
                    System.out.println("没有超时时间设置锁的超时时间");
                    conn.expire(lockKey, lockExpire);
                }
                try {
                    System.out.println(Thread.currentThread().getName()+"没有获取到锁");
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                conn.close();
//                jedisPool.returnBrokenResource(conn);
            }
        }
        return retIdentifier;
    }

    /**
     * 释放锁
     * @param lockName   锁的key
     * @param identifier 释放锁的标识
     * @return
     */
    public boolean releaseLock(String lockName, String identifier) {
        System.out.println("进入释放锁方法开始释放锁,lockName:"+lockName+",identifier:"+identifier);
//        Jedis conn = null;
        String lockKey = "lock:" + lockName;
        boolean retFlag = false;
        try {
            System.out.println("准备获取获取redis连接对象");
            conn = jedisPool.getResource();
            System.out.println("获取redis连接对象:"+conn);
            while (true) {
                // 监视lock,准备开始事务
                log.info("监视lock,准备开始事务,{}",lockKey);
                conn.watch(lockKey);
                // 通过前面返回的value值判断是不是该锁,若是该锁,则删除,释放锁
                System.out.println(Thread.currentThread().getName()+"当前传入的value值:"+identifier);
                System.out.println(Thread.currentThread().getName()+"当前锁的value值:"+conn.get(lockKey));
                if (identifier.equals(conn.get(lockKey))) {
                    Transaction transaction = conn.multi();
                    transaction.del(lockKey);
                    System.out.println(Thread.currentThread().getName()+"释放锁KEY:"+lockKey+",value:"+identifier);
                    List<Object> results = transaction.exec();
                    if (results == null) {
                        System.out.println(Thread.currentThread().getName()+"当前锁无法释放,只能等锁到期后自动释放");
                        continue;
                    }
                    retFlag = true;
                }
                conn.unwatch();
                break;
            }
        } catch (JedisException e) {
            e.printStackTrace();
        } finally {
            if (conn != null) {
                System.out.println("每次用完后都要释放conn对象");
//                jedisPool.returnBrokenResource(conn);
                conn.close();
            }
        }
        return retFlag;
    }

    //redis自减
    public Long decr(String key) {
        Jedis jedis = null;
        Long result = 0l;
        try {
            jedis =  jedisPool.getResource();
            result = jedis.decr(key);
        } catch (Exception e) {
            log.error("redis decr error and key = " + key, e);
        }
        return result;
    }

    //redis自增
    public Long incr(String key) {
        Jedis jedis = null;
        Long result = 0l;
        try {
            jedis =  jedisPool.getResource();
            result = jedis.incr(key);
        } catch (Exception e) {
            log.error("redis incr error and key = " + key, e);
        }
        return result;
    }

    //设置key
    public String setkeyvalue(String key,String value){
        String result = jedisPool.getResource().set(key,value);
        return result;
    }

    //根据key获取value
    public String getValueKey(String key){
        String value  = jedisPool.getResource().get(key);
        return value;
    }
}


#jedis获取锁和释放锁代码

运行结果:

基于redis实现分布式锁案例

两个控制台分别打印数据。不会出现一张票被多个线程同时消费的情况。但是我这执行到一半就挂了。因为我再redis客户单观

redis链接对象越来越多。没有释放。最后挂了