netty5笔记-线程模型1-Promise
一起来学习下netty的线程池实现。 我们知道java本身实现了一套线程池,即我们常见的ExecutorService。那么netty为什么还要定义自己的线程模型,什么时候适合用netty线程池,什么时候适合用ExecutorService。相信你看了这几篇文章就会有眉目。
先来张大图,直观的看下netty线程模型和java自带线程池的区别:
需要注意,此图对线程模型进行了简化。图左netty中的executor指jdk中的Executor接口。
了解Promise前我们先看看Future。Future代表一个异步任务的结果,由于是异步任务,得到Future并不代表任务结束,你可以通过get来等待真正结果的返回,或者通过cancal来取消任务。netty对java.util.concurrent.Future进行了增强:
方法名 | 说明 |
---|---|
isSuccess | 任务是否执行成功 |
isCancellable | 任务是否可以取消 |
cause | 任务产生的异常 |
addListener | 添加listener, 任务完成后执行listener,如果任务已经完成,则添加时立刻执行 |
removeListener | 移除listener |
sync | 等待任务结束,如果任务产生异常或被中断则抛出异常,否则返回Future自身 |
syncUninterruptibly | 等待任务结束,任务本身不可中断,如果产生异常则抛出异常,否则返回Future自身 |
await | 等待任务结束,如果任务被中断则抛出中断异常,与sync不同的是只抛出中断异常,不抛出任务产生的异常 |
awaitUninterruptibly | 等待任务结束,任务不可中断 |
getNow | 任务未完成或者发生异常则返回null, 否则返回任务的结果 |
方法名 | 说明 |
---|---|
setSuccess | 通过设置结果的方式标记Future成功并通知所有listener, 如果已被标记过,则抛出异常 |
trySuccess | 通过设置结果的方式标记Future成功并通知所有listener, 如果已被标记过,只是返回false |
setFailure | 通过设置异常的方式标记Future失败并通知所有listener, 如果已被标记过,则抛出异常 |
tryFailure | 通过设置异常的方式标记Future失败并通知所有listener, 如果已被标记过,只是返回false |
DefaultPromise的设计比较有特点,利用一个result属性表示了所有的状态:
值 | 说明 |
---|---|
null | 任务还未开始执行(初始值),此时任务可以被取消 |
UNCANCELLABLE | 任务不可取消 |
CANCELLATION_CAUSE_HOLDER | 任务取消 |
CauseHolder | 执行完成,产生的异常 |
SUCCESS | 执行成功,且结果为null |
其他 | 执行成功,且结果为result |
- public Promise<V> await() throws InterruptedException {
- // 如果任务已经完成则直接返回
- if (isDone()) {
- return this;
- }
- if (Thread.interrupted()) {
- throw new InterruptedException(toString());
- }
- synchronized (this) {
- // 未完成则一直循环
- while (!isDone()) {
- // 检测是否产生死锁
- checkDeadLock();
- // 将waiter数加1
- incWaiters();
- try {
- // 等待(被唤醒)
- wait();
- } finally {
- // 将waiter数减1
- decWaiters();
- }
- }
- }
- return this;
- }
- protected void checkDeadLock() {
- EventExecutor e = executor();
- if (e != null && e.inEventLoop()) {
- throw new BlockingOperationException(toString());
- }
- }
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- System.out.println(msg);
- //这里的cf是一个DefaultPromise的子类的实例
- // 代码1
- ChannelFuture cf = ctx.write(msg);
- // 代码2
- ChannelFuture cf = ctx.writeAndFlush(msg);
- try {// 使用代码1会产生死锁(被检测到并抛出异常),而代码2是正常的
- cf.await();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
代码始终是枯燥的,下面我们举个比较易懂的例子说明下:你和朋友A都是程序员,都很老实,午餐时间到了你们去食堂排队吃东西,并且老老实实的排在一个队伍中。突然你发现忘了带卡,于是你决定等A付完钱再拿他的卡付钱。 此时如果A在你的前面他可以买完把卡给你,但是如果A在你的后面,就会出现一个局面, A在队伍后面等着买单,而你在前面等A买单后给你卡(还不愿意放弃现在的位置),最终整个队伍就卡死在你这了。
解决的方法有两个:
1、你转身让A先把卡给你;
2、把A放在其他队伍中,两个队伍互不干扰,这样A能够拿到饭,然后给你卡。
方法1在java里不可行,因为已经调用了wait(),除非有人notify或者interrupte,不然无法进行任何操作);方法2在java的fixed线程池模式下是可行的,在netty的线程池模型下是不行的,为啥? 再看看上面那个大图,一个任务进入一个队列后就和一个线程绑定死了,无法切换到其他线程,so,在netty中这样的代码千万不能写。
- “这样的代码”指在netty线程池中调用await等阻塞方法。
- 即使是代码2也最好不要写,因为不是每个这种方法你都能hold住,都知道其实现,所以最保险的方法就是不用它!
- public Promise<V> setSuccess(V result) {
- // 设置完后调用listener
- if (setSuccess0(result)) {
- notifyListeners();
- return this;
- }
- throw new IllegalStateException("complete already: " + this);
- }
- private boolean setSuccess0(V result) {
- if (isDone()) {
- return false;
- }
- synchronized (this) {
- // Allow only once.
- if (isDone()) {
- return false;
- }
- if (result == null) {
- this.result = SUCCESS;
- } else {
- this.result = result;
- }
- // 如果有等待者,则发起通知
- if (hasWaiters()) {
- notifyAll();
- }
- }
- return true;
- }
- private boolean hasWaiters() {
- return waiters > 0;
- }
任务执行完成后会
notifyListeners,这里需要注意的是,listener的方法默认情况下是使用io线程执行的,因此不要在里面有很耗时或阻塞的代码,如果确实有的话,可以在实例化Promise的时候传入非io线程的EventExecutor,或者保证listener的operationComplete方法中在其他线程池中执行。
Promise的实现还有很多,但多半是线程池实现类的内部类,这里不过多介绍了。有一个比较常见的ProgressivePromise可以关注下,在Promise的基础上增加了进度的跟踪, 适用如监控数据发送进度之类的场景。