深入学习线程局部变量ThreadLocal

ThreadLocal是线程局部变量,和普通变量的不同在于:每个线程持有这个变量的一个副本,可以独立修改(set方法)和访问(get方法)这个变量,并且线程之间不会发生冲突。

类中定义的ThreadLocal实例一般会被private static修饰,这样可以让ThreadLocal实例的状态和Thread绑定在一起。业务上,一般用ThreadLocal包装一些业务ID(user ID或事务ID)——不同的线程使用的ID是不相同的。

【1】如何使用ThreadLocal

① 包装SimpleDateFormat

SimpleDateFormat是非线程安全的,多线程操作下会出现异常。测试代码如下:

public static void main(String[] args) throws Exception {
		
	SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
	
	Callable<Date> task = new Callable<Date>() {

		@Override
		public Date call() throws Exception {
			return sdf.parse("20181121");
		}
		
	};
	//创建固定大小线程池
	ExecutorService pool = Executors.newFixedThreadPool(10);
	
	List<Future<Date>> results = new ArrayList<>();
	
	for (int i = 0; i < 10; i++) {
		results.add(pool.submit(task));
	}
	//打印结果
	for (Future<Date> future : results) {
		System.out.println(future.get());
	}
	//关闭线程池
	pool.shutdown();
}

测试结果如下:
深入学习线程局部变量ThreadLocal
这里就可以使用ThreadLocal:

public class DateFormatThreadLocal {
	
	private static final ThreadLocal<DateFormat> df = new ThreadLocal<DateFormat>(){
		protected DateFormat initialValue(){
			return new SimpleDateFormat("yyyyMMdd");
		}
	};

	public static final Date convert(String source) throws ParseException{
		return df.get().parse(source);
	}
}

注意,这里针对每个线程只需要初始化一次SimpleDateFormat对象,其实跟在自定义线程中定义一个SimpleDateFormat成员变量,并在线程初始化的时候new这个对象,效果是一样的,只是这样看起来代码更规整。


② 自定义MyRunnable被两个线程访问

下面这个例子,我们定义了一个MyRunnable对象,这个MyRunnable对象会被线程1和线程2使用,但是通过内部的ThreadLocal变量,每个线程访问到的整数都是自己单独的一份。

package org.java.learn.concurrent.threadlocal;

public class ThreadLocalExample {
   public static class MyRunnable implements Runnable {

       private ThreadLocal<Integer> threadLocal =
               new ThreadLocal<Integer>();

       @Override
       public void run() {
           threadLocal.set((int) (Math.random() * 100D));

           try {
               Thread.sleep(2000);
           } catch (InterruptedException e) {
           }

           System.out.println(threadLocal.get());
       }
   }


   public static void main(String[] args) throws InterruptedException {
       MyRunnable sharedRunnableInstance = new MyRunnable();

       Thread thread1 = new Thread(sharedRunnableInstance);
       Thread thread2 = new Thread(sharedRunnableInstance);

       thread1.start();
       thread2.start();

       thread1.join(); //wait for thread 1 to terminate
       thread2.join(); //wait for thread 2 to terminate
   }
}

【2】ThreadLocal源码分析

ThreadLocal是如何被线程使用的?原理如下图所示:Thread引用和ThreadLocal引用都在栈上,Thread引用会引用一个ThreadLocalMap对象,这个map中的key是ThreadLocal对象(使用WeakReference包装),value是业务上变量的值。
深入学习线程局部变量ThreadLocal
首先看java.lang.Thread中的代码:

public
class Thread implements Runnable {
   //......其他源码
   /* ThreadLocal values pertaining to this thread. This map is maintained by the ThreadLocal class. */
   ThreadLocal.ThreadLocalMap threadLocals = null;

   /*
    * InheritableThreadLocal values pertaining to this thread. This map is maintained by the InheritableThreadLocal class.
    */
   ThreadLocal.ThreadLocalMap inheritableThreadLocals = null;
   //......其他源码

Thread中的threadLocals变量指向的是一个map,这个map就是ThreadLocal.ThreadLocalMap,里面存放的是跟当前线程绑定的ThreadLocal变量。inheritableThreadLocals的作用相同,里面也是存放的ThreadLocal变量,但是存放的是从当前线程的父线程继承过来的ThreadLocal变量。

再看java.lang.ThreadLocal类,主要的成员和接口如下:
深入学习线程局部变量ThreadLocal

① withInitial方法

Java 8以后用于初始化ThreadLocal的一种方法,在外部调用get()方法的时候,会通过Supplier确定变量的初始值:

public static <S> ThreadLocal<S> withInitial(Supplier<? extends S> supplier) {
    return new SuppliedThreadLocal<>(supplier);
  }

② get方法

获取当前线程的变量副本,如果当前线程还没有创建该变量的副本,则需要通过调用initialValue方法来设置初始值。

get方法的源代码如下,首先通过当前线程获取当前线程对应的map。如果map不为空,则从map中取出对应的Entry,然后取出对应的值。如果map为空,则调用setInitialValue设置初始值。如果map不为空,当前ThreadLocal实例对应的Entry为空,则也需要设置初始值。

public T get() {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null) {
            ThreadLocalMap.Entry e = map.getEntry(this);
            if (e != null) {
                @SuppressWarnings("unchecked")
                T result = (T)e.value;
                return result;
            }
        }
        return setInitialValue();
    }

其中Entry是ThreadLocal静态内部类ThreadLocalMap的静态内部类:

 static class ThreadLocalMap {

        /**
         * The entries in this hash map extend WeakReference, using
         * its main ref field as the key (which is always a
         * ThreadLocal object).  Note that null keys (i.e. entry.get()
         * == null) mean that the key is no longer referenced, so the
         * entry can be expunged from table.  Such entries are referred to
         * as "stale entries" in the code that follows.
         */
        static class Entry extends WeakReference<ThreadLocal<?>> {
            /** The value associated with this ThreadLocal. */
            Object value;

            Entry(ThreadLocal<?> k, Object v) {
                super(k);
                value = v;
            }
        }
        //...
 }

③ set方法

set方法跟get方法一样,先获取当前线程对应的map。如果map为空,则调用createMap创建map,否则将变量的值放入map——key为当前这个ThreadLocal对象,value为变量的值。

 public void set(T value) {
        Thread t = Thread.currentThread();
        ThreadLocalMap map = getMap(t);
        if (map != null)
            map.set(this, value);
        else
            createMap(t, value);
    }

createMap方法如下:

void createMap(Thread t, T firstValue) {
        t.threadLocals = new ThreadLocalMap(this, firstValue);
}

④ remove方法,删除当前线程绑定的这个副本

public void remove() {
         ThreadLocalMap m = getMap(Thread.currentThread());
         if (m != null)
             m.remove(this);
     }

ThreadLocalMap.remove方法如下:

 /**
    * Remove the entry for key.
    */
   private void remove(ThreadLocal<?> key) {
       Entry[] tab = table;
       int len = tab.length;
       int i = key.threadLocalHashCode & (len-1);
       for (Entry e = tab[i];
            e != null;
            e = tab[i = nextIndex(i, len)]) {
           if (e.get() == key) {
               e.clear();
               expungeStaleEntry(i);
               return;
           }
       }
   }

⑤ ThreadLocalMap构造函数

ThreadLocalMap中有一个Entry[] table,其中维护了一系列的entry。table初始化容量为16。

ThreadLocalMap(ThreadLocal<?> firstKey, Object firstValue) {
            table = new Entry[INITIAL_CAPACITY];
            int i = firstKey.threadLocalHashCode & (INITIAL_CAPACITY - 1);
            table[i] = new Entry(firstKey, firstValue);
            size = 1;
            setThreshold(INITIAL_CAPACITY);
        }

⑥ private static final int HASH_INCREMENT = 0x61c88647;

这个值是HASH_INCREMENT的值,普通的hashmap是使用链表来处理冲突的,但是ThreadLocalMap是使用线性探测法来处理冲突的,HASH_INCREMENT就是每次增加的步长,根据参考资料所说,选择这个数字是为了让冲突概率最小。

 /**
     * The difference between successively generated hash codes - turns
     * implicit sequential thread-local IDs into near-optimally spread
     * multiplicative hash values for power-of-two-sized tables.
     */
    private static final int HASH_INCREMENT = 0x61c88647;

【3】父子进程数据共享

InheritableThreadLocal主要用于子线程创建时,需要自动继承父线程的ThreadLocal变量,实现子线程访问父线程的threadlocal变量。InheritableThreadLocal继承了ThreadLocal,并重写了childValue、getMap、createMap三个方法。

public class InheritableThreadLocal<T> extends ThreadLocal<T> {
    /**
     * Computes the child's initial value for this inheritable thread-local
     * variable as a function of the parent's value at the time the child
     * thread is created.  This method is called from within the parent
     * thread before the child is started.
     * <p>
     * This method merely returns its input argument, and should be overridden
     * if a different behavior is desired.
     *
     * @param parentValue the parent thread's value
     * @return the child thread's initial value
     */
     //**
    * 创建线程的时候,如果需要继承且父线程中Thread-Local变量,则需要将父线程中的ThreadLocal变量一次拷贝过来。
    */
    protected T childValue(T parentValue) {
        return parentValue;
    }

     /**
   * 由于重写了getMap,所以在操作InheritableThreadLocal变量的时候,将只操作Thread类中的inheritableThreadLocals变量,与threadLocals变量没有关系
   **/
    ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

    /**
    * 跟getMap类似,set或getInheritableThreadLocal变量的时候,将只操作Thread类中的inheritableThreadLocals变量
    */
    void createMap(Thread t, T firstValue) {
        t.inheritableThreadLocals = new ThreadLocalMap(this, firstValue);
    }
}

关于childValue多说两句,拷贝是如何发生的?首先看Thread.init方法:

private void init(ThreadGroup g, Runnable target, String name,
                      long stackSize, AccessControlContext acc) {
        if (name == null) {
            throw new NullPointerException("name cannot be null");
        }

        this.name = name.toCharArray();

        Thread parent = currentThread();
        SecurityManager security = System.getSecurityManager();
        if (g == null) {
            /* Determine if it's an applet or not */

            /* If there is a security manager, ask the security manager
               what to do. */
            if (security != null) {
                g = security.getThreadGroup();
            }

            /* If the security doesn't have a strong opinion of the matter
               use the parent thread group. */
            if (g == null) {
                g = parent.getThreadGroup();
            }
        }

        /* checkAccess regardless of whether or not threadgroup is
           explicitly passed in. */
        g.checkAccess();

        /*
         * Do we have the required permissions?
         */
        if (security != null) {
            if (isCCLOverridden(getClass())) {
                security.checkPermission(SUBCLASS_IMPLEMENTATION_PERMISSION);
            }
        }

        g.addUnstarted();

        this.group = g;
        this.daemon = parent.isDaemon();
        this.priority = parent.getPriority();
        if (security == null || isCCLOverridden(parent.getClass()))
            this.contextClassLoader = parent.getContextClassLoader();
        else
            this.contextClassLoader = parent.contextClassLoader;
        this.inheritedAccessControlContext =
                acc != null ? acc : AccessController.getContext();
        this.target = target;
        setPriority(priority);
        if (parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
        /* Stash the specified stack size in case the VM cares */
        this.stackSize = stackSize;

        /* Set thread ID */
        tid = nextThreadID();
    }

然后看ThreadLocal.createInheritedMap方法,最终会调用到newThreadLocalMap方法,这里InheritableThreadLocal对childValue做了重写,可以看出,这里确实是将父线程关联的ThreadLocalMap中的内容依次拷贝到子线程的ThreadLocalMap中了。

static ThreadLocalMap createInheritedMap(ThreadLocalMap parentMap) {
        return new ThreadLocalMap(parentMap);
    }
 /**
         * Construct a new map including all Inheritable ThreadLocals
         * from given parent map. Called only by createInheritedMap.
         *
         * @param parentMap the map associated with parent thread.
         */
        private ThreadLocalMap(ThreadLocalMap parentMap) {
            Entry[] parentTable = parentMap.table;
            int len = parentTable.length;
            setThreshold(len);
            table = new Entry[len];

            for (int j = 0; j < len; j++) {
                Entry e = parentTable[j];
                if (e != null) {
                    @SuppressWarnings("unchecked")
                    ThreadLocal<Object> key = (ThreadLocal<Object>) e.get();
                    if (key != null) {
                        Object value = key.childValue(e.value);
                        Entry c = new Entry(key, value);
                        int h = key.threadLocalHashCode & (len - 1);
                        while (table[h] != null)
                            h = nextIndex(h, len);
                        table[h] = c;
                        size++;
                    }
                }
            }
        }

【4】ThreadLocal垃圾回收和实际应用

① ThreadLocal对象何时被回收?

ThreadLocalMap中的key是ThreadLocal对象,然后ThreadLocal对象时被WeakReference包装的,这样当没有强引用指向该ThreadLocal对象之后,或者说Map中的ThreadLocal对象被判定为弱引用可达时,就会在垃圾收集中被回收掉。看下Entry的定义:

static class Entry extends WeakReference<ThreadLocal<?>> {
    /** The value associated with this ThreadLocal. */
    Object value;

    Entry(ThreadLocal<?> k, Object v) {
        super(k);
        value = v;
    }
}

② ThreadLocal和线程池一起使用会有哪些问题?

ThreadLocal对象的生命周期跟线程的生命周期一样长,那么如果将ThreadLocal对象和线程池一起使用,就可能会遇到这种情况:一个线程的ThreadLocal对象会和其他线程的ThreadLocal对象串掉,一般不建议将两者一起使用。

③ Dubbo中对ThreadLocal的使用

从Dubbo中找到了ThreadLocal的例子,它主要是用在请求缓存的场景,具体代码如下:

@Activate(group = {Constants.CONSUMER, Constants.PROVIDER}, value = Constants.CACHE_KEY)
public class CacheFilter implements Filter {

   private CacheFactory cacheFactory;

   public void setCacheFactory(CacheFactory cacheFactory) {
       this.cacheFactory = cacheFactory;
   }

   @Override
   public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
       if (cacheFactory != null && ConfigUtils.isNotEmpty(invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.CACHE_KEY))) {
           Cache cache = cacheFactory.getCache(invoker.getUrl(), invocation);
           if (cache != null) {
               String key = StringUtils.toArgumentString(invocation.getArguments());
               Object value = cache.get(key);
               if (value != null) {
                   if (value instanceof ValueWrapper) {
                       return new RpcResult(((ValueWrapper)value).get());
                   } else {
                       return new RpcResult(value);
                   }
               }
               Result result = invoker.invoke(invocation);
               if (!result.hasException()) {
                   cache.put(key, new ValueWrapper(result.getValue()));
               }
               return result;
           }
       }
       return invoker.invoke(invocation);
   }

可以看出,在RPC调用(invoke)的链路上,会先使用请求参数判断当前线程是否刚刚发起过同样参数的调用——这个调用会使用ThreadLocalCache保存起来。具体的看ThreadLocalCache的实现如下:

package org.apache.dubbo.cache.support.threadlocal;

import org.apache.dubbo.cache.Cache;
import org.apache.dubbo.common.URL;

import java.util.HashMap;
import java.util.Map;

/**
* ThreadLocalCache
*/
public class ThreadLocalCache implements Cache {

   //ThreadLocal里存放的是参数到结果的映射
   private final ThreadLocal<Map<Object, Object>> store;

   public ThreadLocalCache(URL url) {
       this.store = new ThreadLocal<Map<Object, Object>>() {
           @Override
           protected Map<Object, Object> initialValue() {
               return new HashMap<Object, Object>();
           }
       };
   }

   @Override
   public void put(Object key, Object value) {
       store.get().put(key, value);
   }

   @Override
   public Object get(Object key) {
       return store.get().get(key);
   }

}

④ RocketMQ中ThreadLocal使用

在RocketMQ中,也找到了ThreadLocal的身影,它是用在消息发送的场景。MQClientAPIImpl是RMQ中负责将消息发送到服务端的实现,其中有一个步骤需要选择一个具体的队列。选择具体的队列的时候,不同的线程有自己负责的index值,这里使用了ThreadLocal的机制。

可以看下ThreadLocalIndex的实现:

package org.apache.rocketmq.client.common;

import java.util.Random;

public class ThreadLocalIndex {
   private final ThreadLocal<Integer> threadLocalIndex = new ThreadLocal<Integer>();
   private final Random random = new Random();

   public int getAndIncrement() {
       Integer index = this.threadLocalIndex.get();
       if (null == index) {
           index = Math.abs(random.nextInt());
           if (index < 0)
               index = 0;
           this.threadLocalIndex.set(index);
       }

       index = Math.abs(index + 1);
       if (index < 0)
           index = 0;

       this.threadLocalIndex.set(index);
       return index;
   }

   @Override
   public String toString() {
       return "ThreadLocalIndex{" +
           "threadLocalIndex=" + threadLocalIndex.get() +
           '}';
   }
}

参考博文:
Java虚拟机的垃圾回收处理与算法
ThreadLocal详解