java多线程知识点 - AQS - Semaphore(源码未完全读懂版 - -)仅仅是一个随笔
Semaphore
AQL里的同步组件,中文名:信号量
他可以测试并发访问的线程个数,控制某个资源可被同时访问的个数,常用语只能提供有限资源的东西,比如数据库连接数!
核心方法是 aquire 和 release
废话不多说,直接看怎么用!一看就懂相当简单!
demo1:
//定义线程并发个数 private final static int threadCount = 20; public static void main(String[] args) throws Exception { //声明一个线程池 ExecutorService exec = Executors.newCachedThreadPool(); // 构建semaphore实例,参数是一次性并发个数 final Semaphore semaphore = new Semaphore(3); for (int i = 0; i < threadCount; i++) { final int threadNum = i; exec.execute(() -> { try { semaphore.acquire(); // 获取一个许可 test(threadNum); semaphore.release(); // 释放一个许可 } catch (Exception e) { log.error("exception", e); } }); } exec.shutdown(); } private static void test(int threadNum) throws Exception { log.info("{}", threadNum); Thread.sleep(1000); }输出结果
从执行时间我们可以看出 一次性并发执行三条,睡眠时间到后,release线程并且继续执行。
demo2:
在之前代码的基础上更改aquire和release代码如下
semaphore.acquire(3); // 获取多个许可 test(threadNum); semaphore.release(3); // 释放多个许可
这就表示一次性执行三个线程,每次acquire一次获取三个许可,并且释放三个许可
结果如下 一秒执行一个请求
demo3:
try { if (semaphore.tryAcquire()) { // 尝试获取一个许可 test(threadNum); semaphore.release(); // 释放一个许可 } } catch (Exception e) { log.error("exception", e); }
可以看出当调用semaphore的truAcquire方法时,它实际上就是在不断地尝试获得一个许可,因为这三个请求处于睡眠一秒中,所以其他的待执行请求因为try时无法获得许可而被全部抛弃!
demo4:
try { if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可 test(threadNum); semaphore.release(); // 释放一个许可 } } catch (Exception e) { log.error("exception", e); }
通过结果可以看出这是在尝试获取请求,并且超时时间是5秒钟!20个请求只执行了15个因为三个三个执行,每批都间隔一秒 5秒超时时间内,执行了五批,所以总共15个请求被执行。
源码分析
首先:构造器
public Semaphore(int permits) { //同一时刻并发数 sync = new NonfairSync(permits); } public Semaphore(int permits, boolean fair) { //同一时刻并发数+是否公平锁 sync = fair ? new FairSync(permits) : new NonfairSync(permits); }
先看一个参数的构造器执行逻辑,new一个NonfairSync(并发数)
static final class NonfairSync extends Sync { private static final long serialVersionUID = -2694183684443567898L; NonfairSync(int permits) { super(permits); //setState = permits } //调用tryAcquireShared()方法尝试获取许可,未获取到则调用doAcquireSharedInterruptibly()方法将当前线程加入等待队列。 protected int tryAcquireShared(int acquires) { return nonfairTryAcquireShared(acquires); } }
final int nonfairTryAcquireShared(int acquires) { for (;;) { int available = getState(); //获取可用许可数量 int remaining = available - acquires; //计算这次许后还可以用的数量 if (remaining < 0 || compareAndSetState(available, remaining)) return remaining; } }
protected final boolean compareAndSetState(int expect, int update) { // See below for intrinsics setup to support this return unsafe.compareAndSwapInt(this, stateOffset, expect, update); //state,剩余许可量,这次许可后还可用的许可量 }只有在许可不够时返回值才会小于0,其余返回的都是剩余许可数量,当剩余许可大于0,底层还是CAS操作将available
的值设置为剩余个数,所以只有在结果小于0或设置state值成功的情况下才会退出。
release操作
public void release() { sync.releaseShared(1); }
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) {//调用子类的tryReleaseShared尝试获取锁,如果失败,直接返回 doReleaseShared();//如果成功调用doReleaseShared方法做后续处理 return true; } return false;
private void doReleaseShared() { for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }目前还没有想明白具体实现原理。。。有待研究