使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式
多线程下的如何高效安全传输数据。在的Concurrent包中,BlockingQueue在一个方面很好的解决了这个问题。
简介:
阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。
JDK7提供了以下7个阻塞队列:
ArrayBlockingQueue :由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :支持优先级排序的无界阻塞队列。
DelayQueue:使用优先级队列实现的无界阻塞队列。
SynchronousQueue:不存储元素的阻塞队列。
LinkedTransferQueue:链表结构组成的无界阻塞队列。
LinkedBlockingDeque:链表结构组成的双向阻塞队列。
阻塞队列提供了下列四种处理方法
使用场景:
从数据结构和方法看出,是不是很像MQ中的生产-消费模式。没错,用它搭建一个生产消费模式很方便,我们来试试吧。
1、先建一个全局的静态BlockQueue对象吧
package org.lochte.cache;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class MQCache {
public static BlockingQueue<String> producerBQ = new LinkedBlockingQueue<String>();
}
2、然后是生产者
package org.lochte.block;
import org.lochte.cache.MQCache;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
public class Producer implements Callable<Boolean> {
private String message;
public Producer(String message){
this.message = message;
}
@Override
public Boolean call() throws Exception {
try {
System.out.println("生产者:" + message);
return MQCache.producerBQ.offer(message, 2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return false;
}
}
}
- 有了生产者,还有一个消费者
package org.lochte.block;
import org.lochte.cache.MQCache;
public class Consumer implements Runnable {
@Override
public void run() {
try{
while(true){
System.out.println("消费者:" + MQCache.producerBQ.take());
}
}catch(Exception e){
e.printStackTrace();
}
}
}
4、一个最基本的 生产-消费 模式就完成了,我们用一个main函数来试试吧
package org.lochte.block;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class BlockTest {
public static void main(String[] args) throws InterruptedException {
//new了三个生产者和一个消费者
Producer producer1 = new Producer("生产1");
Producer producer2 = new Producer("生产2");
Producer producer3 = new Producer("生产3");
Consumer consumer = new Consumer();
// 使用现场池
ExecutorService service = Executors.newCachedThreadPool();
// 启动消费者
service.execute(consumer);
// 启动生产者
Thread.sleep(6000);
Future<Boolean> p1 = service.submit(producer1);
Thread.sleep(6000);
service.submit(producer2);
Thread.sleep(6000);
service.submit(producer3);
// 退出Executor
service.shutdown();
}
}
运行结果:
顺便说一句,这里的consumer用了take(),如果queue为空将会一直等待到有数据位置。若不想一直等下去可以使用poll()方法。若queue无数据将返回为null。
我们把consumer的MQCache.producerBQ.take()改为MQCache.producerBQ.pool(3,TimeUnit.SECONDS)试试
生产者这里设置了6秒的间隔,消费者2秒的间隔,中间的4秒的时差则取到的NULL值。
以上只是满足了生产消费模式,但还有一个MQ中间件最被常用的是 订阅-发布模式。这怎么办呢。我们先来看看主流MQ是怎么做的。
Kafka:
Topic:特指kafka处理的消息源
Partition(分区):Topic物理上的分组。一个Topic可以有多个Patition,每个Partition是一个有序的队列
Message:消息,通信的基本单位
Producer:生产者。向kafka的一个topic发布消息的过程叫做生产
Consumer:消费者,订阅Topic并处理其发布的消息的过程叫做消费
Broker:缓存代理,kafka集群中的一台或者多台服务器
由此可见,kafka通过broker和Partition来划分Comsumer
RabbitMQ:
又是一个多队列
RocketMQ:
想将生产-发布模式升到订阅-发布模式大体思路已经很清晰了,有3部要走
- 创建一个Topic(其实还需要搭建Broker,这里简单做一个先免了吧(T T)!)
- 一个Topic 可以对应多个Queue
- Consumer单节点的对应关系
Topic即主题,一般是一个字符串,显而易见需要用到Map,多线程的环境下,ConcurrentHashmap首选。
绑定关系相对有些区分,借鉴一下Kafka与RabbitMQ是如何绑定的
Kafka: 使用GroupId,然后分配partition。Producer和Consumer能自行制定partition的编号。
RabbitMQ: 使用交换机fanout,然后Queue对应交换机comsuner再绑定queue。
主流的MQ提供一个方式自定义queue。这下就好办了些,借鉴一下,编写一个topic对象,可以是一个list也可以是一个map,用于绑定关系。
package org.lochte.block;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
public class Topic {
private volatile ConcurrentHashMap<String, BlockingQueue<String>> topics = new ConcurrentHashMap<String, BlockingQueue<String>>();
//生产者写入
public void send(String message){
topics.forEach((k, v)->{
v.offer(message);
});
}
//注册
public void register(String queueName){
topics.put(queueName, new LinkedBlockingQueue<String>());
}
//poll方法
public String poll(String queueName){
if(topics.get(queueName)!=null)
return topics.get(queueName).poll();
else{
topics.put(queueName, new LinkedBlockingQueue<String>());
return "";
}
}
//take方法
public String take(String queueName){
if(topics.get(queueName)!=null)
return topics.get(queueName).take();
else{
topics.put(queueName, new LinkedBlockingQueue<String>());
return "";
}
}
}
全局缓存的数据结构微调
public static ConcurrentHashMap<String, Topic> producers = new ConcurrentHashMap<String, Topic>();
生产者工具调整
System.out.println("生产者:" + message);
if(MQCache.producers.get(topic) == null){
MQCache.producers.put(topic, new Topic());
}
return MQCache.producers.get(topic).send(message);
消费者工具调整
if(MQCache.producers.get(topic)==null){
Topic topic = new Topic();
topic.register(queueName);
MQCache.producers.put(this.queueName, topic)
}
while(true){
System.out.println("消费者:" + MQCache.producers.get(topic).poll(queueName));
}
编辑一下主函数
public static void main(String[] args) throws InterruptedException {
//new了三个生产者和一个消费者
Producer producer1 = new Producer("test","生产1");
Producer producer2 = new Producer("test","生产2");
Producer producer3 = new Producer("test","生产3");
Consumer consumer1 = new Consumer("test","queueName1");
Consumer consumer2 = new Consumer("test","queueName2");
// 使用线程池
ExecutorService service = Executors.newCachedThreadPool();
// 启动消费者
service.execute(consumer1);
service.execute(consumer2);
// 启动生产者
Thread.sleep(6000);
Future<Boolean> p1 = service.submit(producer1);
Thread.sleep(6000);
service.submit(producer2);
Thread.sleep(6000);
service.submit(producer3);
// 退出Executor
service.shutdown();
}
来运行一下:
OK,一个简单的发布-订阅模式貌似成功了