netty长连接示例-------聊天(1)

Netty是一个Java的NIO客户端服务端框架可以快速的开发网络应用程序,比如客户端和服务端的协议,大大简化了网络程序的开发过程。下面就写一个有关聊天的功能

1:有关客户端的实现


/**
 * SimpleChatClientHandler.java
 * 客户端的handler
 * @author
 * 2018-12-12 下午1:24:33
 *
 */

public class SimpleChatClientHandler extends SimpleChannelInboundHandler<String> {

	
	
	/**
	 * 每当从服务器端读到客户端写入信息时,将信息转发给其他客户端的channel
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
		System.out.println("msg>>>>"+msg);
	}

	
}

 

import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * SimpleChatClientInitializer.java
 *  增加多个handler到SimpleChatClientInitializer 上
 * @author 
 * 2018-12-12 下午1:31:14
 *
 */

public class SimpleChatClientInitializer extends ChannelInitializer<SocketChannel> {
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		 ChannelPipeline pipeline = ch.pipeline();
		 pipeline.addLast("framer",new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
		 pipeline.addLast("decoder",new StringDecoder());
		 pipeline.addLast("encoder", new StringEncoder());
		 pipeline.addLast("handler",new SimpleChatClientHandler());
	}
	
	
	

}
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.io.BufferedReader;
import java.io.InputStreamReader;

/**
 * Created by on 18/12/12
 * 启动服务端
 */
public class SimpleChatClient {
    private final int port;
    private final String host;

    public SimpleChatClient(String host, int port){
        this.host = host;
        this.port = port;
    }

    public void run() throws Exception{
        EventLoopGroup group = new NioEventLoopGroup();

        try{
            //是一个启动NIO服务的辅助启动类
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new SimpleChatClientInitializer());

            Channel channel = bootstrap.connect(host, port).sync().channel();

            BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
            while (true){
                channel.writeAndFlush(in.readLine()+"\r\n");
            }
        } catch (Exception e){
            e.printStackTrace();
        } finally{
            group.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws Exception {
        new SimpleChatClient("localhost",8080).run();
    }

}

2:下面是服务器端的实现


import io.netty.channel.Channel;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

/**
 * SimpleChatServerHandler.java
 *  服务器端处理IO
 * @author 
 * 2018-12-12 上午10:07:38
 *
 */

public class SimpleChatServerHandler extends SimpleChannelInboundHandler<String> {

	public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
	
	/**
	 * 每当服务器端收到新的客户端连接时,客户端的channel存放ChannelGroup列表中,并通知列表中其他客户端
	 */
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception{
		//获取连接的channel
		Channel incomeing = ctx.channel();
		//通知所有的已经连接到服务器的客户端,有一个新的通道加入
		for(Channel channel:channels){
			channel.writeAndFlush("server-"+incomeing.remoteAddress()+"加入\n");
		}
		channels.add(ctx.channel());
	}
	
	/**
	 * 每当服务器端断开客户端连接时,客户端的channel从ChannelGroup中移除,并通知列表中其他客户端channel
	 */
	@Override
	public void handlerRemoved(ChannelHandlerContext ctx)  throws Exception{
		//获取连接的channel
		Channel incomming  = ctx.channel();
		for(Channel channel :channels){
			channel.writeAndFlush("server-"+incomming.remoteAddress()+"离开/n");
		}
		channels.remove(ctx.channel());
	}
	
	
	/**
	 * 每当从服务器端读到客户端的信息时,将信息转发给其他客户端的channel
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
		Channel incomming = ctx.channel();
		//将收到的信息转发给全部的客户端
		for(Channel channel :channels){
			if(channel!=null){
				channel.writeAndFlush("server"+incomming.remoteAddress()+msg+"/n");
				System.out.println("you"+msg+"/n");
			}else{
				channel.writeAndFlush("you"+msg+"/n");

			}
		}
		
	}
	
	/**
	 * 服务器端监听客户端活动
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception{
		//服务器接收到客户端上线通知
		Channel incomming = ctx.channel();
		System.out.println("chatClient:"+incomming.remoteAddress()+"在线");
	}
	
	
	 /**
     * 服务端监听到客户端不活动
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        //服务端接收到客户端掉线通知
        Channel incoming = ctx.channel();
        System.out.println("SimpleChatClient:" + incoming.remoteAddress()+"掉线");
    }

	
    /**
     * 当服务器端的io抛出异常时被调用
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) throws Exception{
    	Channel incoming = ctx.channel();
    	System.out.println("chatClient"+incoming.remoteAddress()+"异常");
    	//异常关闭连接
    	cause.printStackTrace();
    	ctx.close();
    }

}
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.Delimiters;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * SimpleChatServerInitializer.java
 * 用来增加多个handler处理类到ChannelPipeline上,ChannelPipeline简单可以理解为看成一个handler容器
 * 包括解码编码 SimpleChatServerHandler等
 * @author 
 * 2018-12-12 上午11:20:14
 *
 */

public class SimpleChatServerInitializer extends ChannelInitializer<SocketChannel> {

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
		ChannelPipeline pipeline = ch.pipeline();
		pipeline.addLast("framer",new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
		pipeline.addLast("decoder",new StringDecoder());
		pipeline.addLast("encoder",new StringEncoder());
		pipeline.addLast("handle",new SimpleChatServerHandler());
		
		System.out.println("chatClient"+ch.remoteAddress()+"连上服务器");
		
	}

}
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * SimpleChatServer.java
 *  启动类
 * @author 
 * 2018-12-12 上午11:31:07
 *
 */

public class SimpleChatServer {

	private int port;
	
	public SimpleChatServer(int port){
		this.port = port;
	}
	
	public void run() throws Exception{
		//NioEventLoopGroup用来处理io操作的多线程事件循环器
		//bossGroup 用来处理接收进来的连接
		NioEventLoopGroup  bossGroup = new NioEventLoopGroup();
		//用来处理已经被接收的连接
		NioEventLoopGroup  workerGroup = new NioEventLoopGroup();
		
		try{
		//启动nio服务器的辅助启动类
		ServerBootstrap serverBootstrap = new ServerBootstrap();
		
		serverBootstrap.group(bossGroup, workerGroup)
		.channel(NioServerSocketChannel.class)
		.childHandler(new SimpleChatServerInitializer())
		.option(ChannelOption.SO_BACKLOG, 128)
        .childOption(ChannelOption.SO_KEEPALIVE, true);
		System.out.println("SimpleChatServer 启动了");
		
		//绑定接口,开始接收进来的连接
		ChannelFuture  future = serverBootstrap.bind(port).sync();
		//等待服务器socket关闭
		future.channel().closeFuture().sync();
		
		}finally{
			workerGroup.shutdownGracefully();
			bossGroup.shutdownGracefully();
			System.out.println("SimpleChatServer关闭了");
	    }
		
	}
	
	
	public static void main(String[] args) throws Exception {
		new SimpleChatServer(8080).run();
	}
	
}

3:有关运行结果


先启动服务器端,然后是客户端

这是服务器端运行结果:

netty长连接示例-------聊天(1)

下面是客户端的运行结果:

(其中需要注意的是:在有新的客户端的时候会发消息通知其他的通道)

netty长连接示例-------聊天(1)

netty长连接示例-------聊天(1)