并发编程:生产者-消费者模式
一、生产者消费者模式
生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。
这个阻塞队列就是用来给生产者和消费者解耦的。纵观大多数设计模式,都会找一个第三者出来进行解耦,如工厂模式的第三者是工厂类,模板模式的第三者是模板类。在学习一些设计模式的过程中,如果先找到这个模式的第三者,能帮助我们快速熟悉一个设计模式。
二、生产者消费者Demo
生产者
package ConcurrentProgramming.middle.part3.ProviderConsumer;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @Author: zdj
* @Description: 生产者
* @Date: 2019年03月18日 9:42
*/
public class Provider implements Runnable {
//共享缓冲区
private BlockingQueue<Data> blockingQueue;
//多线程间是否启动变量,即使返回线程的状态
private volatile static boolean isRunning = true;
private static AtomicInteger count = new AtomicInteger();
//随机对象
private static Random r = new Random();
public Provider(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
}
public void run() {
while (isRunning) {
try {
//随机休眠0 - 1000 毫秒 表示获取数据(产生数据的耗时)
Thread.sleep(r.nextInt(1000));
//获取的数据进行累加
int id = count.incrementAndGet();
Data data = new Data(Integer.toString(id), "数据" + id);
System.out.println("线程" + Thread.currentThread().getName() + ",获取了数据,id为:" + id + ", 进行装载到公共缓冲区中...");
if (!this.blockingQueue.offer(data,2, TimeUnit.SECONDS)){
System.out.println("提交缓冲区数据失败....");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
public void stop(){
isRunning = false;
}
}
消费者
package ConcurrentProgramming.middle.part3.ProviderConsumer;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
/**
* @Author: zdj
* @Description: 消费者
* @Date: 2019年03月18日 9:53
*/
public class Consumer implements Runnable{
//共享缓冲区
private BlockingQueue<Data> blockingQueue;
private volatile boolean isRunning = true;
public Consumer(BlockingQueue blockingQueue){
this.blockingQueue = blockingQueue;
}
private static Random r = new Random();
public void run() {
while (isRunning){
try {
//队列中没有数据,就阻塞线程
Data data = this.blockingQueue.take();
//模拟对数据的处理
Thread.sleep(r.nextInt(1000));
System.out.println("当前消费线程:" + Thread.currentThread().getName() +
"消费成功,数据id为" + data.getId() );
}catch (Exception e){
e.printStackTrace();
}
}
}
public void stop(){
this.isRunning = false;
}
}
模拟数据
package ConcurrentProgramming.middle.part3.ProviderConsumer;
/**
* @Author: zdj
* @Description:
* @Date: 2019年03月18日 9:41
*/
@lombok.Data
public class Data {
private String id;
private String name;
public Data(String id, String name) {
this.id = id;
this.name = name;
}
}
测试Main
package ConcurrentProgramming.middle.part3.ProviderConsumer;
import java.util.concurrent.*;
/**
* @Author: zdj
* @Description:
* @Date: 2019年03月18日 13:38
*/
public class Main {
public static void main(String[] args) {
BlockingQueue<Data> blockingQueue = new LinkedBlockingQueue<Data>(10);
//生产者
Provider provider1 = new Provider(blockingQueue);
Provider provider2 = new Provider(blockingQueue);
Provider provider3 = new Provider(blockingQueue);
//消费者
Consumer consumer1 = new Consumer(blockingQueue);
Consumer consumer2 = new Consumer(blockingQueue);
Consumer consumer3 = new Consumer(blockingQueue);
//创建线程池运行,这是一个缓存的线程池,可以创建无穷大的线程,没有任务的时候不创建线程。空闲线程存活时间为60s(默认值)
ExecutorService cachePool = Executors.newCachedThreadPool();
cachePool.execute(provider1);
cachePool.execute(provider2);
cachePool.execute(provider3);
cachePool.execute(consumer1);
cachePool.execute(consumer2);
cachePool.execute(consumer3);
try {
Thread.sleep(2000);
}catch (Exception e){
e.printStackTrace();
}
provider1.stop();
provider2.stop();
provider3.stop();
consumer1.stop();
consumer2.stop();
consumer3.stop();
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
cachePool.shutdown();
}
}
运行结果
借鉴链接:http://ifeve.com/producers-and-consumers-mode/