Netty的基础使用以及原理(NIO框架)
NIO框架
- Netty、Mina、Akka(Scala,高可用、可扩展)
Netty 提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。Netty 是一个基于 NIO 的客户端、服务器端编程框架,使用 Netty 可以快速、简单地开发出一个网络应用。
为何使用Netty
- API简单,开发门槛低(无需掌握NIO)
- 功能强大,预置了多种编解码器(解决半包、粘包问题)
- 支持多种主流协议
- 定制能力强:可以定制自己的编解码器,支持自定义协议
- 修复JDK NIO bug,解决原始NIO的不稳定性
- 使用人群多,社区活跃
Netty架构设计
开发依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>5.0.0.Alpha2</version>
</dependency>
服务端开发
// Starts a Netty server on port 9999 and blocks until its server channel is closed.
//1. Create the server bootstrap helper
ServerBootstrap sbt=new ServerBootstrap();
//2. Create the two event loop groups (passed to the bootstrap as parent "boss" and child "worker")
EventLoopGroup boss=new NioEventLoopGroup();
EventLoopGroup worker=new NioEventLoopGroup();
try {
    //3. Register the event loop groups with the bootstrap
    sbt.group(boss,worker);
    //4. Select the NIO-based server channel implementation
    sbt.channel(NioServerSocketChannel.class);
    //5. Install the initializer that builds the pipeline of each child channel
    sbt.childHandler(new CustomServerChannelInitializer());
    //6. Bind the port and wait until the bind has completed
    ChannelFuture future = sbt.bind(9999).sync();
    // Printed only after sync() returned, i.e. once the bind really succeeded
    System.out.println("我在9999监听...");
    //7. Block until the server channel is closed
    future.channel().closeFuture().sync();
} finally {
    //8. Always release the event loop threads, even when bind() fails or the wait is interrupted
    boss.shutdownGracefully();
    worker.shutdownGracefully();
}
---
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
 * Channel initializer that appends a {@link CustomServerChannelHandlerAdapter}
 * to the end of each initialized channel's pipeline.
 */
public class CustomServerChannelInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Configures the pipeline of the given channel.
     *
     * @param ch the socket channel being initialized
     * @throws Exception if the pipeline cannot be configured
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // The final handler goes at the tail of the channel's pipeline
        ch.pipeline().addLast(new CustomServerChannelHandlerAdapter());
    }
}
---
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import java.nio.charset.StandardCharsets;
import java.util.Date;
/**
 * Server-side handler: prints every received message, answers with the
 * current date/time as text, and closes the channel once the reply is written.
 */
public class CustomServerChannelHandlerAdapter extends ChannelHandlerAdapter {
    /**
     * Exception callback: prints the stack trace and closes the channel,
     * so a failed connection is not left open.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Receives one inbound message and replies with the current date/time.
     *
     * @param ctx context used to allocate the reply buffer and to write it
     * @param msg the received message; released here because it is not passed on
     * @throws Exception if handling the message fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            System.out.println("msg:"+msg);
            ByteBuf buf= ctx.alloc().buffer();
            // Encode explicitly as UTF-8 instead of the platform default charset.
            // Date.toLocaleString() is deprecated; kept so the reply text format is unchanged.
            buf.writeBytes(new Date().toLocaleString().getBytes(StandardCharsets.UTF_8));
            // Send the reply
            ChannelFuture future = ctx.writeAndFlush(buf);
            // Close the SocketChannel once the write has completed
            future.addListener(ChannelFutureListener.CLOSE);
        } finally {
            // This handler consumes the message, so it must drop its reference
            if (msg instanceof ByteBuf) {
                ((ByteBuf) msg).release();
            }
        }
    }
}
客户端
// Connects a Netty client to 127.0.0.1:9999 and blocks until the connection is closed.
//1. Create the client bootstrap helper
Bootstrap bt=new Bootstrap();
//2. Create the worker event loop group
EventLoopGroup worker=new NioEventLoopGroup();
try {
    //3. Register the event loop group with the bootstrap
    bt.group(worker);
    //4. Select the NIO-based client channel implementation
    bt.channel(NioSocketChannel.class);
    //5. Install the initializer that builds the channel pipeline
    bt.handler(new CustomClientChannelInitializer());
    //6. Connect to the server and wait until the connect has completed
    ChannelFuture future = bt.connect("127.0.0.1",9999).sync();
    //7. Block until the channel is closed
    future.channel().closeFuture().sync();
} finally {
    //8. Always release the event loop threads, even when connect() fails or the wait is interrupted
    worker.shutdownGracefully();
}
---
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
/**
 * Channel initializer that appends a {@link CustomClientChannelHandlerAdapter}
 * to the end of each initialized channel's pipeline.
 */
public class CustomClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    /**
     * Configures the pipeline of the given channel.
     *
     * @param ch the socket channel being initialized
     * @throws Exception if the pipeline cannot be configured
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // The final handler goes at the tail of the channel's pipeline
        ch.pipeline().addLast(new CustomClientChannelHandlerAdapter());
    }
}
---
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
/**
 * Client-side handler: sends a greeting once the connection is active and
 * prints whatever the server sends back.
 */
public class CustomClientChannelHandlerAdapter extends ChannelHandlerAdapter {
    /**
     * Exception callback: prints the stack trace and closes the channel,
     * so a failed connection is not left open.
     *
     * @param ctx   context of the channel on which the error occurred
     * @param cause the error that was raised
     * @throws Exception never thrown by this implementation
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    /**
     * Called when the connection is established; sends the greeting.
     *
     * @param ctx context used to allocate the buffer and to write it
     * @throws Exception if sending fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ByteBuf buf=ctx.alloc().buffer();
        // Encode as UTF-8 explicitly: channelRead decodes with UTF-8 as well,
        // and the platform default charset may not represent the Chinese text identically
        buf.writeBytes("hello,我是客户端".getBytes(CharsetUtil.UTF_8));
        ctx.writeAndFlush(buf);
    }

    /**
     * Receives one message from the server and prints it as UTF-8 text.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the received message; cast to {@link ByteBuf} and released after printing
     * @throws Exception if handling the message fails
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ByteBuf buf= (ByteBuf) msg;
        try {
            System.out.println("客户端收到:"+buf.toString(CharsetUtil.UTF_8));
        } finally {
            // This handler consumes the message, so it must drop its reference
            buf.release();
        }
    }
}
ByteBuf
运行原理
// Demonstrates how readerIndex / writerIndex / readableBytes / capacity change
// across write, read, discardReadBytes and clear.
final ByteBuf demo = Unpooled.buffer(3);
// Alternative allocations:
//new PooledByteBufAllocator().buffer();
//new UnpooledByteBufAllocator(true).buffer();

// Prints "readerIndex writerIndex readableBytes capacity" for the buffer's current state
Runnable report = new Runnable() {
    @Override
    public void run() {
        System.out.println(demo.readerIndex()+" "+demo.writerIndex()+" "+demo.readableBytes()+" "+demo.capacity());
    }
};

// State right after allocation
report.run();
// State after writing three bytes
demo.writeBytes("abc".getBytes());
report.run();
// State after reading a single byte
demo.readBytes(new byte[1]);
report.run();
// State after discarding the bytes that were already read
demo.discardReadBytes();
report.run();
// State after clearing the buffer
demo.clear();
report.run();
Netty传输对象
Netty如何捕获序列化异常?
//捕获序列化异常
future.addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE);
//异常时自动关闭连接
future.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
/**
 * Decoder that turns the readable bytes of a {@link ByteBuf} back into a Java
 * object using JDK object serialization.
 *
 * <p>SECURITY NOTE(review): JDK-native deserialization of bytes received from the
 * network is unsafe for untrusted peers. Prefer a data-only format, or restrict
 * the accepted classes, before using this outside a trusted environment.
 */
public class UserMessageToMessageDecoder extends MessageToMessageDecoder<ByteBuf> {
    /**
     * Decodes one frame into an object.
     *
     * @param ctx channel handler context (unused)
     * @param msg one frame of data; all of its readable bytes are consumed.
     *            Assumes the frame holds exactly one complete serialized object —
     *            TODO confirm an upstream frame decoder guarantees this
     * @param out receives the deserialized object
     * @throws Exception if the bytes cannot be deserialized
     */
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
        byte[] values=new byte[msg.readableBytes()];
        msg.readBytes(values);
        // try-with-resources closes the stream even when readObject() throws
        try (ObjectInputStream ois=new ObjectInputStream(new ByteArrayInputStream(values))) {
            out.add(ois.readObject());
        }
    }
}
---
/**
 * Encoder that serializes an outbound object with JDK object serialization
 * and emits the resulting bytes as a {@link ByteBuf}.
 */
public class UserMessageToMessageEncoder extends MessageToMessageEncoder<Object> {
    /**
     * Encodes one object into one frame of bytes.
     *
     * @param ctx channel handler context (unused)
     * @param msg the object to encode; one frame of data
     * @param out receives the buffer holding the serialized bytes
     * @throws Exception if the object cannot be serialized
     */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List<Object> out) throws Exception {
        ByteArrayOutputStream baos=new ByteArrayOutputStream();
        // Closing the stream flushes it; try-with-resources also closes it when writeObject() throws
        try (ObjectOutputStream oos=new ObjectOutputStream(baos)) {
            oos.writeObject(msg);
        }
        // The buffer is created only after serialization succeeded, so a failure leaves
        // no buffer behind. toByteArray() returns a fresh array, so it is wrapped directly
        // instead of being copied into a second buffer.
        out.add(Unpooled.wrappedBuffer(baos.toByteArray()));
    }
}
上一篇:RPC 设计与实现
下一篇:Zookeeper的基本知识与使用