使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式

   多线程下的如何高效安全传输数据。在的Concurrent包中,BlockingQueue在一个方面很好的解决了这个问题。

 

简介:

阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满;从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空。并发包下很多高级同步类的实现都是基于BlockingQueue实现的。

使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式

 

 

JDK7提供了以下7个阻塞队列:

ArrayBlockingQueue :由数组结构组成的有界阻塞队列。

LinkedBlockingQueue :由链表结构组成的有界阻塞队列。

PriorityBlockingQueue :支持优先级排序的*阻塞队列。

DelayQueue:使用优先级队列实现的*阻塞队列。

SynchronousQueue:不存储元素的阻塞队列。

LinkedTransferQueue:链表结构组成的*阻塞队列。

LinkedBlockingDeque:链表结构组成的双向阻塞队列。

 

阻塞队列提供了下列四种处理方法

 

使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式

 

使用场景:

从数据结构和方法看出,是不是很像MQ中的生产-消费模式。没错,用它搭建一个生产消费模式很方便,我们来试试吧。

1、先建一个全局的静态BlockQueue对象吧

package org.lochte.cache;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class MQCache {

    public static BlockingQueue<String> producerBQ = new LinkedBlockingQueue<String>();


}

 

2、然后是生产者

 

package org.lochte.block;

import org.lochte.cache.MQCache;

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public class Producer implements Callable<Boolean> {

    private String message;

    public Producer(String message){
        this.message = message;
    }

    @Override
    public Boolean call() throws Exception {


         try {
             System.out.println("生产者:" + message);
             return MQCache.producerBQ.offer(message, 2, TimeUnit.SECONDS);
         } catch (InterruptedException e) {
             return false;
         }
    }
}

 

 

 

  1. 有了生产者,还有一个消费者

package org.lochte.block;

import org.lochte.cache.MQCache;

public class Consumer  implements Runnable {


    @Override
    public void run() {
        try{
            while(true){
                    System.out.println("消费者:" + MQCache.producerBQ.take());
            }
        }catch(Exception e){
            e.printStackTrace();
        }
    }
}

 

4、一个最基本的 生产-消费 模式就完成了,我们用一个main函数来试试吧

package org.lochte.block;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockTest {

    public static void main(String[] args) throws InterruptedException {

        //new了三个生产者和一个消费者
        Producer producer1 = new Producer("生产1");
        Producer producer2 = new Producer("生产2");
        Producer producer3 = new Producer("生产3");
        Consumer consumer = new Consumer();

        // 使用现场池
        ExecutorService service = Executors.newCachedThreadPool();
        // 启动消费者
        service.execute(consumer);
        // 启动生产者
        Thread.sleep(6000);
        Future<Boolean> p1 = service.submit(producer1);
        Thread.sleep(6000);
        service.submit(producer2);
        Thread.sleep(6000);
        service.submit(producer3);

        // 退出Executor
        service.shutdown();
    }

}

 

运行结果:

使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式

 

顺便说一句,这里的consumer用了take(),如果queue为空将会一直等待到有数据位置。若不想一直等下去可以使用poll()方法。若queue无数据将返回为null。

我们把consumer的MQCache.producerBQ.take()改为MQCache.producerBQ.pool(3,TimeUnit.SECONDS)试试

 

使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式

生产者这里设置了6秒的间隔,消费者2秒的间隔,中间的4秒的时差则取到的NULL值。

 

以上只是满足了生产消费模式,但还有一个MQ中间件最被常用的是 订阅-发布模式。这怎么办呢。我们先来看看主流MQ是怎么做的。

Kafka:

使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式

 

Topic:特指kafka处理的消息源

Partition(分区):Topic物理上的分组。一个Topic可以有多个Patition,每个Partition是一个有序的队列

Message:消息,通信的基本单位

Producer:生产者。向kafka的一个topic发布消息的过程叫做生产

Consumer:消费者,订阅Topic并处理其发布的消息的过程叫做消费

Broker:缓存代理,kafka集群中的一台或者多台服务器


由此可见,kafka通过broker和Partition来划分Comsumer


RabbitMQ:

 

使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式

 

又是一个多队列

RocketMQ:

 

 

使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式

 

想将生产-发布模式升到订阅-发布模式大体思路已经很清晰了,有3部要走

  1. 创建一个Topic(其实还需要搭建Broker,这里简单做一个先免了吧(T T)!)
  2. 一个Topic 可以对应多个Queue
  3. Consumer单节点的对应关系

 

Topic即主题,一般是一个字符串,显而易见需要用到Map,多线程的环境下,ConcurrentHashmap首选。

 

绑定关系相对有些区分,借鉴一下Kafka与RabbitMQ是如何绑定的

Kafka: 使用GroupId,然后分配partition。Producer和Consumer能自行制定partition的编号。

RabbitMQ: 使用交换机fanout,然后Queue对应交换机comsuner再绑定queue。

 

主流的MQ提供一个方式自定义queue。这下就好办了些,借鉴一下,编写一个topic对象,可以是一个list也可以是一个map,用于绑定关系。

package org.lochte.block;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

public class Topic {

    private volatile ConcurrentHashMap<String, BlockingQueue<String>> topics = new ConcurrentHashMap<String, BlockingQueue<String>>();

    //生产者写入
    public void send(String message){
        topics.forEach((k, v)->{
            v.offer(message);
        });
    }
    //注册
    public void register(String queueName){
        topics.put(queueName, new LinkedBlockingQueue<String>());
    }
    //poll方法
    public String poll(String queueName){
      if(topics.get(queueName)!=null)
        return topics.get(queueName).poll();
      else{
        topics.put(queueName, new LinkedBlockingQueue<String>());
        return "";
       }
    }

    //take方法
    public String take(String queueName){
        if(topics.get(queueName)!=null)
            return topics.get(queueName).take();
        else{
            topics.put(queueName, new LinkedBlockingQueue<String>());
            return "";
        }
    }
}

全局缓存的数据结构微调

public static ConcurrentHashMap<String, Topic> producers = new ConcurrentHashMap<String, Topic>();

生产者工具调整

System.out.println("生产者:" + message);
if(MQCache.producers.get(topic) == null){
    MQCache.producers.put(topic, new Topic());
}
return MQCache.producers.get(topic).send(message);

消费者工具调整

if(MQCache.producers.get(topic)==null){
    Topic topic = new Topic();
    topic.register(queueName);
    MQCache.producers.put(this.queueName, topic)
}
while(true){
        System.out.println("消费者:" + MQCache.producers.get(topic).poll(queueName));
}

编辑一下主函数

public static void main(String[] args) throws InterruptedException {

    //new了三个生产者和一个消费者
    Producer producer1 = new Producer("test","生产1");
    Producer producer2 = new Producer("test","生产2");
    Producer producer3 = new Producer("test","生产3");
    Consumer consumer1 = new Consumer("test","queueName1");
    Consumer consumer2 = new Consumer("test","queueName2");

    // 使用线程池
    ExecutorService service = Executors.newCachedThreadPool();
    // 启动消费者
    service.execute(consumer1);
    service.execute(consumer2);
    // 启动生产者
    Thread.sleep(6000);
    Future<Boolean> p1 = service.submit(producer1);
    Thread.sleep(6000);
    service.submit(producer2);
    Thread.sleep(6000);
    service.submit(producer3);

    // 退出Executor
    service.shutdown();
}

来运行一下:

使用阻塞队列BlockingQueue简单模拟MQ中间件中的生产-消费,发布-订阅模式

 

OK,一个简单的发布-订阅模式貌似成功了