第六章 Java并发容器和框架(ConcurrentHashMap,ConcurrentLinkedQueue,BlockingQueue,Fork Join)

1.ConcurrentHashMap

    1)HashMap多线程put死循环
    HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.tlk.chapter6;
 
import java.util.HashMap;
import java.util.UUID;
 
/**
 * 在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%
 * @author tanlk
 * @date 2017年5月17日下午7:39:18
 */
public class HashMapTest {
    public static void main(String[] args) throws InterruptedException {
        final HashMap<String, String> map = new HashMap<String, String>(2);
        Thread t = new Thread(new Runnable() {
            @Override
            public void run() {
                for (int i = 0; i < 10000; i++) {
                    new Thread(new Runnable() {
                        @Override
                        public void run() {
                            map.put(UUID.randomUUID().toString(), "");
                        }
                    }, "ftf" + i).start();
                }
            }
        }, "ftf");
        t.start();
        t.join();//等待该线程终止。终止才会执行main结束
    }
}
    2)HashTable效率低下
    3)ConcurrentHashMap的锁分段技术可有效提升并发访问率
   ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁    (ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个
ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。


2. ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现

3.BlockingQueue(阻塞队列)
    ·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
    ·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
    ·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
    ·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
            ·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从                                             DelayQueue中获取元素时,表示缓存有效期到了。
            ·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始                                      执行,比如TimerQueue就是使用DelayQueue实现的
    ·SynchronousQueue:一个不存储元素的阻塞队列。
    ·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
    ·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。

    阻塞队列的实现原理:使用通知模式实现。
     以ArrayBlockingQueue为例,其使用了Condition来实现
    
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private final Condition notFull;
    private final Condition notEmpty;
 
    public ArrayBlockingQueue(int capacity, boolean fair) {
    // 省略其他代码
        notEmpty = lock.newCondition();
        notFull = lock.newCondition();
    }
 
    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            insert(e);
        finally {
            lock.unlock();
        }
    }
 
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return extract();
        finally {
            lock.unlock();
        }
    }
 
    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal();
    }

ConditionObject中await()实现为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null// clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
        当往队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过
LockSupport.park(this)来实现。
1
2
3
4
5
6
  public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        UNSAFE.park(false, 0L);
        setBlocker(t, null);
    }


4.Fork/Join框架
    Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
    1)工作窃取算法
第六章 Java并发容器和框架(ConcurrentHashMap,ConcurrentLinkedQueue,BlockingQueue,Fork Join)


    2)Fork/Join框架的设计
步骤1分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据

Fork/Join使用两个类来完成以上两件事情。
①ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。
        ·RecursiveAction:用于没有返回结果的任务。
        ·RecursiveTask:用于有返回结果的任务。
②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。


斐波那契数列Java实现
递归实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package com.tlk.chapter6;
 
import java.util.concurrent.TimeUnit;
 
public class FibCalculate {
    public static long Calculate(long n){
        if(n < 0) {  
            return 0;  
        }  
        if(n == 0 || n == 1) {  
            return n;
        }else {
            return Calculate(n-1)+Calculate(n-2);
        }
    }
     
    public static void main(String[] args) {
        long n=50;
        long begin = System.nanoTime();
        long f = FibCalculate.Calculate(n);
        long end = System.nanoTime();
        System.out.println("第" + n + "个斐波那契数是" + f + ", 耗时" + TimeUnit.NANOSECONDS.toMillis(end - begin) + "毫秒");
    }
}
50个斐波那契数是12586269025, 耗时80536毫秒
Fork/Join 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
package com.tlk.chapter6;
 
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
 
/**
 * forkJoin实现斐波那契数列
 * @author tanlk
 * @date 2017年5月23日下午5:43:59
 */
public class FibTask extends RecursiveTask<Long> {
    long n;
 
    public FibTask(long n) {
        this.n = n;
    }
 
    public Long compute() {
        if (n <= 10) { // 小于10不再分解
            return FibTask.calc(n);
        }
        FibTask f1 = new FibTask(n - 1); // 分解出计算n-1斐波那契数的子任务
        f1.fork(); // 由ForkJoinPool分配线程执行子任务
        FibTask f2 = new FibTask(n - 2); // 分解出计算n-2斐波那契数的子任务
        return f2.compute() + f1.join();
    }
 
    public static long calc(long n) {
        if (n < 0) {
            return 0;
        }
        if (n == 0 || n == 1) {
            return n;
        else {
            return calc(n - 1) + calc(n - 2);
        }
    }
 
    public static void main(String[] args) {
        long n = 50;
        long begin = System.nanoTime();
        FibTask FibTask = new FibTask(n);
        ForkJoinPool pool = new ForkJoinPool();
        long f = pool.invoke(FibTask);
        long end = System.nanoTime();
        System.out.println("第" + n + "个斐波那契数是" + f + ", 耗时" + TimeUnit.NANOSECONDS.toMillis(end - begin) + "毫秒");
    }
 
}
结果如下:
50个斐波那契数是12586269025, 耗时45037毫秒
Fork/Join实现耗时有一定的提升。

非递归的方式实现,耗时0秒,递归代码是简单,但是不一定最优
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//非递归的方式改写,过程中每个数只计算一次。
    public static long calcWithoutRecursion(long n) {  
        if(n < 0)  
            return 0;  
        if(n == 0 || n == 1) {  
            return n;  
        }  
        long fib = 0;  
        long fibOne = 1;  
        long fibTwo = 1;  
        for(long i = 2; i < n; i++) {  
            fib = fibOne + fibTwo;  
            fibTwo = fibOne;  
            fibOne = fib;  
        }  
        return fib;  
    }