java多线程知识点 - AQS - Semaphore(源码未完全读懂版 - -)仅仅是一个随笔

Semaphore

AQL里的同步组件,中文名:信号量

他可以测试并发访问的线程个数,控制某个资源可被同时访问的个数,常用语只能提供有限资源的东西,比如数据库连接数!

核心方法是  aquire release

废话不多说,直接看怎么用!一看就懂相当简单!

demo1:

//定义线程并发个数
private final static int threadCount = 20;
public static void main(String[] args) throws Exception {
    //声明一个线程池
    ExecutorService exec = Executors.newCachedThreadPool();
    // 构建semaphore实例,参数是一次性并发个数
    final Semaphore semaphore = new Semaphore(3);

    for (int i = 0; i < threadCount; i++) {
        final int threadNum = i;
        exec.execute(() -> {
            try {
                semaphore.acquire(); // 获取一个许可
                test(threadNum);
                semaphore.release(); // 释放一个许可
            } catch (Exception e) {
                log.error("exception", e);
            }
        });
    }
    exec.shutdown();
}

private static void test(int threadNum) throws Exception {
    log.info("{}", threadNum);
    Thread.sleep(1000);
}
输出结果

java多线程知识点 - AQS - Semaphore(源码未完全读懂版 - -)仅仅是一个随笔

从执行时间我们可以看出 一次性并发执行三条,睡眠时间到后,release线程并且继续执行。


demo2:

在之前代码的基础上更改aquire和release代码如下

semaphore.acquire(3); // 获取多个许可
test(threadNum);
semaphore.release(3); // 释放多个许可

这就表示一次性执行三个线程,每次acquire一次获取三个许可,并且释放三个许可

结果如下 一秒执行一个请求

java多线程知识点 - AQS - Semaphore(源码未完全读懂版 - -)仅仅是一个随笔


demo3:

try {
    if (semaphore.tryAcquire()) { // 尝试获取一个许可
        test(threadNum);
        semaphore.release(); // 释放一个许可
    }
} catch (Exception e) {
    log.error("exception", e);
}

java多线程知识点 - AQS - Semaphore(源码未完全读懂版 - -)仅仅是一个随笔

可以看出当调用semaphore的truAcquire方法时,它实际上就是在不断地尝试获得一个许可,因为这三个请求处于睡眠一秒中,所以其他的待执行请求因为try时无法获得许可而被全部抛弃!


demo4:

try {
    if (semaphore.tryAcquire(5000, TimeUnit.MILLISECONDS)) { // 尝试获取一个许可
        test(threadNum);
        semaphore.release(); // 释放一个许可
    }
} catch (Exception e) {
    log.error("exception", e);
}

java多线程知识点 - AQS - Semaphore(源码未完全读懂版 - -)仅仅是一个随笔

通过结果可以看出这是在尝试获取请求,并且超时时间是5秒钟!20个请求只执行了15个因为三个三个执行,每批都间隔一秒 5秒超时时间内,执行了五批,所以总共15个请求被执行。


源码分析

首先:构造器

public Semaphore(int permits) {    //同一时刻并发数
    sync = new NonfairSync(permits);
}

public Semaphore(int permits, boolean fair) {   //同一时刻并发数+是否公平锁
    sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

先看一个参数的构造器执行逻辑,new一个NonfairSync(并发数)

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);   //setState = permits
    }
//调用tryAcquireShared()方法尝试获取许可,未获取到则调用doAcquireSharedInterruptibly()方法将当前线程加入等待队列。
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();     //获取可用许可数量
        int remaining = available - acquires;   //计算这次许后还可以用的数量
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update); //state,剩余许可量,这次许可后还可用的许可量
}
只有在许可不够时返回值才会小于0,其余返回的都是剩余许可数量,当剩余许可大于0,底层还是CAS操作将available 
的值设置为剩余个数,所以只有在结果小于0或设置state值成功的情况下才会退出。

release操作

public void release() {
    sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//调用子类的tryReleaseShared尝试获取锁,如果失败,直接返回
        doReleaseShared();//如果成功调用doReleaseShared方法做后续处理
        return true;
    }
    return false;
private void doReleaseShared() {

    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}
目前还没有想明白具体实现原理。。。有待研究