使用Netty进行WebSocket协议开发
一、简介
WebSocket协议是HTML5新增的协议,解决了HTTP请求只能通过浏览器发起,服务端被动接收的问题,HTTP协议是半双工协议,数据可以在客户端和服务端两个方向上传输,但是不能同时传输,而WebSocket是全双工协议,一旦建立连接就可以两个方向同时传输数据。WebSocket连接的建立也是通过HTTP请求发起TCP握手连接,它在客户端通过js发起,在消息头部增加Upgrade: websocket字段,表示请求建立WebSocket连接,通过ping/pong帧保持链路**,服务端可以主动传递消息给客户端,不再需要客户端轮询。WebSocket无头部信息、Cookie和身份验证,相对于HTTP冗长的请求头能更好的节约服务器资源和带宽,是取代轮询实现实时通信的理想方式。在使用WebSocket之前需要检查浏览器是否支持该协议。
二、WebSocket连接建立流程
1)客户端向服务端发起一个HTTP请求,这个请求和一般的HTTP请求相比,增加了一些头信息,其中的附加头信息” Upgrade: websocket”表示这是一个申请协议升级的HTTP请求。
2)服务端接收到请求后生成应该消息返回给客户端,客户端和服务端的WebSocket连接就建立起来了,双方可以通过这个通道自由的发送数据。
3)连接会持续存在,直到客户端或服务端的某一方主动关闭连接。
请求头如下所示:
请求消息中的Sec-WebSocket-Key是随机的,服务端会根据这些数据来构造一个SHA-1的信息摘要,把”SHA-1”加上一个魔幻字符串”258EAFA5-E914-47DA-95CA-C5AB0DC85B11”。使用SHA-1加密,然后进行BASE-64编码,将结果作为” sec-websocket-accept”头的值,返回给客户端。
响应头如下:
三、WebSocket生命周期
1)通信
连接建立之后,双方开始通信,一个消息由一个或多个帧组成,WebSocket的消息并不一定对应一个特定网络层的帧,它可以被分割成多个帧或者被合并。
帧都有自己对应的类型,属于同一个消息的多个帧具有相同的类型的数据。消息的数据类型包括文本数据、二进制数据和控制帧(协议级信令,如信号)。
2)关闭连接
为了关闭WebSocket连接,客户端和服务端需要一个安全的方法关闭底层TCP连接以及TLS会话,如果合适,有可能丢弃已经接收的字节,必要时可以通过任何可用的手段关闭连接。
底层TCP连接,正常情况下,应该由服务端关闭。异常情况下,比如一个合理的时间周期后没有接收到服务器的TCPclose,客户端可以发起TCPclose。因此当服务器被指示关闭WebSocket连接时,它应该立刻发起一个TCPclose操作;客户端应该等待服务器的TCPclose。
WebSocket的握手关闭消息带有一个状态码和一个可选的关闭原因,它必须按照协议要求发送一个Close控制帧,当对端接收到关闭控制帧指令时,需要主动关闭WebSocket连接。如下是通过刷新客户端浏览器来模拟客户端发起的关闭WebSocket连接时,服务端接收到的messages内容:
服务端根据消息类型是CloseWebSocketFrame类型来执行关闭连接的动作。
四、使用Netty实现WebSocket协议
1)Netty服务端实现
首先是服务端启动类:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;
public class WebSocketServer
{
public void run(int port)throws Exception{
EventLoopGroup bossGroup=new NioEventLoopGroup();
EventLoopGroup workerGroup=new NioEventLoopGroup();
try
{
ServerBootstrap b=new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch)
throws Exception
{
ChannelPipeline pipeline=ch.pipeline();
//将请求和应答消息编码或解码为HTTP消息
pipeline.addLast("http-codec",new HttpServerCodec());
//将HTTP消息的多个部分组合成一条完整的HTTP消息
pipeline.addLast("aggregator",new HttpObjectAggregator(65536));
//向客户端发送HTML5文件,主要用于支持浏览器和服务端进行WebSocket通信
pipeline.addLast("http-chunked",new ChunkedWriteHandler());
pipeline.addLast("handler",new WebSocketServerHandler());
}
});
Channel f=b.bind(port).sync().channel();
System.out.println("Web socket server started at port "+port+".");
System.out.println("Open your browser and navigate to http://localhost:"+port+"/");
f.closeFuture().sync();
}
catch (Exception e)
{
e.printStackTrace();
}
finally{
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public static void main(String[] args)throws Exception
{
int port =8888;
try
{
if (args!=null&&args.length>0)
{
port=Integer.valueOf(args[0]);
}
}
catch (Exception e)
{
e.printStackTrace();
}
new WebSocketServer().run(port);
}
}
WebSocket服务端的启动类和HTTP协议的十分相似,主要的处理逻辑在ChannelPipeline中增加的WebSocketServerHandler类。
下面是WebSocketServerHandler类的实现:
import static io.netty.handler.codec.http.HttpHeaderUtil.*;
import java.util.Date;
import java.util.logging.Logger;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;
public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object>
{
private static final Logger logger=Logger.getLogger(WebSocketServerHandler.class.getName());
private WebSocketServerHandshaker handshaker;
@Override
protected void messageReceived(ChannelHandlerContext ctx, Object msg)
throws Exception
{
//判断请求是HTTP请求还是WebSocket请求
if (msg instanceof FullHttpRequest)
{
//处理WebSocket握手请求
handleHttpRequest(ctx, (FullHttpRequest)msg);
}else if (msg instanceof WebSocketFrame) {
//处理WebSocket请求
handleWebSocketFrame(ctx, (WebSocketFrame)msg);
}
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{
ctx.flush();
}
private void handleHttpRequest(ChannelHandlerContext ctx,FullHttpRequest req)throws Exception{
//先判断解码是否成功,然后判断是不是请求建立WebSocket连接
//如果HTTP解码失败,返回HTTP异常
if(!req.decoderResult().isSuccess()
||(!"websocket".equals(req.headers().get("Upgrade")))){
sendHttpResponse(ctx,req,new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
}
//构造握手工厂创建握手处理类 WebSocketServerHandshaker,来构造握手响应返回给客户端
WebSocketServerHandshakerFactory wsFactory=new WebSocketServerHandshakerFactory("ws://localhost:8888/websocket", null, false);
handshaker=wsFactory.newHandshaker(req);
if(handshaker==null){
WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
}else {
handshaker.handshake(ctx.channel(), req);
}
}
//如果接收到的消息是已经解码的WebSocketFrame消息
public void handleWebSocketFrame(ChannelHandlerContext ctx,WebSocketFrame frame)throws Exception{
//先对控制帧进行判断
//判断是否是关闭链路的指令
if (frame instanceof CloseWebSocketFrame)
{
handshaker.close(ctx.channel(), (CloseWebSocketFrame)frame.retain());
return;
}
//判断是否是维持链路的Ping消息
if (frame instanceof PingWebSocketFrame)
{
ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
return;
}
//本例程仅支持文本消息,不支持二进制消息
if (!(frame instanceof TextWebSocketFrame))
{
throw new UnsupportedOperationException(String.format("%s frame type not supported", frame.getClass().getName()));
}
//返回应答消息
String request=((TextWebSocketFrame)frame).text();
if(logger.isLoggable(java.util.logging.Level.FINE)){
logger.fine(String.format("%s received %s", ctx.channel(),request));
}
ctx.channel().write(new TextWebSocketFrame(request+" , 欢迎使用Netty WebSocket服务,现在时刻:"+new Date().toString()));
}
private void sendHttpResponse(ChannelHandlerContext ctx,FullHttpRequest req,FullHttpResponse resp){
if(resp.status().code()!=200){
ByteBuf buf=Unpooled.copiedBuffer(resp.status().toString(),CharsetUtil.UTF_8);
resp.content().writeBytes(buf);
buf.release();
setContentLength(resp,resp.content().readableBytes());
}
ChannelFuture f=ctx.channel().writeAndFlush(resp);
if(!isKeepAlive(resp)||resp.status().code()!=200){
f.addListener(ChannelFutureListener.CLOSE);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause)throws Exception{
cause.printStackTrace();
ctx.close();
}
}
请求经过ChannelInitializer.initChannel方法的处理后交给WebSocketServerHandler.messageReceived方法处理。
首先是判断请求是HTTP请求还是WebSocket请求,如果是HTTP连接就执行handleHttpRequest()方法,判断解码是否成功并判断是不是请求建立WebSocket连接,如果判断成功就构造握手工厂创建握手处理类 WebSocketServerHandshaker,来构造握手响应返回给客户端,这样客户端和服务端就建立起了WebSocket连接。
如果接收到的请求是WebSocket请求,就执行handleWebSocketFrame()方法,该方法会先对控制帧进行判断,判断是否是关闭链路的指令,如果是就通过WebSocketServerHandshaker.close()方法执行关闭WebSocket连接的操作,如果是维持链路的Ping消息,就返回客户端PONG消息,并且判断请求消息是不是二进制消息,这里限制只接收文本消息,最后处理接收到的消息,并返回响应。
五、客户端页面
客户端页面需要通过js发起建立WebSocket的连接,然后进行通信,页面代码如下:
<!DOCTYPE html>
<html>
<head>
<title>Netty WebSocket时间服务器</title>
<meta name="content-type" content="text/html; charset=UTF-8">
</head>
<br>
<body>
<br>
<script type="text/javascript">
var socket;
if(!window.WebSocket){
window.WebSocket=window.MozWebSocket;
}
if(window.WebSocket){
socket=new WebSocket("ws://localhost:8888/webSocket");
socket.onmessage=function(event){
var ta=document.getElementById('responseText');
ta.value="";
ta.value=event.data;
};
socket.onopen=function(event){
var ta=document.getElementById('responseText');
ta.value='打开WebSocket服务器正常,浏览器支持WebSocket!';
};
socket.onclose=function(event){
var ta=document.getElementById('responseText');
ta.value='';
ta.value="WebSocket 关闭!";
};
}else{
alert("抱歉,您的浏览器不支持WebSocket协议!");
}
function send(message){
if(!window.WebSocket){
return;
}
if(socket!=null){
socket.send(message);
}else{
alert("WebSocket连接没有建立成功,请刷新页面!");
}
/* if(socket.readyState==WebSocket.open){
socket.send(message);
}else{
alert("WebSocket连接没有建立成功!");
} */
}
</script>
<form onsubmit="return false;">
<input type="text" name="message" value="Netty最佳实践"/>
<br><br>
<input type="button" value="发送WebSocket请求消息" onclick="send(this.form.message.value)"/>
<hr color="blue"/>
<h3>服务端返回的应答消息</h3>
<textarea id="responseText" style="width:500px;height:300px;"></textarea>
</form>
</body>
</html>
通过浏览器打开编写的HTML页面,然后如果浏览器WebSocket建立成功就会显示打开”WebSocket服务器正常,浏览器支持WebSocket!”,如果浏览器不支持WebSocket就会提示"抱歉,您的浏览器不支持WebSocket协议!",WebSocket连接建立之后,通过点击"发送WebSocket请求消息"按钮发起请求获取服务端的响应。
参考书籍《Netty权威指南》