第六章 Java并发容器和框架(ConcurrentHashMap,ConcurrentLinkedQueue,BlockingQueue,Fork Join)
1.ConcurrentHashMap
1)HashMap多线程put死循环
HashMap在并发执行put操作时会引起死循环,是因为多线程会导致HashMap的Entry链表形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获取Entry。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
package com.tlk.chapter6;
import java.util.HashMap;
import java.util.UUID;
/** * 在多线程环境下,使用HashMap进行put操作会引起死循环,导致CPU利用率接近100%
* @author tanlk
* @date 2017年5月17日下午7:39:18
*/
public class HashMapTest {
public static void main(String[] args) throws InterruptedException {
final HashMap<String, String> map = new HashMap<String, String>( 2 );
Thread t = new Thread( new Runnable() {
@Override
public void run() {
for ( int i = 0 ; i < 10000 ; i++) {
new Thread( new Runnable() {
@Override
public void run() {
map.put(UUID.randomUUID().toString(), "" );
}
}, "ftf" + i).start();
}
}
}, "ftf" );
t.start();
t.join(); //等待该线程终止。终止才会执行main结束
}
} |
3)ConcurrentHashMap的锁分段技术可有效提升并发访问率
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重入锁 (ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数据。一个
ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时,必须首先获得与它对应的Segment锁。
2. ConcurrentLinkedQueue
ConcurrentLinkedQueue是一个基于链接节点的无界线程安全队列,它采用先进先出的规则对节点进行排序,当我们添加一个元素的时候,它会添加到队列的尾部;当我们获取一个元素时,它会返回队列头部的元素。它采用了“wait-free”算法(即CAS算法)来实现
3.BlockingQueue(阻塞队列)
·ArrayBlockingQueue:一个由数组结构组成的有界阻塞队列。
·LinkedBlockingQueue:一个由链表结构组成的有界阻塞队列。
·PriorityBlockingQueue:一个支持优先级排序的无界阻塞队列。
·DelayQueue:一个使用优先级队列实现的无界阻塞队列。
·缓存系统的设计:可以用DelayQueue保存缓存元素的有效期,使用一个线程循环查询DelayQueue,一旦能从
DelayQueue中获取元素时,表示缓存有效期到了。
·定时任务调度:使用DelayQueue保存当天将会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始 执行,比如TimerQueue就是使用DelayQueue实现的
·SynchronousQueue:一个不存储元素的阻塞队列。
·LinkedTransferQueue:一个由链表结构组成的无界阻塞队列。
·LinkedBlockingDeque:一个由链表结构组成的双向阻塞队列。
阻塞队列的实现原理:使用通知模式实现。
以ArrayBlockingQueue为例,其使用了Condition来实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
|
private final Condition notFull;
private final Condition notEmpty;
public ArrayBlockingQueue( int capacity, boolean fair) {
// 省略其他代码
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this .lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
insert(e);
} finally {
lock.unlock();
}
}
p ublic E take() throws InterruptedException {
final ReentrantLock lock = this .lock;
lock.lockInterruptibly();
try {
while (count == 0 )
notEmpty.await();
return extract();
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal();
}
|
ConditionObject中await()实现为:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0 ;
while (!isOnSyncQueue(node)) {
LockSupport.park( this );
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0 )
break ;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null ) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0 )
reportInterruptAfterWait(interruptMode);
}
|
当往队列里插入一个元素时,如果队列不可用,那么阻塞生产者主要通过
LockSupport.park(this)来实现。
1
2
3
4
5
6
|
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park( false , 0L);
setBlocker(t, null );
}
|
4.Fork/Join框架
Fork/Join框架是Java 7提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
1)工作窃取算法
2)Fork/Join框架的设计
步骤1分割任务。首先我们需要有一个fork类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。
步骤2 执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据
Fork/Join使用两个类来完成以上两件事情。
①ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制。通常情况下,我们不需要直接继承ForkJoinTask类,只需要继承它的子类,Fork/Join框架提供了以下两个子类。
·RecursiveAction:用于没有返回结果的任务。
·RecursiveTask:用于有返回结果的任务。
②ForkJoinPool:ForkJoinTask需要通过ForkJoinPool来执行。任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。
斐波那契数列Java实现
递归实现:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
package com.tlk.chapter6;
import java.util.concurrent.TimeUnit;
public class FibCalculate {
public static long Calculate( long n){
if (n < 0 ) {
return 0 ;
}
if (n == 0 || n == 1 ) {
return n;
} else {
return Calculate(n- 1 )+Calculate(n- 2 );
}
}
public static void main(String[] args) {
long n= 50 ;
long begin = System.nanoTime();
long f = FibCalculate.Calculate(n);
long end = System.nanoTime();
System.out.println( "第" + n + "个斐波那契数是" + f + ", 耗时" + TimeUnit.NANOSECONDS.toMillis(end - begin) + "毫秒" );
}
} |
第 50 个斐波那契数是 12586269025 , 耗时 80536 毫秒
|
Fork/Join 实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
package com.tlk.chapter6;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
/** * forkJoin实现斐波那契数列
* @author tanlk
* @date 2017年5月23日下午5:43:59
*/
public class FibTask extends RecursiveTask<Long> {
long n;
public FibTask( long n) {
this .n = n;
}
public Long compute() {
if (n <= 10 ) { // 小于10不再分解
return FibTask.calc(n);
}
FibTask f1 = new FibTask(n - 1 ); // 分解出计算n-1斐波那契数的子任务
f1.fork(); // 由ForkJoinPool分配线程执行子任务
FibTask f2 = new FibTask(n - 2 ); // 分解出计算n-2斐波那契数的子任务
return f2.compute() + f1.join();
}
public static long calc( long n) {
if (n < 0 ) {
return 0 ;
}
if (n == 0 || n == 1 ) {
return n;
} else {
return calc(n - 1 ) + calc(n - 2 );
}
}
public static void main(String[] args) {
long n = 50 ;
long begin = System.nanoTime();
FibTask FibTask = new FibTask(n);
ForkJoinPool pool = new ForkJoinPool();
long f = pool.invoke(FibTask);
long end = System.nanoTime();
System.out.println( "第" + n + "个斐波那契数是" + f + ", 耗时" + TimeUnit.NANOSECONDS.toMillis(end - begin) + "毫秒" );
}
} |
结果如下:
第 50 个斐波那契数是 12586269025 , 耗时 45037 毫秒
|
Fork/Join实现耗时有一定的提升。
非递归的方式实现,耗时0秒,递归代码是简单,但是不一定最优
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
//非递归的方式改写,过程中每个数只计算一次。 public static long calcWithoutRecursion( long n) {
if (n < 0 )
return 0 ;
if (n == 0 || n == 1 ) {
return n;
}
long fib = 0 ;
long fibOne = 1 ;
long fibTwo = 1 ;
for ( long i = 2 ; i < n; i++) {
fib = fibOne + fibTwo;
fibTwo = fibOne;
fibOne = fib;
}
return fib;
}
|