Java Redis连接池的简单实现(参考go语言中的channel)
1、声明一个连接池管理接口
package com.biubiu.common;
import redis.clients.jedis.Jedis;
public interface IPond {
//public void configure(); //可以写一个类似于init的方法用来配置参数等,这里直接用构造方法了
//public <T> T acquires(); // 获取资源, 抽象出通用的连接池
public Jedis acquire() throws Exception; // 获取资源
public void release(Jedis jedis) throws Exception ; // 释放资源
public void close(Jedis jedis); // 关闭资源
public void shutdown(); // 关闭池
}
2、定义一个结构体,用于描述连接池的属性和参数
package com.biubiu.common;
import redis.clients.jedis.Jedis;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author yule.zhang
* 目标连接池
* 可以自己扩展成通用的连接池
*/
public class GoalPond {
//添加元素的方法有三个:add,put,offer,且这三个元素都是向队列尾部添加元素的意思。
// add方法在添加元素的时候,若超出了度列的长度会直接抛出异常:
// put方法,若向队尾添加元素的时候发现队列已经满了会发生阻塞一直等待空间,以加入元素。
// offer方法在添加元素时,如果发现队列已满无法添加的话,会直接返回false。
//从队列中取出并移除头元素的方法有:poll,remove,take。
// poll: 若队列为空,返回null。
// remove:若队列为空,抛出NoSuchElementException异常。
// take:若队列为空,发生阻塞,等待有元素。
protected LinkedBlockingQueue<Jedis> pool; //存放连接的池,用 channel 来存放,在Java中可能是用ArrayList或者LinkedList等来存放,相当于一个容器
protected int maxOpen = 0; //池中最大资源数
protected AtomicInteger numOpen = new AtomicInteger(0); //池中当前资源数
protected int minOpen = 0; //池中最小资源数
protected AtomicBoolean closed = new AtomicBoolean(false); //池是否已经关闭
protected long maxLifeTime = 0L; //最大存活时间
}
3、具体连接池的实现
package com.biubiu.common;
import redis.clients.jedis.Jedis;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
public class Pond extends GoalPond implements IPond {
/**
* 初始化连接池
* @param minOpen
* @param maxOpen
* @param maxLifeTime
* @throws Exception
*/
public Pond(int minOpen, int maxOpen, long maxLifeTime) throws Exception {
if(maxOpen <= 0 || minOpen > maxOpen) {
throw new Exception("invalid pool config");
}
this.minOpen = minOpen;
this.maxOpen = maxOpen;
this.maxLifeTime = maxLifeTime;
this.pool = new LinkedBlockingQueue<>(maxOpen);
for(int i = 0; i < minOpen; i++) {
//创建连接对象
Jedis jedis = new Jedis("127.0.0.1",6379);
jedis.auth("root");
this.numOpen.incrementAndGet();
//当前可用资源数 + 1
System.out.println("连接被创建的次数: " + numOpen.get());
//连接资源存入队列,即池子中
this.pool.offer(jedis);
}
}
/**
* 获取连接
* @return
* @throws Exception
*/
@Override
public Jedis acquire() throws Exception {
long now = System.currentTimeMillis();
Jedis jedis = this.getOrCreate();
//请求超时了
if(this.maxLifeTime > 0 && (System.currentTimeMillis() - now) > maxLifeTime) {
System.out.println("超时释放");
this.release(jedis);
}
while(jedis == null) {
//等待取出用完的的连接
jedis = this.pool.poll(maxLifeTime - (System.currentTimeMillis() - now), TimeUnit.MILLISECONDS);
//如果取出的是空
if(jedis == null) {
//判断是否超时
if(System.currentTimeMillis() - now > maxLifeTime) {
System.out.println("请求超时");
return null;
} else {
//继续等待
continue;
}
}
}
System.out.println("从队列里拿到连接");
return jedis;
}
/**
* 释放连接,连接回鬼池子中
* @param jedis
* @throws Exception
*/
@Override
public void release(Jedis jedis) throws Exception {
//连接返回到对列中
if(jedis == null) {
System.out.println("释放 的jedis为空");
//this.numOpen.decrementAndGet();
return;
}
this.pool.offer(jedis);
System.out.println("归还入池");
}
/**
* 关闭单个连接
* @param jedis
*/
@Override
public void close(Jedis jedis) {
//队列中可用资源减去 1
this.numOpen.decrementAndGet();
//移出队列
this.pool.remove(jedis);
//连接对象关闭
jedis.close();
}
/**
* 关闭连接池
*/
@Override
public void shutdown() {
if(this.closed.get()) {
return;
}
this.pool.stream().forEach(p -> {
p.close();
this.numOpen.decrementAndGet();
});
this.closed = new AtomicBoolean(true);
}
/**
* 私有方法获取抱一个连接
* @return
* @throws Exception
*/
private Jedis getOrCreate() throws Exception {
if(!this.pool.isEmpty()) {
return this.pool.poll();
}
Jedis jedis = null;
//当前可用资源数小于最大资源数限制
if(numOpen.get() < maxOpen) {
int temp = numOpen.incrementAndGet();
if(temp <= maxOpen) {
jedis = new Jedis("127.0.0.1",6379);
jedis.auth("root");
System.out.println("连接被创建的次数: " + temp);
//入队列
this.pool.offer(jedis);
} else {
//数量减回去
numOpen.decrementAndGet();
}
}
return jedis;
}
}
4、测试程序,模拟多线程并发
(ps:这个我是在springboot里面测试的,所以测试类实现了CommandLineRunner,可以在程序启动的时候就可以运行,你可以改用普通的main函数测试)
package com.biubiu.common;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import redis.clients.jedis.Jedis;
import java.util.concurrent.CountDownLatch;
@Component
public class Test implements CommandLineRunner {
private static int threadCount = 30;
private final static CountDownLatch countDownLatch = new CountDownLatch(threadCount); //为保证30个线程同时并发运行
@Override
public void run(String... args) throws Exception {
final IPond pool = new Pond(2, 10, 2000L);
//连接池最大连接数和获取连接的超时时间
//循环开30个线程
for(int i = 0 ; i < threadCount ; i++) {
new Thread(new Runnable() {
@Override
public void run() {
//每个线程里循环十次获取连接 , 30个线程一共获取300次连接
for(int j = 0; j < 10; j++) {
Jedis jedis = null;
try {
countDownLatch.countDown(); //每次减一
countDownLatch.await(); //此处等待状态,为了让30个线程同时进行
jedis = pool.acquire();
jedis.get("a");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
pool.release(jedis); //释放连接
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
}).start();
}
}
}
5、测试结果
太长了就不粘贴了,差不多是300次吧。