java并发编程(十六)-并发容器后续(ConcurrentSkipListMap ConcurrentSkipListSet ConcurrentLinkedQueue 写时复制容器)

ConcurrentSkipListMap  和 ConcurrentSkipListSet

TreeMap和TreeSet有序的容器,这两种容器的并发版本

ConcurrentSkipListSet线程安全的有序的集合,适用于高并发的场景。

ConcurrentSkipListSetTreeSet,它们虽然都是有序的集合。但是,

  • 第一,它们的线程安全机制不同,TreeSet是非线程安全的,而ConcurrentSkipListSet是线程安全的。
  • 第二,ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的,而TreeSet是通过TreeMap实现的。

java并发编程(十六)-并发容器后续(ConcurrentSkipListMap ConcurrentSkipListSet ConcurrentLinkedQueue 写时复制容器)

 

说明:

  • ConcurrentSkipListSet继承于AbstractSet。因此,它本质上是一个集合
  • ConcurrentSkipListSet实现了NavigableSet接口。因此,ConcurrentSkipListSet是一个有序的集合。
  • ConcurrentSkipListSet是通过ConcurrentSkipListMap实现的。
  • ConcurrentSkipListSet包含一个ConcurrentNavigableMap对象m,而m对象实际上是ConcurrentSkipListMap的实例。
  • ConcurrentSkipListMap中的元素是key-value键值对
  • ConcurrentSkipListSet是集合,它只用到了ConcurrentSkipListMap中的key

跳表

SkipList,以空间换时间,在原链表的基础上形成多层索引,但是某个节点在插入时,是否成为索引,随机决定,所以跳表又称为概率数据结构。

跳表分为许多层(level),每一层都可以看作是数据的索引,这些索引的意义就是加快跳表查找数据速度。每一层的数据都是有序的,上一层数据是下一层数据的子集,并且第一层(level 1)包含了全部的数据;层次越高,跳跃性越大,包含的数据越少。
跳表包含一个表头,它查找数据时,是从上往下,从左往右进行查找。

java并发编程(十六)-并发容器后续(ConcurrentSkipListMap ConcurrentSkipListSet ConcurrentLinkedQueue 写时复制容器)

代码实现:

package com.xiangxue.ch5.bitwise;

import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConcurrentSkipListMapDemo1 {
	
	//private static Map<String, String> map = new TreeMap<String, String>();
	
	private static Map<String,String> map =new ConcurrentSkipListMap<String,String>();
	
	public static void main(String[] args) {
		
		new MyThread("a").start();
		new MyThread("b").start();
	}
	
	private static void printAll(){
		String key,value;
		Iterator<Entry<String, String>> iterator = map.entrySet().iterator();
		
		while(iterator.hasNext()){
			Entry<String, String> entry = iterator.next();
			key = entry.getKey();
			value = entry.getValue();
			System.out.print("("+key +" ,"+value+"), ");
		}
	
		System.out.println();
	}
	
	
	private static class MyThread extends Thread{
		
		public MyThread(String name){
			super(name);
		}
		
		
		@Override
		public void run() {
			int i = 0 ;
			while(i++ < 6){
				String val = Thread.currentThread().getName()+ i;
				map.put(val, "0");
				printAll();
			}
		}
	}

}

运行结果(某一次):

(a1 ,0), (b1 ,0), 
(a1 ,0), (a1 ,0), (b1 ,0), (b2 ,0), 
(a1 ,0), (b1 ,0), (b2 ,0), (b1 ,0), (b2 ,0), (b3 ,0), 
(a1 ,0), (a2 ,0), (b1 ,0), (b2 ,0), (b3 ,0), 
(a1 ,0), (a2 ,0), (a3 ,0), (b1 ,0), (b2 ,0), (b3 ,0), 
(a1 ,0), (a2 ,0), (a3 ,0), (a4 ,0), (b1 ,0), (b2 ,0), (b3 ,0), 
(b3 ,0), (a1 ,0), 
(a2 ,0), (a1 ,0), (a3 ,0), (a2 ,0), (a3 ,0), (a4 ,0), (a5 ,0), (b1 ,0), (a4 ,0), (a5 ,0), (b1 ,0), (b2 ,0), (b3 ,0), (b4 ,0), 
(a1 ,0), (a2 ,0), (a3 ,0), (a4 ,0), (a5 ,0), (b2 ,0), (b3 ,0), (b4 ,0), 
(a6 ,0), (a1 ,0), (b1 ,0), (b2 ,0), (a2 ,0), (a3 ,0), (a4 ,0), (a5 ,0), (a6 ,0), (b1 ,0), (b2 ,0), (b3 ,0), (b4 ,0), (b3 ,0), (b5 ,0), 
(b4 ,0), (b5 ,0), (b6 ,0), 
(a1 ,0), (a2 ,0), (a3 ,0), (a4 ,0), (a5 ,0), (a6 ,0), (b1 ,0), (b2 ,0), (b3 ,0), (b4 ,0), (b5 ,0), (b6 ,0), 
 

结果说明
示例程序中,启动两个线程(线程a和线程b)分别对ConcurrentSkipListMap进行操作。以线程a而言,它会先获取“线程名”+“序号”,然后将该字符串作为key,将“0”作为value,插入到ConcurrentSkipListMap中;接着,遍历并输出ConcurrentSkipListMap中的全部元素。 线程b的操作和线程a一样,只不过线程b的名字和线程a的名字不同。
当map是ConcurrentSkipListMap对象时,程序能正常运行。如果将map改为TreeMap时,程序会产生ConcurrentModificationException异常。

 

ConcurrentLinkedQueue--LinkedList的并发版本

无界非阻塞队列,底层是个链表,遵循先进先出原则。

add,offer将元素插入到尾部,两个方法一样的,peek(拿头部的数据,但是不移除)和poll(拿头部的数据,但是移除)

size or isEmpty

如果此队列包含的元素数大于 Integer.MAX_VALUE,则返回 Integer.MAX_VALUE。
需要小心的是,与大多数 collection 不同,此方法不是 一个固定时间操作。由于这些队列的异步特性,确定当前的元素数需要进行一次花费 O(n) 时间的遍历。
所以在需要判断队列是否为空时,尽量不要用 queue.size()>0,而是用 !queue.isEmpty()
package com.xiangxue.ch5.bitwise;

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueueDemo {
	
	public static void main(String[] args) {
		ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue();
		queue.offer("caojiulu");
		System.out.println(queue.peek());
		System.out.println(queue.isEmpty());
		System.out.println(queue.poll());
		System.out.println(queue.isEmpty());
	}

}

写时复制容器 

CopyOnWriteArrayList容器是Collections.synchronizedList(List list)的替代方案,CopyOnWriteArrayList在某些情况下具有更好的性能,考虑读远大于写的场景,如果把所有的读操作进行加锁,因为只有一个读线程能够获得锁,所以其他的读线程都必须等待,大大影响性能。CopyOnWriteArrayList称为“写时复制”容器,就是在多线程操作容器对象时,把容器复制一份,这样在线程内部的修改就与其他线程无关了,而且这样设计可以做到不阻塞其他的读线程。从JDK1.5开始Java并发包里提供了两个使用CopyOnWrite机制实现的并发容器,它们是CopyOnWriteArrayList和CopyOnWriteArraySet。

CopyOnWriteArrayList源码

先说说CopyOnWriteArrayList容器的实现原理:简单地说,就是在需要对容器进行操作的时候,将容器拷贝一份,对容器的修改等操作都在容器的拷贝中进行,当操作结束,再把容器容器的拷贝指向原来的容器。这样设计的好处是实现了读写分离,并且读读不会发生阻塞。下面的源码是CopyOnWriteArrayList的add方法实现:

public void add(int index, E element) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            if (index > len || index < 0)
                throw new IndexOutOfBoundsException("Index: "+index+
                                                    ", Size: "+len);
            Object[] newElements;
            int numMoved = len - index;
            //1、复制出一个新的数组
            if (numMoved == 0)
                newElements = Arrays.copyOf(elements, len + 1);
            else {
                newElements = new Object[len + 1];
                System.arraycopy(elements, 0, newElements, 0, index);
                System.arraycopy(elements, index, newElements, index + 1,
                                 numMoved);
            }
            //2、把新元素添加到新数组中
            newElements[index] = element;
            //3、把数组指向原来的数组
            setArray(newElements);
        } finally {
            lock.unlock();
        }
    }

上面的三个步骤实现了写时复制的思想,在读数据的时候不会锁住list,因为写操作是在原来容器的拷贝上进行的。而且,可以看到,如果对容器拷贝修改的过程中又有新的读线程进来,那么读到的还是旧的数据。读的代码如下:

public E get(int index) {
        return get(getArray(), index);
    }

final Object[] getArray() {
       return array;
   }

CopyOnWrite并发容器用于读多写少的并发场景。比如白名单,黑名单,商品类目的访问和更新场景。

CopyOnWriteArrayList的缺点

从CopyOnWriteArrayList的实现原理可以看到因为在需要容器对象的时候进行拷贝,所以存在两个问题:内存占用问题和数据一致性问题。

内存占用问题

因为需要将原来的对象进行拷贝,这需要一定的开销。特别是当容器对象过大的时候,因为拷贝而占用的内存将增加一倍(原来驻留在内存的对象仍然在使用,拷贝之后就有两份对象在内存中,所以增加了一倍内存)。而且,在高并发的场景下,因为每个线程都拷贝一份对象在内存中,这种情况体现得更明显。由于JVM的优化机制,将会触发频繁的Young GC和Full GC,从而使整个系统的性能下降。

数据一致性问题

CopyOnWriteArrayList不能保证实时一致性,因为读线程在将引用重新指向原来的对象之前再次读到的数据是旧的,所以CopyOnWriteArrayList 只能保证最终一致性。因此在需要实时一致性的场景CopyOnWriteArrayList是不能使用的。

CopyOnWriteArrayList小结

  1. CopyOnWriteArrayList适用于读多写少的场景
  2. 在并发操作容器对象时不会抛出ConcurrentModificationException,并且返回的元素与迭代器创建时的元素是一致的
  3. 容器对象的复制需要一定的开销,如果对象占用内存过大,可能造成频繁的YoungGC和Full GC
  4. CopyOnWriteArrayList不能保证数据实时一致性,只能保证最终一致性

CopyOnWriteArraySet

基于CopyOnWriteArrayList实现,其唯一的不同是在add时调用的是CopyOnWriteArrayList的addIfAbsent方法,其遍历当前Object数组,如Object数组中已有了当前元素,则直接返回,如果没有则放入Object数组的尾部,并返回。

 

借鉴文章:

http://www.cnblogs.com/skywang12345/p/3498634.html 

https://blog.****.net/hanchao5272/article/details/79859087

https://blog.****.net/u011080472/article/details/51419001