【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

点击上方"蓝字", 右上角选择“设为星标”

 周一至周五早8点半!精品技术文章准时送上!


本文转自公众号:crossoverJie


目录

一、写在前面

二、造个*先

三、基本测试

四、并发测试

五、窥探本尊:ArrayBlockingQueue

六、实际场景中玩儿一把



一、写在前面

较长一段时间以来,我都发现不少开发者对 jdk 中的 J.U.C(java.util.concurrent)也就是 Java 并发包的使用甚少,更别谈对它的理解了。但这却也是我们进阶的必备关卡。


之前或多或少也分享过相关内容,但都不成体系;于是便想整理一套与并发包相关的系列文章。


其中的内容主要包含以下几个部分:

  • 根据定义自己实现一个并发工具。

  • JDK 的标准实现。

  • 实践案例。

基于这三点我相信大家对这部分内容不至于一问三不知。

既然开了一个新坑,就不想做的太差;所以我打算将这个列表下的大部分类都讲到。

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

所以本次重点讨论 ArrayBlockingQueue


二、造个*先

在自己实现之前先搞清楚阻塞队列的几个特点:

  • 基本队列特性:先进先出。

  • 写入队列空间不可用时会阻塞。

  • 获取队列数据时当队列为空时将阻塞。

实现队列的方式多种,总的来说就是数组和链表;其实我们只需要搞清楚其中一个即可,不同的特性主要表现为数组和链表的区别。


这里的 ArrayBlockingQueue 看名字很明显是由数组实现,我们先根据它这三个特性尝试自己实现试试。


初始化队列

我这里自定义了一个类: ArrayQueue,它的构造函数如下:

    public ArrayQueue(int size) {        items = new Object[size];    }


很明显这里的 items 就是存放数据的数组;在初始化时需要根据大小创建数组。

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

写入队列

写入队列比较简单,只需要依次把数据存放到这个数组中即可,如下图:

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”


但还是有几个需要注意的点:

  • 队列满的时候,写入的线程需要被阻塞。

  • 写入过队列的数量大于队列大小时需要从第一个下标开始写。


先看第一个 队列满的时候,写入的线程需要被阻塞,先来考虑下如何才能使一个线程被阻塞,看起来的表象线程卡住啥事也做不了。


有几种方案可以实现这个效果:

  • Thread.sleep(timeout)线程休眠。

  • object.wait() 让线程进入 waiting 状态。


当然还有一些 join、LockSupport.part 等不在本次的讨论范围。


阻塞队列还有一个非常重要的特性是:当队列空间可用时(取出队列),写入线程需要被唤醒让数据可以写入进去。


所以很明显 Thread.sleep(timeout)不合适,它在到达超时时间之后便会继续运行;达不到空间可用时才唤醒继续运行这个特点。


其实这样的一个特点很容易让我们想到 Java 的等待通知机制来实现线程间通信


所以我这里的做法是,一旦队列满时就将写入线程调用 object.wait() 进入 waiting 状态,直到空间可用时再进行唤醒。


  1. /**

  2. * 队列满时的阻塞锁

  3. */

  4. private Object full = new Object();


  5. /**

  6. * 队列空时的阻塞锁

  7. */

  8. private Object empty = new Object();

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

所以这里声明了两个对象用于队列满、空情况下的互相通知作用。


在写入数据成功后需要使用 empty.notify(),这样的目的是当获取队列为空时,一旦写入数据成功就可以把消费队列的线程唤醒。

这里的 wait 和 notify 操作都需要对各自的对象使用 synchronized 方法块,这是因为 wait 和 notify 都需要获取到各自的锁。


消费队列

上文也提到了:当队列为空时,获取队列的线程需要被阻塞,直到队列中有数据时才被唤醒。

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”


代码和写入的非常类似,也很好理解;只是这里的等待、唤醒恰好是相反的,通过下面这张图可以很好理解:

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”


总的来说就是:

  • 写入队列满时会阻塞直到获取线程消费了队列数据后唤醒写入线程

  • 消费队列空时会阻塞直到写入线程写入了队列数据后唤醒消费线程


三、基本测试

先来一个基本的测试:单线程的写入和消费。

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

3123123412345

通过结果来看没什么问题。


当写入的数据超过队列的大小时,就只能消费之后才能接着写入。

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

2019-04-09 16:24:41.040 [Thread-0] INFO  c.c.concurrent.ArrayQueueTest - [Thread-0]1232019-04-09 16:24:41.040 [main] INFO  c.c.concurrent.ArrayQueueTest - size=32019-04-09 16:24:41.047 [main] INFO  c.c.concurrent.ArrayQueueTest - 12342019-04-09 16:24:41.048 [main] INFO  c.c.concurrent.ArrayQueueTest - 123452019-04-09 16:24:41.048 [main] INFO  c.c.concurrent.ArrayQueueTest - 123456

从运行结果也能看出只有当消费数据后才能接着往队列里写入数据。


【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

而当没有消费时,再往队列里写数据则会导致写入线程被阻塞。


四、并发测试

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

三个线程并发写入300条数据,其中一个线程消费一条。

=====0299

最终的队列大小为 299,可见线程也是安全的。

由于不管是写入还是获取方法里的操作都需要获取锁才能操作,所以整个队列是线程安全的。


五、窥探本尊:ArrayBlockingQueue

下面来看看 JDK 标准的 ArrayBlockingQueue 的实现,有了上面的基础会更好理解。

初始化队列

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

看似要复杂些,但其实逐步拆分后也很好理解:

  • 第一步其实和我们自己写的一样,初始化一个队列大小的数组。

  • 第二步初始化了一个重入锁,这里其实就和我们之前使用的 synchronized 作用一致的;


只是这里在初始化重入锁的时候默认是 非公平锁,当然也可以指定为 true 使用公平锁;这样就会按照队列的顺序进行写入和消费。


三四两步则是创建了 notEmpty notFull 这两个条件,他的作用于用法和之前使用的 object.wait/notify 类似。


这就是整个初始化的内容,其实和我们自己实现的非常类似。


写入队列

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

其实会发现阻塞写入的原理都是差不多的,只是这里使用的是 Lock 来显式获取和释放锁。


同时其中的 notFull.await();notEmpty.signal(); 和我们之前使用的 object.wait/notify 的用法和作用也是一样的。


当然它还是实现了超时阻塞的 API

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

也是比较简单,使用了一个具有超时时间的等待方法。


消费队列

再看消费队列:

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

也是差不多的,一看就懂。而其中的超时 API 也是使用了 notEmpty.awaitNanos(nanos) 来实现超时返回的,就不具体说了。


六、实际场景中玩儿一把

说了这么多,来看一个队列的实际案例吧。

背景是这样的:

有一个定时任务会按照一定的间隔时间从数据库中读取一批数据,需要对这些数据做校验同时调用一个远程接口。


简单的做法就是由这个定时任务的线程去完成读取数据、消息校验、调用接口等整个全流程;但这样会有一个问题:


假设调用外部接口出现了异常、网络不稳导致耗时增加就会造成整个任务的效率降低,因为他都是串行会互相影响。


所以我们改进了方案:

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

其实就是一个典型的生产者消费者模型:

  • 生产线程从数据库中读取消息丢到队列里。

  • 消费线程从队列里获取数据做业务逻辑。


这样两个线程就可以通过这个队列来进行解耦,互相不影响,同时这个队列也能起到缓冲的作用。


但在使用过程中也有一些小细节值得注意。


因为这个外部接口是支持批量执行的,所以在消费线程取出数据后会在内存中做一个累加,一旦达到阈值或者是累计了一个时间段便将这批累计的数据处理掉。


但由于开发者的大意,在消费的时候使用的是 queue.take() 这个阻塞的 API;正常运行没啥问题。


可一旦原始的数据源,也就是 DB 中没数据了,导致队列里的数据也被消费完后这个消费线程便会被阻塞。


这样上一轮积累在内存中的数据便一直没机会使用,直到数据源又有数据了,一旦中间间隔较长时便可能会导致严重的业务异常。


所以我们最好是使用 queue.poll(timeout) 这样带超时时间的 api,除非业务上有明确的要求需要阻塞。


这个习惯同样适用于其他场景,比如调用 http、rpc 接口等都需要设置合理的超时时间。

End

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

公众号后台回复“学习”,获取作者独家秘制学习资料


还没读够?更多原创系列文章,请移步至






【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

持续关注,一大波原创系列文章正在路上

欢迎扫描下方二维码

【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”

石杉的架构笔记(id:shishan100)

BAT架构经验倾囊相授



【并发包源码窥探】将ArrayBlockingQueue扒得“体无完肤”