Netty 长连接示例——聊天(1)
Netty 是一个基于 Java NIO 的客户端/服务端框架,可以用来快速地开发网络应用程序(例如基于自定义协议的客户端和服务端),大大简化了网络程序的开发过程。下面就用它实现一个简单的聊天功能。
1:有关客户端的实现
/**
 * SimpleChatClientHandler.java
 *
 * <p>Client-side inbound handler. It receives {@code String} messages and prints
 * each one to standard output.
 *
 * <p>Created 2018-12-12 13:24:33.
 */
public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {

    /**
     * Called for each {@code String} message delivered to this handler; prints it to
     * {@code System.out} prefixed with {@code msg>>>>}.
     *
     * @param ctx the context of this handler (not used here)
     * @param msg the received text message
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        final String line = "msg>>>>" + msg;
        System.out.println(line);
    }
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * SimpleChatClientInitializer.java
 *
 * <p>Installs the client's handlers on the pipeline of each new {@link SocketChannel}:
 * a line-delimiter frame decoder, a string decoder, a string encoder and
 * {@link SimpleChatClientHandler}, in that order.
 *
 * <p>Created 2018-12-12 13:31:14.
 */
public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {

    /** Maximum frame length passed to the delimiter-based frame decoder. */
    private static final int MAX_FRAME_LENGTH = 8192;

    /**
     * Appends the framing, decoding, encoding and chat handlers to the channel's pipeline.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        final ChannelPipeline handlers = ch.pipeline();
        handlers
                .addLast("framer", new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, Delimiters.lineDelimiter()))
                .addLast("decoder", new StringDecoder())
                .addLast("encoder", new StringEncoder())
                .addLast("handler", new SimpleChatClientHandler());
    }
}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.io.BufferedReader;
import java.io.InputStreamReader;
/**
 * Created on 18/12/12.
 *
 * <p>Chat client launcher: connects to the given host and port, then sends every line
 * read from standard input to the server, terminated with {@code "\r\n"}.
 */
public class SimpleChatClient {
    private final int port;
    private final String host;

    /**
     * @param host host name or address to connect to
     * @param port TCP port to connect to
     */
    public SimpleChatClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Connects to the server and forwards standard input line by line until end of input.
     *
     * <p>Any exception raised while connecting, reading or writing is printed and the
     * method then returns; the event loop group is always shut down.
     *
     * @throws Exception declared for compatibility with existing callers
     */
    public void run() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // Bootstrap is the helper used to configure and start the client channel.
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleChatClientInitializer());
            Channel channel = bootstrap.connect(host, port).sync().channel();
            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            String line;
            // readLine() returns null at end of input; stop there instead of
            // looping forever and sending the text "null" to the server.
            while ((line = in.readLine()) != null) {
                channel.writeAndFlush(line + "\r\n");
            }
            // Input is exhausted: close the connection explicitly before shutting down.
            channel.close().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            group.shutdownGracefully();
        }
    }

    /** Starts a client connected to {@code localhost:8080}. */
    public static void main(String[] args) throws Exception {
        new SimpleChatClient("localhost", 8080).run();
    }
}
2:下面是服务器端的实现
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;
/**
 * SimpleChatServerHandler.java
 *
 * <p>Server-side chat handler. It keeps every connected channel in a shared
 * {@link ChannelGroup}, announces joins and departures to the group, and relays each
 * received line to all connected clients.
 *
 * <p>Every outgoing message ends with {@code "\n"} so that a line-delimiter frame
 * decoder on the receiving side can split messages.
 *
 * <p>Created 2018-12-12 10:07:38.
 */
public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> {

    /** All channels that currently have this handler installed. */
    public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    /**
     * Announces the new channel to the channels already in the group, then adds it to
     * the group (so the newcomer does not receive its own join notice).
     */
    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        // Notify the clients that were connected before this one.
        channels.writeAndFlush("server-" + incoming.remoteAddress() + "加入\n");
        channels.add(incoming);
    }

    /**
     * Removes the channel from the group and announces its departure to the remaining
     * channels.
     */
    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        // Remove first so the notice is only sent to the clients that remain.
        channels.remove(incoming);
        channels.writeAndFlush("server-" + incoming.remoteAddress() + "离开\n");
    }

    /**
     * Relays a received message: every other client gets it prefixed with
     * {@code server} and the sender's remote address, while the sender gets it back
     * prefixed with {@code you}.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg the decoded text line
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
        Channel incoming = ctx.channel();
        // Log each received message once on the server console.
        System.out.println("you" + msg);
        for (Channel channel : channels) {
            if (channel != incoming) {
                channel.writeAndFlush("server" + incoming.remoteAddress() + msg + "\n");
            } else {
                channel.writeAndFlush("you" + msg + "\n");
            }
        }
    }

    /**
     * Logs that a client channel became active.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("chatClient:" + incoming.remoteAddress() + "在线");
    }

    /**
     * Logs that a client channel became inactive.
     *
     * @param ctx context of the channel that went inactive
     * @throws Exception declared by the overridden method
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:" + incoming.remoteAddress() + "掉线");
    }

    /**
     * Logs the failing client and the stack trace, then closes the connection.
     *
     * @param ctx context of the channel on which the error occurred
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        Channel incoming = ctx.channel();
        System.out.println("chatClient" + incoming.remoteAddress() + "异常");
        cause.printStackTrace();
        // Close the connection after an error.
        ctx.close();
    }
}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * SimpleChatServerInitializer.java
 *
 * <p>Populates the {@link ChannelPipeline} of each accepted {@link SocketChannel} with
 * the server's handlers: a line-delimiter frame decoder, a string decoder, a string
 * encoder and {@link SimpleChatServerHandler}, in that order.
 *
 * <p>Created 2018-12-12 11:20:14.
 */
public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {

    /** Maximum frame length passed to the delimiter-based frame decoder. */
    private static final int MAX_FRAME_LENGTH = 8192;

    /**
     * Appends the handlers to the channel's pipeline and logs the client's remote address.
     *
     * @param ch the accepted client channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        final ChannelPipeline handlers = ch.pipeline();
        handlers
                .addLast("framer", new DelimiterBasedFrameDecoder(MAX_FRAME_LENGTH, Delimiters.lineDelimiter()))
                .addLast("decoder", new StringDecoder())
                .addLast("encoder", new StringEncoder())
                .addLast("handle", new SimpleChatServerHandler());

        final String connectedNotice = "chatClient" + ch.remoteAddress() + "连上服务器";
        System.out.println(connectedNotice);
    }
}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * SimpleChatServer.java
 *
 * <p>Server launcher: binds a Netty server to the configured port and blocks until the
 * server channel is closed.
 *
 * <p>Created 2018-12-12 11:31:07.
 */
public class SimpleChatServer {
    private final int port;

    /**
     * @param port TCP port the server binds to
     */
    public SimpleChatServer(int port) {
        this.port = port;
    }

    /**
     * Configures the server, binds it, and waits for the server channel to close. Both
     * event loop groups are shut down when the method exits, normally or exceptionally.
     *
     * @throws Exception if binding or waiting on the channel fails
     */
    public void run() throws Exception {
        // One group is handed to the bootstrap as the parent (accepting) group,
        // the other as the child group that serves accepted connections.
        final NioEventLoopGroup acceptors = new NioEventLoopGroup();
        final NioEventLoopGroup workers = new NioEventLoopGroup();
        try {
            // ServerBootstrap is the helper used to configure and start the server.
            final ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptors, workers);
            bootstrap.channel(NioServerSocketChannel.class);
            bootstrap.childHandler(new SimpleChatServerInitializer());
            bootstrap.option(ChannelOption.SO_BACKLOG, 128);
            bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
            System.out.println("SimpleChatServer 启动了");

            // Bind the port, then block until the server channel is closed.
            final ChannelFuture bound = bootstrap.bind(port).sync();
            bound.channel().closeFuture().sync();
        } finally {
            workers.shutdownGracefully();
            acceptors.shutdownGracefully();
            System.out.println("SimpleChatServer关闭了");
        }
    }

    /** Starts the server on port 8080. */
    public static void main(String[] args) throws Exception {
        final SimpleChatServer server = new SimpleChatServer(8080);
        server.run();
    }
}
3:有关运行结果
先启动服务器端,然后是客户端
这是服务器端运行结果:
下面是客户端的运行结果:
(需要注意的是:每当有新的客户端连接时,服务器会发消息通知其他已连接的通道)