Java并发编程实践:利用信号量实现阻塞集合demo
package com.rx.wwx;
import java.util.HashSet;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class BlockingSet<T> {
private final Set<T> set;
private final Semaphore semaphore;
private int capacity;
public BlockingSet(Set<T> set, int capacity) {
this.set = set;
semaphore = new Semaphore(capacity);
this.capacity = capacity;
}
public boolean add(T t) throws InterruptedException {
semaphore.acquire();
boolean add = set.add(t);
if (!add) {
semaphore.release();
}
return add;
}
public boolean remove(T t) {
boolean remove = set.remove(t);
if (remove) {
semaphore.release();
}
return remove;
}
public void removeAll() {
set.clear();
semaphore.release(capacity);
}
public static void main(String[] args) {
ExecutorService executorService =
Executors.newFixedThreadPool(10);
final BlockingSet<Integer> blockingSet = new BlockingSet<>(new HashSet<Integer>(), 5);
Runnable runnable = new Runnable() {
@Override
public void run() {
try {
System.out.println(Thread.currentThread().getId() + ":尝试往集合添加元素");
blockingSet.add(new Random().nextInt(100));
System.out.println(Thread.currentThread().getId() + ":添加成功");
} catch (Throwable throwable) {
throwable.printStackTrace();
}
}
};
for (int i = 0; i < 10; i++) {
executorService.execute(runnable);
}
new Thread(new Runnable() {
@Override
public void run() {
try{
System.out.println("先暂停3秒,注意观察他们是否在阻塞后又继续添加");
Thread.sleep(3000);
blockingSet.removeAll();
}catch (Throwable throwable){
throwable.printStackTrace();
}
}
}).start();
}
}
运行结果: