Netty源码分析(二)之NioEventLoop
文章目录
netty版本
- 使用的netty版本是
io.netty:netty-all:4.1.33.Final
NioEventLoop
- uml图
-
NioEventLoop
是一个无限循环(Loop
),在循环中不断处理接收到的事件
NioEventLoop 构造方法分析
-
构造方法,根据
NioEventLoopGroup
源码的分析可知,NioEventLoop
的构造方法调用是在newChild()
(NioEventLoopGroup
实现类的newChild
方法)时。构造方法执行顺序
sequenceDiagram
NioEventLoop ->> SingleThreadEventLoop : 调用父类构造方法
SingleThreadEventLoop ->> SingleThreadEventExecutor : 调用父类构造方法
SingleThreadEventExecutor ->> AbstractScheduledEventExecutor : 调用父类构造方法
AbstractScheduledEventExecutor ->> AbstractEventExecutor : 调用父类构造方法
NioEventLoop ->> NioEventLoop : openSelector 创建一个Selector
```java
/**
 * Creates a NioEventLoop: delegates to the superclass constructor, validates the
 * selector-related arguments, then opens the Selector used by this loop.
 *
 * @param parent                   the NioEventLoopGroup (thread pool) this event loop belongs to
 * @param executor                 passed through to the superclass constructor
 * @param selectorProvider         used to create the selector; must not be {@code null}
 * @param strategy                 stored as this loop's select strategy; must not be {@code null}
 * @param rejectedExecutionHandler passed through to the superclass constructor
 * @throws NullPointerException if {@code selectorProvider} or {@code strategy} is {@code null}
 */
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
             SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler) {
    // DEFAULT_MAX_PENDING_TASKS: maximum number of pending tasks allowed in the queue
    super(parent, executor, false, DEFAULT_MAX_PENDING_TASKS, rejectedExecutionHandler);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    if (strategy == null) {
        throw new NullPointerException("selectStrategy");
    }
    provider = selectorProvider;
    final SelectorTuple selectorTuple = openSelector();
    // The (possibly wrapped) selector returned by openSelector()
    selector = selectorTuple.selector;
    // The raw selector created by the provider
    unwrappedSelector = selectorTuple.unwrappedSelector;
    selectStrategy = strategy;
}
```
SingleThreadEventLoop构造方法
-
调用的构造方法,
SingleThreadEventLoop
中newTaskQueue
方法使用的是LinkedBlockingQueue
队列,子类NioEventLoop
使用的是MpscQueue
/**
 * SingleThreadEventLoop constructor: forwards every argument to the superclass
 * constructor, then creates the tail-task queue via {@code newTaskQueue}.
 *
 * @param parent                   forwarded to the superclass constructor
 * @param executor                 forwarded to the superclass constructor
 * @param addTaskWakesUp           forwarded to the superclass constructor
 * @param maxPendingTasks          forwarded to the superclass constructor and used as the
 *                                 capacity argument for the tail-task queue
 * @param rejectedExecutionHandler forwarded to the superclass constructor
 */
protected SingleThreadEventLoop(EventLoopGroup parent, Executor executor,
                                boolean addTaskWakesUp, int maxPendingTasks,
                                RejectedExecutionHandler rejectedExecutionHandler) {
    super(parent, executor, addTaskWakesUp, maxPendingTasks, rejectedExecutionHandler);
    tailTasks = newTaskQueue(maxPendingTasks);
}

/**
 * Declared in the parent class SingleThreadEventExecutor: the default task queue is a
 * bounded {@link LinkedBlockingQueue}.
 *
 * @param maxPendingTasks capacity of the queue
 * @return a new LinkedBlockingQueue with the given capacity
 */
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
    return new LinkedBlockingQueue<Runnable>(maxPendingTasks);
}
-
NioEventLoop
的newTaskQueue
方法@Override protected Queue<Runnable> newTaskQueue(int maxPendingTasks) { // This event loop never calls takeTask() return maxPendingTasks == Integer.MAX_VALUE ? PlatformDependent.<Runnable>newMpscQueue() : PlatformDependent.<Runnable>newMpscQueue(maxPendingTasks); }
SingleThreadEventExecutor构造方法
-
调用的构造方法
/**
 * SingleThreadEventExecutor constructor: records the configuration and creates the task queue.
 *
 * @param parent           forwarded to the superclass constructor
 * @param executor         must not be {@code null} (checked with ObjectUtil.checkNotNull)
 * @param addTaskWakesUp   stored as-is in the field of the same name
 * @param maxPendingTasks  requested queue capacity; values below 16 are raised to 16
 * @param rejectedHandler  must not be {@code null} (checked with ObjectUtil.checkNotNull)
 */
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
                                    boolean addTaskWakesUp, int maxPendingTasks,
                                    RejectedExecutionHandler rejectedHandler) {
    super(parent);
    this.addTaskWakesUp = addTaskWakesUp;
    this.maxPendingTasks = Math.max(16, maxPendingTasks);
    this.executor = ObjectUtil.checkNotNull(executor, "executor");
    // Queue that holds the submitted tasks
    taskQueue = newTaskQueue(this.maxPendingTasks);
    rejectedExecutionHandler = ObjectUtil.checkNotNull(rejectedHandler, "rejectedHandler");
}
openSelector
-
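如果关闭了KEYSET优化(DISABLE_KEYSET_OPTIMIZATION为true),则直接返回JDK创建的原始Selector;否则通过反射将SelectorImpl中的selectedKeys、publicSelectedKeys替换为优化版的SelectedSelectionKeySet。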
private SelectorTuple openSelector() { final Selector unwrappedSelector; try { unwrappedSelector = provider.openSelector(); } catch (IOException e) { throw new ChannelException("failed to open a new selector", e); } //如果没有开启KEYSET优化,Jdk创建的Selector直接返回 if (DISABLE_KEYSET_OPTIMIZATION) { return new SelectorTuple(unwrappedSelector); } //AccessController.doPrivileged表示不用做权限检查 Object maybeSelectorImplClass = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { return Class.forName( "sun.nio.ch.SelectorImpl", false, PlatformDependent.getSystemClassLoader()); } catch (Throwable cause) { return cause; } } }); if (!(maybeSelectorImplClass instanceof Class) || // ensure the current selector implementation is what we can instrument. !((Class<?>) maybeSelectorImplClass).isAssignableFrom(unwrappedSelector.getClass())) { if (maybeSelectorImplClass instanceof Throwable) { Throwable t = (Throwable) maybeSelectorImplClass; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, t); } return new SelectorTuple(unwrappedSelector); } final Class<?> selectorImplClass = (Class<?>) maybeSelectorImplClass; //创建一个存放SelectionKey的对象,本质上是一个一维数组 final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet(); Object maybeException = AccessController.doPrivileged(new PrivilegedAction<Object>() { @Override public Object run() { try { Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys"); Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys"); Throwable cause = ReflectionUtil.trySetAccessible(selectedKeysField, true); if (cause != null) { return cause; } cause = ReflectionUtil.trySetAccessible(publicSelectedKeysField, true); if (cause != null) { return cause; } //反射强制将SelectorImpl中的selectedKeys替换为优化版的SelectedSelectionKeySet对象 selectedKeysField.set(unwrappedSelector, selectedKeySet); 
//反射强制将SelectorImpl中的publicSelectedKeys替换为优化版的SelectedSelectionKeySet对象 publicSelectedKeysField.set(unwrappedSelector, selectedKeySet); return null; } catch (NoSuchFieldException e) { return e; } catch (IllegalAccessException e) { return e; } } }); if (maybeException instanceof Exception) { selectedKeys = null; Exception e = (Exception) maybeException; logger.trace("failed to instrument a special java.util.Set into: {}", unwrappedSelector, e); return new SelectorTuple(unwrappedSelector); } selectedKeys = selectedKeySet; logger.trace("instrumented a special java.util.Set into: {}", unwrappedSelector); //SelectorTuple是一个封装了原始selector对象和封装后selector对象(即,SelectedSelectionKeySetSelector对象)的类 return new SelectorTuple(unwrappedSelector, new SelectedSelectionKeySetSelector(unwrappedSelector, selectedKeySet)); } private static final class SelectorTuple { final Selector unwrappedSelector; final Selector selector; SelectorTuple(Selector unwrappedSelector) { this.unwrappedSelector = unwrappedSelector; this.selector = unwrappedSelector; } SelectorTuple(Selector unwrappedSelector, Selector selector) { this.unwrappedSelector = unwrappedSelector; this.selector = selector; } }
SelectedSelectionKeySet
-
如果开启优化(即没有通过
-Dio.netty.noKeySetOptimization=true
关闭优化),netty反射强制将SelectorImpl
中的selectedKeys
替换为优化版的SelectedSelectionKeySet
对象,这使Selector
中所有对selectedKeys
、publicSelectedKeys
的操作实际上就是对SelectedSelectionKeySet
的操作。SelectedSelectionKeySet
/**
 * Essentially a one-dimensional dynamic array of SelectionKeys whose capacity doubles
 * each time it fills up (initial capacity 1024).
 *
 * <p>Only {@link #add}, {@link #size} and {@link #reset} do real work: {@link #remove} and
 * {@link #contains} always return {@code false}, and {@link #iterator} is unsupported.
 */
final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    /** Backing array; slots at index {@code size} and above hold no added key. */
    SelectionKey[] keys;
    /** Number of keys currently stored. */
    int size;

    SelectedSelectionKeySet() {
        keys = new SelectionKey[1024];
    }

    /**
     * Appends a key to the array.
     *
     * @param o the key to append
     * @return {@code false} if {@code o} is {@code null} (nothing is stored), otherwise {@code true}
     */
    @Override
    public boolean add(SelectionKey o) {
        if (o == null) {
            return false;
        }

        keys[size++] = o;
        // Grow as soon as the array is full so the next add always has a free slot.
        if (size == keys.length) {
            increaseCapacity();
        }

        return true;
    }

    @Override
    public int size() {
        return size;
    }

    /** Removal is not implemented; always returns {@code false}. */
    @Override
    public boolean remove(Object o) {
        return false;
    }

    /** Lookup is not implemented; always returns {@code false}. */
    @Override
    public boolean contains(Object o) {
        return false;
    }

    /**
     * Iteration is not supported.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public Iterator<SelectionKey> iterator() {
        throw new UnsupportedOperationException();
    }

    /** Clears all stored keys. */
    void reset() {
        reset(0);
    }

    /**
     * Nulls out the slots from {@code start} (inclusive) to {@code size} (exclusive)
     * and sets the size back to zero.
     *
     * @param start first index to clear
     */
    void reset(int start) {
        Arrays.fill(keys, start, size, null);
        size = 0;
    }

    /** Doubles the backing array, e.g. from 1024 to 2048 slots. */
    private void increaseCapacity() {
        SelectionKey[] newKeys = new SelectionKey[keys.length << 1];
        System.arraycopy(keys, 0, newKeys, 0, size);
        keys = newKeys;
    }
}
JDK Bug解决方案
-
Selector.open()
空指针bug:sun.nio.ch.Util
中包含一段非线程安全的代码,可能会抛出NullPointerException
异常。线程1
执行到⑤,未执行到⑥,线程2
执行了②,此时bugLevel
可能为null,即bugLevel
被重置为null,导致线程1
执行到⑥时抛出NullPointerException
异常,此bug详细参见bug库https://bugs.java.com/view_bug.do?bug_id=6427854
sun.nio.ch.Util contains code which is not thread safe and can throw a NullPointerException:
private static String bugLevel = null;
static boolean atBugLevel(String bl) { // package-private
    if (bugLevel == null) { ①
        if (!sun.misc.VM.isBooted())
            return false;
        java.security.PrivilegedAction pa = new GetPropertyAction("sun.nio.ch.bugLevel");
        // the next line can reset bugLevel to null
        bugLevel = (String)AccessController.doPrivileged(pa); ②
        if (bugLevel == null) ③
            bugLevel = ""; ④
    }
    return (bugLevel != null) ⑤ && bugLevel.equals(bl) ⑥;
}
-
使用
linux内核2.4
并且同时使用jdk6u4
或者jdk7b12
以下版本,epoll-bug会导致JDK中的Selector.select()
在没有感兴趣的事件发生时,无效的被调用。此时本应该阻塞的Selector.select()
却没有阻塞,而是返回0,导致CPU空轮询,致使IO线程CPU占用率达到100%。bug库地址:https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6403933
-
netty中的解决方案
-
Selector.open()
空指针问题:在Selector.open()
之前,如果sun.nio.ch.bugLevel
为空,先将sun.nio.ch.bugLevel
设置为空字符串 -
epoll 空轮询导致CPU 100%问题
:对select()
返回0的操作计数(JDK bug导致select()
不阻塞而返回0),如果次数大于阈值SELECTOR_AUTO_REBUILD_THRESHOLD
就新建一个selector
,将注册到老的selector
上的channel
重新注册到新的selector
上。该阈值可通过系统属性-Dio.netty.selectorAutoRebuildThreshold
设置,默认阈值512;如果设置的阈值小于MIN_PREMATURE_SELECTOR_RETURNS=3
,则阈值被置为0
static {
    // Work around the Selector.open() NullPointerException bug: if the system property
    // sun.nio.ch.bugLevel is unset, set it to the empty string before any Selector.open().
    final String bugLevelKey = "sun.nio.ch.bugLevel";
    if (SystemPropertyUtil.get(bugLevelKey) == null) {
        try {
            AccessController.doPrivileged(new PrivilegedAction<Void>() {
                @Override
                public Void run() {
                    System.setProperty(bugLevelKey, "");
                    return null;
                }
            });
        } catch (final SecurityException e) {
            logger.debug("Unable to get/set System Property: " + bugLevelKey, e);
        }
    }

    // Threshold for the select()-returns-0 counter (a JDK bug makes select() return 0 without
    // blocking): once the count exceeds SELECTOR_AUTO_REBUILD_THRESHOLD a new selector is created
    // and the channels registered on the old one are re-registered on it.
    // Configurable via -Dio.netty.selectorAutoRebuildThreshold, default 512; a configured value
    // below MIN_PREMATURE_SELECTOR_RETURNS (3) is replaced by 0.
    final int configuredThreshold =
            SystemPropertyUtil.getInt("io.netty.selectorAutoRebuildThreshold", 512);
    SELECTOR_AUTO_REBUILD_THRESHOLD =
            configuredThreshold < MIN_PREMATURE_SELECTOR_RETURNS ? 0 : configuredThreshold;

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.noKeySetOptimization: {}", DISABLE_KEYSET_OPTIMIZATION);
        logger.debug("-Dio.netty.selectorAutoRebuildThreshold: {}", SELECTOR_AUTO_REBUILD_THRESHOLD);
    }
}
-
run方法
-
run
@Override protected void run() { for (;;) { try { try { switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) { case SelectStrategy.CONTINUE: continue; case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO case SelectStrategy.SELECT: select(wakenUp.getAndSet(false)); // 'wakenUp.compareAndSet(false, true)' is always evaluated // before calling 'selector.wakeup()' to reduce the wake-up // overhead. (Selector.wakeup() is an expensive operation.) // // However, there is a race condition in this approach. // The race condition is triggered when 'wakenUp' is set to // true too early. // // 'wakenUp' is set to true too early if: // 1) Selector is waken up between 'wakenUp.set(false)' and // 'selector.select(...)'. (BAD) // 2) Selector is waken up between 'selector.select(...)' and // 'if (wakenUp.get()) { ... }'. (OK) // // In the first case, 'wakenUp' is set to true and the // following 'selector.select(...)' will wake up immediately. // Until 'wakenUp' is set to false again in the next round, // 'wakenUp.compareAndSet(false, true)' will fail, and therefore // any attempt to wake up the Selector will fail, too, causing // the following 'selector.select(...)' call to block // unnecessarily. // // To fix this problem, we wake up the selector again if wakenUp // is true immediately after selector.select(...). // It is inefficient in that it wakes up the selector for both // the first case (BAD - wake-up required) and the second case // (OK - no wake-up required). if (wakenUp.get()) { selector.wakeup(); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. 
https://github.com/netty/netty/issues/8566 rebuildSelector0(); handleLoopException(e); continue; } cancelledKeys = 0; needsToSelectAgain = false; final int ioRatio = this.ioRatio; if (ioRatio == 100) { try { processSelectedKeys(); } finally { // Ensure we always run tasks. runAllTasks(); } } else { final long ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } } catch (Throwable t) { handleLoopException(t); } // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Throwable t) { handleLoopException(t); } } }
processSelectedKeys
select()方法
rebuildSelector
-
rebuildSelector
的过程:
- 创建一个新的Selector
- 取消Channel在OldSelector上的SelectionKey
- 将OldSelector上的所有Channel重新注册到新的Selector上
- 关闭OldSelector释放资源
-
rebuildSelector
源码private void rebuildSelector0() { final Selector oldSelector = selector; final SelectorTuple newSelectorTuple; if (oldSelector == null) { return; } try { newSelectorTuple = openSelector(); } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; } // Register all channels to the new Selector. int nChannels = 0; for (SelectionKey key: oldSelector.keys()) { Object a = key.attachment(); try { //keyFor 获取当前Channel注册在指定Selector上的键,如果没有注册将返回null if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; } //取消SelectionKey在OldSelector上的事件注册 int interestOps = key.interestOps(); key.cancel(); //Channel注册在新的selector上 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) { // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector; try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } } if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }