netty源码解析(三)recycler
recycler类是netty中对象的缓存利用的类,
一、普通用法如下:
public class RecyclerTest {
private static final Recycler<User> RECYCLER = new Recycler<User>() {
@Override
protected User newObject(Handle<User> handle) {
return new User(handle);
}
};
private static class User{
private final Recycler.Handle<User> handle;
public User(Recycler.Handle<User> handle) {
this.handle = handle;
}
public void recycle() {
handle.recycle(this);
}
}
public static void main(String[] args) {
User user = RECYCLER.get();
user.recycle();
User user1 = RECYCLER.get();
System.out.println(user == user1);
}
}
控制台输出结果中,main方法打印出来的user和user1是相等的。
二、PooledDirectByteBuf中使用到了recycler
final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
private static final Recycler<PooledDirectByteBuf> RECYCLER = new Recycler<PooledDirectByteBuf>() {
@Override
protected PooledDirectByteBuf newObject(Handle<PooledDirectByteBuf> handle) {
return new PooledDirectByteBuf(handle, 0);
}
};
static PooledDirectByteBuf newInstance(int maxCapacity) {
PooledDirectByteBuf buf = RECYCLER.get();
buf.reuse(maxCapacity);
return buf;
}
在上面的newInstance方法中,取出来PooledDirectByteBuf进行了重复利用。其中buf.reuse就是初始化了一些值。
三、Recycler
我们进入Recycler的构造方法
protected Recycler() {
this(DEFAULT_MAX_CAPACITY_PER_THREAD);
}
在说其构造方法之前,我们先了解一下Recycler中的属性:
private final FastThreadLocal<Stack<T>> threadLocal = new FastThreadLocal<Stack<T>>() {
@Override
protected Stack<T> initialValue() {
return new Stack<T>(Recycler.this, Thread.currentThread(), maxCapacityPerThread, maxSharedCapacityFactor,
ratioMask, maxDelayedQueuesPerThread);
}
@Override
protected void onRemoval(Stack<T> value) {
// Let us remove the WeakOrderQueue from the WeakHashMap directly if its safe to remove some overhead
if (value.threadRef.get() == Thread.currentThread()) {
if (DELAYED_RECYCLED.isSet()) {
DELAYED_RECYCLED.get().remove(value);
}
}
}
};
里面有一个FastThreadLocal属性,里面有一个Stack类型,说明对于每一个线程,都会有一个Stack对象。
new Stack方法中有几个参数:
Recycler.this:是本Recycler对象。
Thread.currentThread():是当前线程,说明每一个Stack与当前线程进行绑定,
后面的一些参数我们进入Stack方法中看下:
Stack(Recycler<T> parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
int ratioMask, int maxDelayedQueues) {
this.parent = parent;
threadRef = new WeakReference<Thread>(thread);
this.maxCapacity = maxCapacity;
availableSharedCapacity = new AtomicInteger(max(maxCapacity / maxSharedCapacityFactor, LINK_CAPACITY));
elements = new DefaultHandle[min(INITIAL_CAPACITY, maxCapacity)];
this.ratioMask = ratioMask;
this.maxDelayedQueues = maxDelayedQueues;
}
其中有一个elements属性,elements属性是DefaultHandle类型的数组,每个Handle都包含了一个对象,这些对象可以被回收与引用。
ratioMask:用来控制对象回收的频率,并不是每次对象都会被回收的,通过ratioMask来控制对象回收的比率
maxCapacity:表示Handle数组的最大的大小。
maxDelayedQueues:线程1创建的对象,可能被线程2释放,线程2释放的时候会创建一个WeakOrderQueue,WeakOrderQueue中就存放着线程1中创建的对象,到线程2中去释放,后续分析再详细讲解。这个参数就表示线程1创建的对象,能够释放的线程数有多少。
在Stack中还有head,prev,cusor表示WeakOrderQueue的一系列数据结构。
availableSharedCapacity:表示现诚意创建的对象能够在其他线程中缓存的最大个数,默认值为16K,即本线程可以缓存32k个对象,本线程创建的对象可以被其他线程缓存16K个。
回到Recycler的初始化方法,里面的初始化参数的初始值DEFAULT_MAX_CAPACITY_PER_THREAD=32K,说明每一个Stack中最多能够存放32K个对象。
protected Recycler(int maxCapacityPerThread) {
this(maxCapacityPerThread, MAX_SHARED_CAPACITY_FACTOR);
}
MAX_SHARED_CAPACITY_FACTOR:默认值是2
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor) {
this(maxCapacityPerThread, maxSharedCapacityFactor, RATIO, MAX_DELAYED_QUEUES_PER_THREAD);
}
RATIO的默认值是8,MAX_DELAYED_QUEUES_PER_THREAD默认值是2倍的cpu核数。
protected Recycler(int maxCapacityPerThread, int maxSharedCapacityFactor,
int ratio, int maxDelayedQueuesPerThread) {
ratioMask = safeFindNextPositivePowerOfTwo(ratio) - 1;
if (maxCapacityPerThread <= 0) {
this.maxCapacityPerThread = 0;
this.maxSharedCapacityFactor = 1;
this.maxDelayedQueuesPerThread = 0;
} else {
this.maxCapacityPerThread = maxCapacityPerThread;
this.maxSharedCapacityFactor = max(1, maxSharedCapacityFactor);
this.maxDelayedQueuesPerThread = max(0, maxDelayedQueuesPerThread);
}
}
ratioMask为ratio的值-1,即为7。
Stack的属性结构,图示如下:
四、Recycler获取对象的方式
1、获取当前线程的Stack
2、从Stack里面弹出对象
3、如果对象不为空直接弹出对象,如果为空,创建对象并绑定
获取对象在Recycler的get()方法中:
public final T get() {
if (maxCapacityPerThread == 0) {
return newObject((Handle<T>) NOOP_HANDLE);
}
Stack<T> stack = threadLocal.get();
DefaultHandle<T> handle = stack.pop();
if (handle == null) {
handle = stack.newHandle();
handle.value = newObject(handle);
}
return (T) handle.value;
}
五、回收对象到Recycler
1、同线程回收对象
Recycler的recycle方法会调用Handler的recycle方法,然后调用Stack的push()方法,回收对象
@Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
}
stack.push(this);
}
void push(DefaultHandle<?> item) {
Thread currentThread = Thread.currentThread();
if (threadRef.get() == currentThread) {
// The current Thread is the thread that belongs to the Stack, we can try to push the object now.
pushNow(item);
} else {
// The current Thread is not the one that belongs to the Stack
// (or the Thread that belonged to the Stack was collected already), we need to signal that the push
// happens later.
pushLater(item, currentThread);
}
}
上面方法会判断一下,如果创建Stack的线程是本线程,则使用pushNow方法回收,如果创建Stack的线程不是本线程,则使用pushLater回收。
2、异线程回收对象
(1)获取WeadOrderQueue
(2)如果是第一次回收,创建WeakOrderQueue,并与原来线程的WeakOrderQueue进行关联
(3)将对象追加到WeakOrderQueue.
六、异线程收割对象
在Stack类的pop()方法中的scavenge()方法中
DefaultHandle<T> pop() {
int size = this.size;
if (size == 0) {
if (!scavenge()) {