Netty Basics and Internals (NIO Framework)

NIO frameworks

  • Netty, Mina, Akka (Scala; highly available and scalable)

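Netty is an asynchronous, event-driven network application framework and toolkit for quickly building high-performance, highly reliable network servers and clients. It is a client/server programming framework built on NIO, and it makes developing a network application fast and simple.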

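Why use Netty

  • Simple API with a low barrier to entry (no need to master raw NIO)
  • Powerful: ships with many built-in codecs (which handle the half-packet and sticky-packet problems) and supports many mainstream protocols
  • Highly customizable: you can write your own codecs to implement a custom protocol
  • Works around JDK NIO bugs, avoiding the instability of raw NIO
  • Large user base and an active community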
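
The half-packet and sticky-packet problems mentioned above come from TCP being a byte stream: one read may deliver part of a message or several messages glued together. A common remedy is to prefix every message with its length. The sketch below shows only the idea, using the JDK's `ByteBuffer`; the class `LengthPrefixFraming` and its methods are hypothetical names made up for this illustration, not Netty's codec API, and it does no validation of the length field.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of Netty): a 4-byte length prefix marks frame boundaries.
final class LengthPrefixFraming {
    // Unread bytes left over from earlier chunks (an incomplete frame).
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Encode one message as [int length][UTF-8 payload].
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length).putInt(payload.length).put(payload).array();
    }

    // Feed an arbitrary chunk read from the wire; return every frame that is now complete.
    List<String> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk).flip();
        List<String> frames = new ArrayList<>();
        while (merged.remaining() >= 4) {
            merged.mark();
            int length = merged.getInt();
            if (merged.remaining() < length) { // half packet: wait for more bytes
                merged.reset();
                break;
            }
            byte[] payload = new byte[length];
            merged.get(payload);
            frames.add(new String(payload, StandardCharsets.UTF_8));
        }
        pending = merged; // keep the incomplete tail for the next call
        return frames;
    }

    public static void main(String[] args) {
        byte[] a = encode("hello");
        byte[] b = encode("netty");
        byte[] wire = ByteBuffer.allocate(a.length + b.length).put(a).put(b).array();
        LengthPrefixFraming framing = new LengthPrefixFraming();
        // The first chunk ends in the middle of "hello": no frame is complete yet.
        System.out.println(framing.feed(Arrays.copyOfRange(wire, 0, 6)));           // prints []
        // The second chunk completes "hello" and carries "netty" stuck right behind it.
        System.out.println(framing.feed(Arrays.copyOfRange(wire, 6, wire.length))); // prints [hello, netty]
    }
}
```

Netty's built-in frame decoders apply the same kind of boundary rule inside the pipeline, so handlers further down only ever see whole messages.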

Netty architecture design

Development dependency

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>5.0.0.Alpha2</version>
</dependency>

Server-side development

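//1. Create the server bootstrap
ServerBootstrap sbt=new ServerBootstrap();
//2. Create the event loop groups: boss accepts connections, worker handles their I/O
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup worker=new NioEventLoopGroup();
try {
    //3. Register the event loop groups
    sbt.group(boss,worker);
    //4. Set the server channel implementation
    sbt.channel(NioServerSocketChannel.class);
    //5. Initialize the pipeline of every accepted channel
    sbt.childHandler(new CustomServerChannelInitializer());
    //6. Bind the port and start the server
    ChannelFuture future = sbt.bind(9999).sync();
    System.out.println("Listening on 9999...");
    //7. Block until the server channel is closed
    future.channel().closeFuture().sync();
} finally {
    //8. Release the thread resources, even if binding fails
    boss.shutdownGracefully();
    worker.shutdownGracefully();
}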

---
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class CustomServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // The channel's pipeline
        ChannelPipeline pipeline = ch.pipeline();

        // Add the final handler at the end of the pipeline
        pipeline.addLast(new CustomServerChannelHandlerAdapter());
    }
}
---
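import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

import java.util.Date;

public class CustomServerChannelHandlerAdapter extends ChannelHandlerAdapter {
    /**
     * Exception callback.
     * @param ctx   the handler context
     * @param cause the exception that was raised
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    /**
     * Called when a message is received.
     * @param ctx the handler context
     * @param msg the inbound message
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            // No decoder is installed, so msg arrives as a ByteBuf
            System.out.println("msg:"+((ByteBuf) msg).toString(CharsetUtil.UTF_8));
        } finally {
            // Release the inbound buffer to avoid a leak
            ReferenceCountUtil.release(msg);
        }
        ByteBuf buf= ctx.alloc().buffer();
        buf.writeBytes(new Date().toString().getBytes(CharsetUtil.UTF_8));
        // Send the reply
        ChannelFuture future = ctx.writeAndFlush(buf);
        // Close the SocketChannel once the write completes
        future.addListener(ChannelFutureListener.CLOSE);
    }
}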
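
The initializer above adds handlers to a pipeline, and an inbound message passes through them in the order they were added. A rough way to picture this, as a sketch only: `MiniPipeline` below is a hypothetical stand-in written with plain JDK types. Netty's real `ChannelPipeline` also carries outbound events, per-handler contexts and lifecycle callbacks, none of which are modeled here.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, heavily simplified model of an inbound handler chain.
final class MiniPipeline {
    private final List<Function<Object, Object>> handlers = new ArrayList<>();

    // Append a handler at the tail, in the spirit of pipeline.addLast(...).
    MiniPipeline addLast(Function<Object, Object> handler) {
        handlers.add(handler);
        return this;
    }

    // Pass an inbound message through every handler in insertion order.
    Object fireRead(Object msg) {
        Object current = msg;
        for (Function<Object, Object> handler : handlers) {
            current = handler.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        MiniPipeline pipeline = new MiniPipeline()
                .addLast(msg -> new String((byte[]) msg, StandardCharsets.UTF_8)) // plays the decoder
                .addLast(msg -> "server got: " + msg);                            // plays the final handler
        System.out.println(pipeline.fireRead("ping".getBytes(StandardCharsets.UTF_8))); // prints server got: ping
    }
}
```

This is why codecs are added before the business handler: by the time the last handler runs, earlier handlers have already turned raw bytes into something usable.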

Client

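//1. Create the client bootstrap
Bootstrap bt=new Bootstrap();
//2. Create the worker event loop group
EventLoopGroup worker=new NioEventLoopGroup();
try {
    //3. Register the event loop group
    bt.group(worker);
    //4. Set the client channel implementation
    bt.channel(NioSocketChannel.class);
    //5. Initialize the channel pipeline
    bt.handler(new CustomClientChannelInitializer());
    //6. Connect to the server
    ChannelFuture future = bt.connect("127.0.0.1",9999).sync();
    //7. Block until the channel is closed
    future.channel().closeFuture().sync();
} finally {
    //8. Release the thread resources, even if the connection fails
    worker.shutdownGracefully();
}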
---
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;

public class CustomClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();

        // Add the final handler at the end of the pipeline
        pipeline.addLast(new CustomClientChannelHandlerAdapter());
    }
}
---
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;

public class CustomClientChannelHandlerAdapter extends ChannelHandlerAdapter {
    /**
     * Exception callback.
     * @param ctx   the handler context
     * @param cause the exception that was raised
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    /**
     * Called once the connection to the server is established; sends the first message.
     * @param ctx the handler context
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf=ctx.alloc().buffer();
        buf.writeBytes("hello, I am the client".getBytes(CharsetUtil.UTF_8));

        ctx.writeAndFlush(buf);
    }
    /**
     * Called when a message is received.
     * @param ctx the handler context
     * @param msg the inbound message
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf= (ByteBuf) msg;
        try {
            System.out.println("Client received: "+buf.toString(CharsetUtil.UTF_8));
        } finally {
            // Release the inbound buffer to avoid a leak
            ReferenceCountUtil.release(msg);
        }
    }
}
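
Both the server and the client code use `ChannelFuture` in two ways: `sync()` blocks the calling thread until the operation finishes, while `addListener(...)` registers a callback and returns immediately. The sketch below illustrates only that distinction, with the JDK's `CompletableFuture` standing in for `ChannelFuture`; the class `FutureStyles` and the event strings are made up for the example, and no Netty API is involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration: CompletableFuture stands in for Netty's ChannelFuture.
final class FutureStyles {
    // Callback style, comparable in spirit to future.addListener(...).
    static List<String> listenerStyle() {
        List<String> events = new ArrayList<>();
        CompletableFuture<Void> writeDone = new CompletableFuture<>(); // a write that has not finished yet
        writeDone.thenRun(() -> events.add("close channel"));          // registered now, runs on completion
        events.add("caller continues");                                // the caller is not blocked
        writeDone.complete(null);                                      // the I/O "finishes" later
        return events;
    }

    // Blocking style, comparable in spirit to future.sync().
    static String blockingStyle() {
        CompletableFuture<String> bind = CompletableFuture.supplyAsync(() -> "bound");
        return bind.join(); // the calling thread waits here for the result
    }

    public static void main(String[] args) {
        System.out.println(listenerStyle()); // prints [caller continues, close channel]
        System.out.println(blockingStyle()); // prints bound
    }
}
```

Blocking is convenient in a `main` method, as in the bootstrap code above, but inside a handler the callback style is the usual choice, because blocking there would stall the event loop thread.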

ByteBuf

How it works

// Each line printed below is: readerIndex writerIndex readableBytes capacity
ByteBuf buf = Unpooled.buffer(3);
// Alternatives: new PooledByteBufAllocator().buffer();
//               new UnpooledByteBufAllocator(true).buffer();
System.out.println(buf.readerIndex()+" "+buf.writerIndex()+" "+buf.readableBytes()+" "+buf.capacity()); // 0 0 0 3

// Writing advances writerIndex
buf.writeBytes("abc".getBytes());
System.out.println(buf.readerIndex()+" "+buf.writerIndex()+" "+buf.readableBytes()+" "+buf.capacity()); // 0 3 3 3

// Reading advances readerIndex
byte[] values=new byte[1];
buf.readBytes(values);
System.out.println(buf.readerIndex()+" "+buf.writerIndex()+" "+buf.readableBytes()+" "+buf.capacity()); // 1 3 2 3

// Discarding moves the unread bytes to the front and shifts both indices
buf.discardReadBytes();
System.out.println(buf.readerIndex()+" "+buf.writerIndex()+" "+buf.readableBytes()+" "+buf.capacity()); // 0 2 2 3

// clear() resets both indices to 0 without erasing the content
buf.clear();
System.out.println(buf.readerIndex()+" "+buf.writerIndex()+" "+buf.readableBytes()+" "+buf.capacity()); // 0 0 0 3
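
The index arithmetic behind the demo above can be reproduced with a plain byte array. The class below is a hypothetical, simplified model of the two-index design, not Netty's implementation: it has a fixed capacity and leaves out pooling, automatic growth and reference counting. It is here only to make the bookkeeping of `readerIndex` and `writerIndex` explicit.

```java
// Hypothetical model of ByteBuf's reader/writer indices over a fixed byte array.
final class TwoIndexBuffer {
    private final byte[] data;
    private int readerIndex; // next byte to read
    private int writerIndex; // next slot to write

    TwoIndexBuffer(int capacity) {
        data = new byte[capacity];
    }

    int readableBytes() {
        return writerIndex - readerIndex;
    }

    void writeBytes(byte[] src) {
        if (src.length > data.length - writerIndex) {
            throw new IndexOutOfBoundsException("not enough writable bytes");
        }
        System.arraycopy(src, 0, data, writerIndex, src.length);
        writerIndex += src.length;
    }

    void readBytes(byte[] dst) {
        if (dst.length > readableBytes()) {
            throw new IndexOutOfBoundsException("not enough readable bytes");
        }
        System.arraycopy(data, readerIndex, dst, 0, dst.length);
        readerIndex += dst.length;
    }

    // Move the unread bytes to the front and reclaim the space of the bytes already read.
    void discardReadBytes() {
        System.arraycopy(data, readerIndex, data, 0, readableBytes());
        writerIndex -= readerIndex;
        readerIndex = 0;
    }

    // Reset both indices; the underlying bytes are left untouched.
    void clear() {
        readerIndex = 0;
        writerIndex = 0;
    }

    // Same four values the demo prints: readerIndex writerIndex readableBytes capacity.
    String state() {
        return readerIndex + " " + writerIndex + " " + readableBytes() + " " + data.length;
    }

    public static void main(String[] args) {
        TwoIndexBuffer buf = new TwoIndexBuffer(3);
        buf.writeBytes("abc".getBytes());
        buf.readBytes(new byte[1]);
        System.out.println(buf.state()); // prints 1 3 2 3
        buf.discardReadBytes();
        System.out.println(buf.state()); // prints 0 2 2 3
    }
}
```

Because reads and writes track separate positions, there is no need for the `flip()` call that `java.nio.ByteBuffer` requires when switching between writing and reading.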

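Transferring objects with Netty

How does Netty catch serialization exceptions?

// Propagate a serialization failure through the pipeline so that exceptionCaught sees it
future.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
// Close the connection automatically when the operation fails
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);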
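---
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageDecoder;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;
import java.util.List;

public class UserMessageToMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    /**
     * Deserializes one frame into a Java object.
     * @param ctx the handler context
     * @param msg one frame of data to decode
     * @param out receives the decoded object
     * @throws Exception
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        // Copy the readable bytes of the frame into a byte array
        byte[] values=new byte[msg.readableBytes()];
        msg.readBytes(values);

        // Rebuild the object with JDK serialization; the stream is closed automatically
        try (ObjectInputStream ois=new ObjectInputStream(new ByteArrayInputStream(values))) {
            out.add(ois.readObject());
        }
    }
}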

---
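import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToMessageEncoder;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.List;

public class UserMessageToMessageEncoder extends MessageToMessageEncoder<Object> {
    /**
     * Serializes an object into one frame.
     * @param ctx the handler context
     * @param msg the object to encode; it becomes one frame of data
     * @param out receives the encoded ByteBuf
     * @throws Exception
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
        // Serialize the object with JDK serialization; closing the stream also flushes it
        ByteArrayOutputStream baos=new ByteArrayOutputStream();
        try (ObjectOutputStream oos=new ObjectOutputStream(baos)) {
            oos.writeObject(msg);
        }

        // Copy the serialized bytes into a ByteBuf and pass it down the pipeline
        ByteBuf buf= Unpooled.buffer();
        buf.writeBytes(baos.toByteArray());
        out.add(buf);
    }
}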
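
Stripped of the Netty types, the encoder and decoder above amount to a JDK serialization round trip, and the "serialization exception" is what `writeObject` throws when the object does not implement `Serializable`. The sketch below shows both cases with plain JDK classes; `SerializationRoundTrip` and its nested `User` type are hypothetical names invented for the illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

final class SerializationRoundTrip {
    // Hypothetical payload type; it must implement Serializable to be encodable.
    static final class User implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // What the encoder does before copying the bytes into a ByteBuf.
    static byte[] toBytes(Object obj) throws IOException {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(obj);
        }
        return baos.toByteArray();
    }

    // What the decoder does after copying the bytes out of a ByteBuf.
    static Object fromBytes(byte[] bytes) throws IOException, ClassNotFoundException {
        try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return ois.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        User copy = (User) fromBytes(toBytes(new User("tom", 20)));
        System.out.println(copy.name + " " + copy.age); // prints tom 20
        try {
            toBytes(new Object()); // java.lang.Object is not Serializable
        } catch (NotSerializableException e) {
            System.out.println("cannot serialize: " + e.getMessage()); // prints cannot serialize: java.lang.Object
        }
    }
}
```

Inside a pipeline, a failure like this in the encoder does not reach the code that called `writeAndFlush`; it fails the returned future instead, which is why the listeners shown earlier are needed to notice it.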
