JUC与生产者消费者
前面一篇文章介绍了生产者消费者模式,这篇来看看JUC包下的哪些类与该模式有关。
生产者消费者模式中有个中间类Channel,对于数据Data起到通道作用,还确保了Producer与Consumer这些线程的协调运行。在前篇文章所举的例子中的Table类担任这个角色,内部是使用数组实现的队列配合synchronized关键字。而在JUC包下提供了BlockingQueue接口及其实现类,它们相当于Channel角色。
BlockingQueue接口——阻塞队列
BlockingQueue接口表示的是在达到合适的状态之前线程一直阻塞的队列。是Queue的子接口,拥有多个方法如offer,poll等,但实际实现阻塞功能的方法是BlockingQueue所固有的put,take方法
ArrayBlockingQueue——基于数组的BlockingQueue
基于数组实现的BlockingQueue,元素个数有最大限制。
LinkedBlockingQueue——基于链表的BlockingQueue
元素个数没有最大限制,基于链表实现。
PriorityBlockingQueue——带有优先级的BlockingQueue
带有优先级的BlockingQueue,优先级依据Comparable接口的自然排序,或者构造函数传进的Comparator比较器的规则。
DelayQueue——一定时间后才可以take的BlockingQueue
表示的是用于存储java.util.concurrent.Delayed对象的队列。当从该队列take时,只有在该元素指定的时间到期后才能take。取出顺序按到期时间的长短排,最长的先被take出。
SynchronousQueue——直接传递的BlockingQueue
该类用于执行由Producer到Consumer的”直接传递“ 。如果Producer先put,在Consumer take之前,Producer线程将一直阻塞。反之亦然。
ConcurrentLinkedQueue——元素个数没有最大限制的线程安全队列
首先该类不同于上面那些,它并非BlockingQueue,也没有继承BlockingQueue。它表示的是元素个数没有限制的线程安全队列。
BlockingQueue是JUC提供的Channel,那么用ArrayBlocking来改写上一篇的Table。
public class TableByJUC extends ArrayBlockingQueue<String> {
public TableByJUC(int capacity) {
super(capacity);
}
public void put(String cake) throws InterruptedException {
System.out.println(Thread.currentThread().getName() + " puts " + cake);
super.put(cake);
}
public String take() throws InterruptedException {
String cake = super.take();
System.out.println(Thread.currentThread().getName() + " take " + cake);
return cake;
}
}
java.util.concurrent.Exchanger
用于让两个线程安全的交换对象
举个例子:两个线程交换缓冲区
ProducerThread填充缓冲区直到充满,调用exchange方法将填满的缓冲区传递给ConsumerThread,传递缓冲区后 作为交换接受空的缓冲区。ConsumerThread调用exchange方法将空的缓冲区传递给ProducerThread,传递后作为交换接受被填满字符的缓冲区,打印缓冲区中字符。
Main:
将buffer1传给ProducerThread,buffer2传给ConsumerThread,同时将通用的Exchanger实例传给两者。
public class Main {
public static void main(String[] args) {
Exchanger<char[]> exchanger = new Exchanger<>();
char[] buffer1 = new char[5];
char[] buffer2 = new char[5];
ProducerThread producerThread = new ProducerThread(exchanger, buffer1, 654987);
ConsumerThread consumerThread = new ConsumerThread(exchanger, buffer2, 123654);
producerThread.start();
consumerThread.start();
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
producerThread.interrupt();
consumerThread.interrupt();
}
}
ProducerThread
public class ProducerThread extends Thread {
private final Exchanger<char[]> exchanger;
private char[] buffer;
private char index;
private final Random random;
public ProducerThread(Exchanger<char[]> exchanger, char[] buffer, long seed) {
super("ProducerThread");
this.exchanger = exchanger;
this.buffer = buffer;
this.random = new Random(seed);
this.index = 0;
}
@Override
public void run() {
try {
while(true) {
for (int i = 0; i < buffer.length; i++) {
// 向缓冲区添加数据
buffer[i] = nextChar();
System.out.println(Thread.currentThread().getName() + ": " +
buffer[i] + " -> ");
}
System.out.println(Thread.currentThread().getName() + ": BEFORE exchange");
buffer = exchanger.exchange(buffer);
System.out.println(Thread.currentThread().getName() + ": AFTER exchange");
}
} catch (InterruptedException e) {
}
}
// 生成字符
private char nextChar() throws InterruptedException {
char c = (char) ('A' + index%26);
index++;
Thread.sleep(random.nextInt(1000));
return c;
}
}
ConsumerThread
public class ConsumerThread extends Thread {
private final Exchanger<char[]> exchanger;
private final Random random;
private char[] buffer;
public ConsumerThread(Exchanger<char[]> exchanger, char[] buffer, long seed) {
super("ConsumerThread");
this.exchanger = exchanger;
this.buffer = buffer;
this.random = new Random(seed);
}
@Override
public void run() {
try {
while (true) {
// 交换缓冲区
System.out.println(Thread.currentThread().getName() + ": BEFORE exchange");
buffer = exchanger.exchange(buffer);
System.out.println(Thread.currentThread().getName() + ": AFTER exchange");
// 从缓冲区取出数据
for (int i = 0; i < buffer.length; i++) {
System.out.println(Thread.currentThread().getName() + ": -> " + buffer[i]);
Thread.sleep(random.nextInt(1000));
}
}
} catch (InterruptedException e) {
}
}
}
运行结果:
ConsumerThread: BEFORE exchange //ConsumerThread等待echange方法被执行
ProducerThread: A -> //ProducerThread向buffer1填充A-E
ProducerThread: B ->
ProducerThread: C ->
ProducerThread: D ->
ProducerThread: E ->
ProducerThread: BEFORE exchange //ProducerThread的exchange执行(进行交换)
ProducerThread: AFTER exchange // 执行完毕接受到空的缓冲区buffer2
ConsumerThread: AFTER exchange // ConsumerThread的exchange执行后接受到被填满字符的buffer1
ConsumerThread: -> A // ConsumerThread开始依此输出
ProducerThread: F -> // ProducerThread向buffer2中填充F-K
ConsumerThread: -> B
ProducerThread: G ->
ConsumerThread: -> C
ProducerThread: H ->
ConsumerThread: -> D
ProducerThread: I ->
ConsumerThread: -> E
ConsumerThread: BEFORE exchange //ConsumerThread耗尽buffer1,开始等待新的被填满字符的buffer2
ProducerThread: J ->
ProducerThread: BEFORE exchange // buffer2填满完毕,开始交换
ProducerThread: AFTER exchange // 交换完毕,收到空的buffer1
ConsumerThread: AFTER exchange // 交换完毕,收到满的buffer2
ConsumerThread: -> F
ConsumerThread: -> G
ProducerThread: K ->
ConsumerThread: -> H
ProducerThread: L ->
ConsumerThread: -> I
ProducerThread: M ->
ProducerThread: N ->
ProducerThread: O ->
ProducerThread: BEFORE exchange
ConsumerThread: -> J
ConsumerThread: BEFORE exchange
ConsumerThread: AFTER exchange
ConsumerThread: -> K
ProducerThread: AFTER exchange
ConsumerThread: -> L
ConsumerThread: -> M
从运行结果可以看出被填满字符的缓冲区从ProducerThread流向ConsumerThread,即生成的字符单向流向ConsumerThread。而空缓冲区它的流动方向相反.
这里有一片关于Exchanger在实际业务中的应用的文章:http://lixuanbin.iteye.com/blog/2166772
这一链接是关于Exchanger源码分析:https://www.cnblogs.com/aniao/p/aniao_exchanger.html