Netty实践(二):TCP拆包、粘包问题
什么是TCP拆包、粘包?
在网络通信中,数据在底层都是以字节流形式在流动,那么发送方和接受方理应有一个约定(协议),只有这样接受方才知道需要接受多少数据,哪些数据需要在一起处理;如果没有这个约定,就会出现本应该一起处理的数据,被TCP划分为多个包发给接收方进行处理,如下图:
看一个TCP拆包、粘包的实例
客户端Handler:
服务端Handler:
运行结果:
上面的程序本意是CLIENT发送3次消息给SERVER,SERVER端理应处理3次,可是结果SERVER却将3条消息一次处理了。
那么如何解决TCP拆包、粘包问题呢?其实思路不外乎有3种:
第一种:发定长数据
接收方拿固定长度的数据,发送方发送固定长度的数据即可。但是这样的缺点也是显而易见的:如果发送方的数据长度不足,需要补位,浪费空间。
第二种:在包尾部增加特殊字符进行分割
发送方发送数据时,增加特殊字符;在接收方以特殊字符为准进行分割
第三种:自定义协议
类似于HTTP协议中的HEAD信息,比如我们也可以在HEAD中,告诉接收方数据的元信息(数据类型、数据长度等)
Netty如何解决TCP拆包、粘包问题?
在《Java通信实战:编写自定义通信协议实现FTP服务》中,涉及到了JAVA SOCKET这方面的处理,大家可以参考。接下来,我们来看Netty这个框架是如何帮助我们解决这个问题的。本篇博客的代码在《Netty实践(一):轻松入门》基础上进行。
方式一:定长消息
Server启动类:
Client Handler:
运行结果:
利用FixedLengthFrameDecoder,加入到管道流处理中,长度够了接收方才能收到。
方式二:自定义分隔符
Server启动类:
Client Handler:
运行结果:
方式三:自定义协议
下面我们将简单实现一个自定义协议:
HEAD信息中包含:数据长度、数据版本
数据内容
MyHead
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public class MyHead {
//数据长度
private int length;
//数据版本
private int version;
public MyHead( int length, int version) {
this .length = length;
this .version = version;
}
public int getLength() {
return length;
}
public void setLength( int length) {
this .length = length;
}
public int getVersion() {
return version;
}
public void setVersion( int version) {
this .version = version;
}
} |
MyMessage
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
|
public class MyMessage {
//消息head
private MyHead head;
//消息body
private String content;
public MyMessage(MyHead head, String content) {
this .head = head;
this .content = content;
}
public MyHead getHead() {
return head;
}
public void setHead(MyHead head) {
this .head = head;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this .content = content;
}
@Override
public String toString() {
return String.format( "[length=%d,version=%d,content=%s]" ,head.getLength(),head.getVersion(),content);
}
} |
编码器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
/** * Created by Administrator on 17-1-9.
* 编码器 将自定义消息转化成ByteBuff
*/
public class MyEncoder extends MessageToByteEncoder<MyMessage> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MyMessage myMessage, ByteBuf byteBuf) throws Exception {
int length = myMessage.getHead().getLength();
int version = myMessage.getHead().getVersion();
String content = myMessage.getContent();
byteBuf.writeInt(length);
byteBuf.writeInt(version);
byteBuf.writeBytes(content.getBytes(Charset.forName( "UTF-8" )));
}
} |
解码器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
/** * Created by Administrator on 17-1-9.
* 解码器 将ByteBuf数据转化成自定义消息
*/
public class MyDecoder extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
int length = byteBuf.readInt();
int version = byteBuf.readInt();
byte [] body = new byte [length];
byteBuf.readBytes(body);
String content = new String(body, Charset.forName( "UTF-8" ));
MyMessage myMessage = new MyMessage( new MyHead(length,version),content);
list.add(myMessage);
}
} |
Server启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
public class Main {
public static void main(String[] args) {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup(); // (2)
int port = 8867 ;
try {
ServerBootstrap b = new ServerBootstrap(); // (3)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel. class ) // (4)
.childHandler( new ChannelInitializer<SocketChannel>() { // (5)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast( new MyEncoder())
.addLast( new MyDecoder())
.addLast( new ServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128 ) // (6)
.childOption(ChannelOption.SO_KEEPALIVE, true ); // (7)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (8)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
System.out.println( "start server...." );
f.channel().closeFuture().sync();
System.out.println( "stop server...." );
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println( "exit server...." );
}
}
} |
Server Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
|
public class ServerHandler extends ChannelHandlerAdapter {
//每当从客户端收到新的数据时,这个方法会在收到消息时被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MyMessage in = (MyMessage) msg;
try {
// Do something with msg
System.out.println( "server get :" + in);
} finally {
//ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
//or ((ByteBuf)msg).release();
ReferenceCountUtil.release(msg);
}
}
//exceptionCaught()事件处理方法是当出现Throwable对象才会被调用
//当Netty由于IO错误或者处理器在处理事件时抛出的异常时
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
} |
Client启动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
public class Client {
public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
b.group(group)
.channel(NioSocketChannel. class )
.handler( new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast( new MyDecoder());
p.addLast( new MyEncoder());
p.addLast( new ClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect( "127.0.0.1" , 8867 ).sync();
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
} |
Client Handler
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
public class ClientHandler extends ChannelHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush( new MyMessage( new MyHead( "abcd" .getBytes( "UTF-8" ).length, 1 ), "abcd" ));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf in = (ByteBuf) msg;
try {
// Do something with msg
System.out.println( "client get :" + in.toString(CharsetUtil.UTF_8));
ctx.close();
} finally {
//ByteBuf是一个引用计数对象,这个对象必须显示地调用release()方法来释放
//or ((ByteBuf)msg).release();
ReferenceCountUtil.release(msg);
}
}
} |
运行结果
到这里,你会发现Netty处理TCP拆包、粘包问题很简单,通过编解码技术支持,让我们编写自定义协议也很方便,在后续的Netty博客中,我将继续为大家介绍Netty在实际中的一些应用(比如实现心跳检测),See You~
本文转自zfz_linux_boy 51CTO博客,原文链接:http://blog.51cto.com/zhangfengzhe/1890577,如需转载请自行联系原作者