一个简单的聊天室案例(基于netty+websocket)
1.------------------------webSocketServer 主要是用于启动netty服务------------------
/** * 启动WebSocketServer */ @Component public final class WebSocketServer { static final boolean SSL = System.getProperty("ssl") != null; public void start(Integer port) throws Exception { // Configure SSL. final SslContext sslCtx; if (SSL) { SelfSignedCertificate ssc = new SelfSignedCertificate(); sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build(); } else { sslCtx = null; } EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new WebSocketServerInitializer(sslCtx)); Channel ch = b.bind(port).sync().channel(); System.out.println("Open your web browser and navigate to " + (SSL? "https" : "http") + "://127.0.0.1:" + port + '/'); ch.closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
2.---------------------------处理http请求 用于处理http请求升级为ws
public class WebSocketIndexPageHandler extends SimpleChannelInboundHandler<FullHttpRequest> { private final String websocketPath; public WebSocketIndexPageHandler(String websocketPath) { this.websocketPath = websocketPath; } @Override protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception { // Handle a bad request. if (!req.decoderResult().isSuccess()) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(req.protocolVersion(), BAD_REQUEST, ctx.alloc().buffer(0))); return; } // Allow only GET methods. if (!HttpMethod.GET.equals(req.method())) { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(req.protocolVersion(), FORBIDDEN, ctx.alloc().buffer(0))); return; } // Send the index page if ("/".equals(req.uri()) || "/index.html".equals(req.uri())) { String webSocketLocation = getWebSocketLocation(ctx.pipeline(), req, websocketPath); ByteBuf content = WebSocketServerIndexPage.getContent(webSocketLocation); FullHttpResponse res = new DefaultFullHttpResponse(req.protocolVersion(), OK, content); res.headers().set(CONTENT_TYPE, "text/html; charset=UTF-8"); HttpUtil.setContentLength(res, content.readableBytes()); sendHttpResponse(ctx, req, res); } else { sendHttpResponse(ctx, req, new DefaultFullHttpResponse(req.protocolVersion(), NOT_FOUND, ctx.alloc().buffer(0))); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, FullHttpResponse res) { // Generate an error page if response getStatus code is not OK (200). HttpResponseStatus responseStatus = res.status(); if (responseStatus.code() != 200) { ByteBufUtil.writeUtf8(res.content(), responseStatus.toString()); HttpUtil.setContentLength(res, res.content().readableBytes()); } // Send the response and close the connection if necessary. boolean keepAlive = HttpUtil.isKeepAlive(req) && responseStatus.code() == 200; HttpUtil.setKeepAlive(res, keepAlive); ChannelFuture future = ctx.writeAndFlush(res); if (!keepAlive) { future.addListener(ChannelFutureListener.CLOSE); } } private static String getWebSocketLocation(ChannelPipeline cp, HttpRequest req, String path) { String protocol = "ws"; if (cp.get(SslHandler.class) != null) { // SSL in use so use Secure WebSockets protocol = "wss"; } return protocol + "://" + req.headers().get(HttpHeaderNames.HOST) + path; } }
3.------------------------静态 页面
package com.netty.server.websocketServer; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.util.CharsetUtil; /** * 返回一个聊天页面 */ public final class WebSocketServerIndexPage { private static final String NEWLINE = "\r\n"; public static ByteBuf getContent(String webSocketLocation) { return Unpooled.copiedBuffer( "<html><head><title>Web Socket Test</title></head>" + NEWLINE + "<body>" + NEWLINE + "<script type=\"text/javascript\">" + NEWLINE + "var socket;" + NEWLINE + "if (!window.WebSocket) {" + NEWLINE + " window.WebSocket = window.MozWebSocket;" + NEWLINE + '}' + NEWLINE + "if (window.WebSocket) {" + NEWLINE + " socket = new WebSocket(\"" + webSocketLocation + "\");" + NEWLINE + " socket.onmessage = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = ta.value + '\\n' + event.data" + NEWLINE + " };" + NEWLINE + " socket.onopen = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = \"Web Socket opened!\";" + NEWLINE + " };" + NEWLINE + " socket.onclose = function(event) {" + NEWLINE + " var ta = document.getElementById('responseText');" + NEWLINE + " ta.value = ta.value + \"Web Socket closed\"; " + NEWLINE + " };" + NEWLINE + "} else {" + NEWLINE + " alert(\"Your browser does not support Web Socket.\");" + NEWLINE + '}' + NEWLINE + NEWLINE + "function send(message) {" + NEWLINE + " if (!window.WebSocket) { return; }" + NEWLINE + " if (socket.readyState == WebSocket.OPEN) {" + NEWLINE + " socket.send(message);" + NEWLINE + " } else {" + NEWLINE + " alert(\"The socket is not open.\");" + NEWLINE + " }" + NEWLINE + '}' + NEWLINE + "</script>" + NEWLINE + "<form οnsubmit=\"return false;\">" + NEWLINE + "<input type=\"text\" name=\"message\" value=\"Hello, World!\" style=\"width:300px;height:100px;border-color:#6699FF\"/>" + "<input type=\"button\" value=\"Send\"" + NEWLINE + " οnclick=\"send(this.form.message.value)\" style=\"width:100px;height:100px;background:#66FF66\" />" + NEWLINE + "<h1>Output</h1>" + NEWLINE + "<textarea id=\"responseText\" style=\"width:400px;height:500px;border:20px;solid:#378888;border-color:#6699FF;background:#66FF66\"></textarea>" + NEWLINE + "</form>" + NEWLINE + "</body>" + NEWLINE + "</html>" + NEWLINE, CharsetUtil.US_ASCII); } private WebSocketServerIndexPage() { // Unused } }
4.----------------------------------具体的处理消息handler
/** * * @Description: 处理消息的handler * TextWebSocketFrame: 在netty中,是用于为websocket专门处理文本的对象,frame是消息的载体 */ public class WebSocketFrameHandler extends SimpleChannelInboundHandler<WebSocketFrame> { // 用于记录和管理所有客户端的channle private static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private static ConcurrentHashMap<Object,Object> channels=new ConcurrentHashMap<>(); private static Integer count =0; // 定义接收到消息的操作 @Override protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { // ping and pong frames already handled if (frame instanceof TextWebSocketFrame) { // Send the uppercase string back. String request = ((TextWebSocketFrame) frame).text(); StringBuilder stringBuilder=new StringBuilder();; int index=(int)channels.get(ctx.channel().id()); stringBuilder.append(RandomUtil.map.get(index)+": "); stringBuilder.append(request); for(Channel channel:clients){ channel.writeAndFlush(new TextWebSocketFrame(stringBuilder.toString().toUpperCase(Locale.US))); } } else { String message = "unsupported frame type: " + frame.getClass().getName(); throw new UnsupportedOperationException(message); } } /** * 当客户端连接服务端之后(打开连接) * 获取客户端的channle,并且放到ChannelGroup中去进行管理 */ @Override public void handlerAdded(ChannelHandlerContext ctx){ try { count++; System.out.println("用户加入事件 "+ctx.channel().id()+" 第"+count+"个用户"); clients.add(ctx.channel()); int index=RandomUtil.getIndex(); channels.put(ctx.channel().id(),index); }catch (Exception e){ e.printStackTrace(); System.out.println(e.getMessage()); } } //定义用户离线的操作 @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); System.out.println("用户:"+incoming+"掉线"); } //定义用户异常的操作 @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel incoming = ctx.channel(); System.out.println("用户:"+incoming.id()+"异常"); cause.printStackTrace(); ctx.close(); } }
5.--------------------------------初始化Initializer
public class WebSocketServerInitializer extends ChannelInitializer<SocketChannel> { private static final String WEBSOCKET_PATH = "/websocket"; private final SslContext sslCtx; public WebSocketServerInitializer(SslContext sslCtx) { this.sslCtx = sslCtx; } @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); if (sslCtx != null) { pipeline.addLast(sslCtx.newHandler(ch.alloc())); } //// websocket 基于http协议,所以要有http编解码器 pipeline.addLast(new HttpServerCodec()); //java.lang.NoSuchMethodError: // io.netty.util.internal.AppendableCharSequence.setLength(I)V //该异常是因为版本冲突导致 pipeline.addLast(new HttpObjectAggregator(1024*64)); // WebSocket数据压缩 pipeline.addLast(new WebSocketServerCompressionHandler()); // 协议包长度限制 pipeline.addLast(new WebSocketServerProtocolHandler(WEBSOCKET_PATH, null, true)); //这个仅限demo使用 是用来返回初始化html页面的 pipeline.addLast(new WebSocketIndexPageHandler(WEBSOCKET_PATH)); /* pipeline.addLast(new HttpRequestHandler("/ws"));*/ //具体的消息处理逻辑 pipeline.addLast(new WebSocketFrameHandler()); } }
6.----------------------util
package com.netty.server.websocketServer; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Random; import java.util.Set; import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; public class RandomUtil { static Map<Integer,String> map= new ConcurrentHashMap(); static Set<Integer> set=new TreeSet<>(); public static void getOne(){ Random random=new Random(); for(int i=0;i<10;i++){ set.add(random.nextInt(10)); } if(set.size()<10){ getOne(); } } static { getOne(); map.put(0,"尼古拉斯儿"); map.put(1,"张三"); map.put(2,"李四"); map.put(3,"王二麻子"); map.put(4,"马六"); map.put(5,"申公豹"); map.put(6,"大麻花"); map.put(7,"小垃圾"); map.put(8,"鸡腿"); map.put(9,"鸭腿"); } public static int getIndex(){ int re=0; if(!set.isEmpty()){ Iterator itr=set.iterator(); re=(int)itr.next(); set.remove(re); return re; } return re; } }
7.-----------------启动类
@SpringBootApplication @ComponentScan("com.netty") public class NettyServerApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(NettyServerApplication.class, args); WebSocketServer webSocketServer=context.getBean(WebSocketServer.class); try { webSocketServer.start(8888); } catch (Exception e) { e.printStackTrace(); } } }
效果图: