netty中的Bytebuf管理

前言

netty中的Bytebuf类似于java nio中的ByteBuffer。不过,与原生的ByteBuffer相比,Bytebuf采用了读写两个指针,使用起来更加方便,不至于每次在读写切换过程中都需要flip()操作。而且Bytebuf支持动态扩展。
netty为了实现高效操作,对Bytebuf设置了两种实现,一种是在堆上实现的HeapBytebuf,另一种是在堆外实现的DirecetBytebuf。DirecetBytebuf避免了Bytebuf在用户区与内核区的数据复制过程,效率更高。但是在堆外操作内存(创建与销毁工作)效率是比较低的,为了提升效率,netty又提供了内存池,包括DirecetBytebufPool和HeapBytebufPool。
Bytebuf的内存管理是由引用计数器管理的。在进行Bytebuf的相关操作时,尤其要注意其创建与释放过程,避免内存泄漏。

1、基于PoolDirecetBytebuf的内存泄漏问题

1.1 问题复现

我们在netty服务端,简单设置一个EchoServerHandler,用于查看PoolDirecetBytebuf的内存泄漏问题。EchoServerHandler基于ChannelInboundHandlerAdapter实现,其channelRead方法如下:

@Override
	public void channelRead(ChannelHandlerContext ctx,Object msg){	
		
		ByteBuf in=(ByteBuf) msg;
		byte[] body = new byte[in.readableBytes()];
		in.readBytes(body);

	}

几乎什么都没有操作,在channelRead中,ByteBuf in默认采用池化的directBytebuf实现。为了监测directBytebuf是否在不断增长,参考https://www.jianshu.com/p/4e96beb37935的做法,自定义一个类,用于查看io.netty.util.internal.PlatformDependent类中DIRECT_MEMORY_COUNTER这个静态变量的值(此值用于统计已经分配的直接内存大小)。

public class DirectMemoryLeakReporterImpl {

	private static final int _1k=1024;
	private static final String BUSINESS_KEY="netty_direct_leak";
	private AtomicLong directMemoryCount;
	
	public void init(){
		
		java.lang.reflect.Field field=ReflectionUtils.findField(PlatformDependent.class, "DIRECT_MEMORY_COUNTER");
		field.setAccessible(true);
		
		try {
			directMemoryCount=((AtomicLong) field.get(PlatformDependent.class));
			
		} catch (Exception e) {
			// TODO: handle exception
		}
		
	}
	
	
	public void startReport(){
		
		ScheduledExecutorService service=Executors.newScheduledThreadPool(1);
		service.scheduleAtFixedRate(runnable, 0, 1, TimeUnit.SECONDS);
		
	}
	
	
	Runnable runnable=new Runnable() {
		
		@Override
		public void run() {
			// TODO Auto-generated method stub
			try {
				
				int memoryInkb=(int)directMemoryCount.get()/_1k;
				System.out.println(BUSINESS_KEY+"--->"+memoryInkb);
				
			} catch (Exception e) {
				// TODO: handle exception
				System.out.println(BUSINESS_KEY+"--->"+e.toString());
			}
		}
	};
	
	
}

在启动netty服务之前,先启动此类。

在客户端,我们定义了一个类,不断向服务端发送数据,为了快速验证,每次发送的数据包都比较大

	@Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {
   	// TODO Auto-generated method stub
   	
    for (int i = 0; i < 1000000; i++) {
   	 StringBuilder builder=new StringBuilder();
   	 for (int j = 0; j < 1000; j++) {
   		 builder.append("netty rocks,netty rocks!");
   	}
   	 ctx.writeAndFlush(Unpooled.copiedBuffer(builder.toString(),CharsetUtil.UTF_8));
   	 System.out.println("发送数据"+count++);
   	 TimeUnit.MILLISECONDS.sleep(500);
    }
   }

然后运行,看执行结果,客户端发送了大概1500次之后

netty_direct_leak--->16384
netty_direct_leak--->98304

堆外直接内存分配大小从16384K增长到了98304K,即从16M增加到90多M。说明堆外内存已经发生了泄漏。

1.2 解决方案

由于是ByteBuf in进行了内存的申请,但没有释放,因此可以通过ReferenceCountUtil.release(in)来手动释放

	@Override
   public void channelRead(ChannelHandlerContext ctx,Object msg){	
   	
   	ByteBuf in=(ByteBuf) msg;
   	byte[] body = new byte[in.readableBytes()];
   	in.readBytes(body);
   	ReferenceCountUtil.release(in);
   	
   	}

又或者让我们的EchoServerHandler继承SimpleChannelInboundHandler实现,然后我们自定义实现其中的channelRead0方法,通过查看源码可知,SimpleChannelInboundHandler类中的channelRead每次在结束时,都会手动释放: ReferenceCountUtil.release(msg);
又或者,让我们的EchoServerHandler每次处理完,都firechannelRead,让我们netty内部框架实现的TailContext来最终完成传入的消息的手动释放内存。

2 基于PoolHeapBytebuf的内存泄漏问题

由于堆外内存是不归虚拟机管理,因此需要我们手动操作释放内存,那么基于heap实现的Bytebuf,是否需要我们手动释放内存呢。
对上述的服务端的代码进行稍微改动:

public class EchoServerHandler extends ChannelInboundHandlerAdapter {
	//定义用于生成heapBytebuf的内存池实现
	 PooledByteBufAllocator allocator=new PooledByteBufAllocator(false);
	 

	@Override
	public void channelRead(ChannelHandlerContext ctx,Object msg){
	
		ByteBuf in=(ByteBuf) msg;
		byte[] body = new byte[in.readableBytes()];
		in.readBytes(body);
		ReferenceCountUtil.release(in);
	
		//创建heapbytebuf
		ByteBuf heapBuffer = allocator.heapBuffer(100000);
		heapBuffer.writeByte(10);		
	}

每次收到数据后,都手动创建一个大容量块的heapBuffer ,其由基于heapBytebuf的内存池实现。
客户端方法仍旧一致,让其运行,通过JvisumalVm查看,

netty中的Bytebuf管理
netty中的Bytebuf管理说明如果我们手动创建了基于heap的Bytebuf,那么也是要手动进行释放的,否则也会发生内存泄漏。

3 基于响应的Bytebuf

如果最终的Bytebuf进行了Write或者WriteFlush操作,那么Netty系统会自动的 为这个Bytebuf进行内存释放的工作。

4 Bytebuf的内存管理注意事项

上面说了Bytebuf是基于引用计数器进行管理的,即通过判断refCnt的值来判断这个Bytebuf是否可以继续使用。有时候如果refCnt已经为0了,而我们想要继续操作这个Bytebuf,那么系统就会报错。通过ReferenceCountUtil这个类,可以完成Bytebuf的refCnt的值的增加与减少。

ReferenceCountUtil.release(Bytebuf);

将Bytebuf的引用计数-1,如果引用计数减少后未0,就返回true,否则返回false。所以,有时候我们确实想要释放一个Bytebuf,但是又不知道其引用计数是多少,可以通过一个while循环,不断的调用ReferenceCountUtil.release,直到其返回true为止。

ReferenceCountUtil.retain(Bytebuf);

通过这个方法,可以增加 Bytebuf的引用计数,避免有些情况,引用计数为0,而导致不能使用Bytebuf。