Java并发编程-同步工具类
1.CountDownLatch
一个同步器辅助类允许一个或多个线程等待直到其他线程的一系列操作结束。一个CountDownLatch初始化时需要给定一个计数值。await方法阻塞直到当前计数值由于countDown方法的调用减为0,在此之后,所有等待的线程被释放,并且随后await的调用立即返回。这是一次性的,计数值不能重置。如果你需要重置计数值,考虑使用CyclicBarrier。CountDownLatch是一个多用途的同步器工具,可以用于多种意图。计数值初始化为1作为一个开关, 或门:所有调用await方法的线程在门处等待,直到一个线程通过调用countDown方法把门打开。CountDownLatch初始化为n可以用来使一个线程等待n个线程完成一些动作,或者一些动作完成了n次。
下面来看个例子:
这是一个并行计算求和的例子。
public class CountDownDemo2 {
private int[] nums;
private static CountDownLatch countDownLatch;
public CountDownDemo2(int line) {
nums = new int[line];
countDownLatch = new CountDownLatch(line);
}
public void calculate(String line, int index) {
String[] numbers = line.split(",");
int total = 0;
for (String num : numbers) {
total += Integer.parseInt(num);
}
nums[index] = total;
countDownLatch.countDown();
System.out.println(Thread.currentThread().getName() + "执行计算任务: " + line + " 结果为: " + total);
}
public void sum() {
System.out.println(Thread.currentThread().getName() + "汇总线程开始执行: ");
int realTotal = 0;
for (int i = 0; i < nums.length; i++) {
realTotal += nums[i];
}
System.out.println("最终结果: " + realTotal);
}
public static void main(String[] args) throws IOException, InterruptedException {
final List<String> contents = readFile();
int lineCount = contents.size();
final CountDownDemo2 demo = new CountDownDemo2(lineCount);
for (int i = 0; i < lineCount; i++) {
final int j = i;
new Thread(new Runnable() {
@Override
public void run() {
demo.calculate(contents.get(j), j);
}
}).start();
}
// 等待其他线程结束子求和计算
countDownLatch.await();
demo.sum();
}
private static List<String> readFile() throws IOException {
List<String> contents = new ArrayList<>();
String line = null;
BufferedReader reader = null;
try {
reader = new BufferedReader(new FileReader("/workspace/current/src/com/company/condition/sample.txt"));
while ((line = reader.readLine()) != null) {
contents.add(line);
}
} catch (Exception e) {
} finally {
reader.close();
}
return contents;
}
}
运行结果:
2.CyclicBarrier
一个同步器辅助类,允许一系列线程互相等待到达一个公共的屏障点。CyclicBarrier对于涉及一个 固定大小的线程随机互相等待的程序是很有用的。屏障之所以叫它可循环的,因为在等待线程释放后它可以重用。一个CyclicBarrier支持一个可选的Runnable命令,在线程集合中的最后一个线程到达之后,其他线程释放锁之前,该命令在一个屏障点执行一次。 CyclicBarrier使用全或无破坏模型来处理失败的同步尝试:如果一个线程因为中断,失败,或者超时过早的离开屏障点,所有其他等待在屏障点的线程也会因为BrokenBarrierException或者InterruptedException异常而不正常的退出(如果他们同时也被中断)。
下面来看个实例:
public class CyclicBarrierDemo {
private Random random = new Random();
public void meeting(CyclicBarrier barrier) {
try {
Thread.sleep(random.nextInt(4000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 到达会议室,等待开会");
try {
// 等待所有线程到达屏障点
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " 发言");
}
public static void main(String[] args) {
final CyclicBarrierDemo demo = new CyclicBarrierDemo();
final CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("开始开会");
}
});
for (int i = 0; i < 5; i++) {
new Thread(new Runnable() {
@Override
public void run() {
demo.meeting(barrier);
}
}).start();
}
}
}
执行结果:
3.Semaphore
一个计数信号量。从概念上讲,每个信号量包含了一个允许集合。如果有必要,每个acquire方法阻塞直到一个许可可用,然后获取该许可。每个release方法增加一个许可,潜在地释放一个正被阻塞的获取者。然而,没有实际的许可对象被使用;Semaphore只是维持可用的数量并相应地行动。信号量常常用来限制访问一些物理或者逻辑资源的线程的数量。
下面来看个实例:
public class SemaphoreDemo {
public void method(Semaphore semaphore) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is run...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
semaphore.release();
}
public static void main(String[] args) {
final SemaphoreDemo demo = new SemaphoreDemo();
final Semaphore semaphore = new Semaphore(10);
while (true) {
new Thread(new Runnable() {
@Override
public void run() {
demo.method(semaphore);
}
}).start();
}
}
}
4.Exchanger
下面是个实例:
public class ExchangerDemo {
public void a(Exchanger<String> exchanger) {
System.out.println("a线程执行...");
try {
System.out.println("a线程正在抓取数据...");
System.out.println("a线程等待对比结果...");
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String res = "123456";
try {
exchanger.exchange(res);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void b(Exchanger<String> exchanger) {
System.out.println("b线程执行...");
System.out.println("b线程开始抓取数据...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("b线程抓取数据完毕...");
String res = "123456";
String value = null;
try {
value = exchanger.exchange(res);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("开始对比...");
System.out.println("对比结果: " + res.equals(value));
}
public static void main(String[] args) {
final ExchangerDemo demo = new ExchangerDemo();
final Exchanger exchanger = new Exchanger();
new Thread(new Runnable() {
@Override
public void run() {
demo.a(exchanger);
}
}).start();
new Thread(new Runnable() {
@Override
public void run() {
demo.b(exchanger);
}
}).start();
}
}