Google ProtoBuf用法
一、简介
Google Protobuf是Netty中常用的编解码工具。Protobuf的数据结构只需定义一次即可到处使用,并且可以跨语言使用:通过代码生成工具可以生成不同语言版本的源代码,还可以在使用不同版本数据结构的进程间进行数据传递,实现数据结构的向前兼容。相对于Java自带的序列化API,这个序列化框架性能更高,适合在不同语言开发的程序之间传输数据。
二、环境搭建
1、软件包下载
在Windows上使用Protobuf需要下载Windows版本的protoc编译器,GitHub网址如下:
https://github.com/protocolbuffers/protobuf/releases/tag/v2.5.0
有条件的可以到Google官网下载。
生成的Java语言版本的代码会引用protobuf-java-2.5.0.jar包里的类,该jar包的Maven地址如下:
https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java/2.5.0
将下载的protobuf-java-2.5.0.jar复制到项目的lib目录下,然后通过Build Path引入jar包。
2、通过protoc.exe生成java文件
解压下载的protoc-2.5.0-win32.zip文件,里面包含两个文件:
protoc.exe主要根据.proto文件生成代码,比如下面的SubscribeReq.proto和SubscribeResp.proto文件:
// SubscribeReq.proto: schema of the subscribe request message.
package netty;
// Java package and outer class name used for the generated Java source.
option java_package="com.test.netty.chapter08";
option java_outer_classname="SubscribeReqProto";
// Subscribe request; every field is declared `required`.
message SubscribeReq{
required int32 subReqID=1;
required string userName=2;
required string productName=3;
// Declared as a single string (not `repeated`), so the message carries exactly one address value.
required string address=4;
}
// SubscribeResp.proto: schema of the subscribe response message.
package netty;
// Java package and outer class name used for the generated Java source.
option java_package="com.test.netty.chapter08";
option java_outer_classname="SubscribeRespProto";
// Subscribe response; every field is declared `required`.
message SubscribeResp{
// Same field name and tag number as SubscribeReq.subReqID.
required int32 subReqID=1;
required string respCode=2;
required string desc=3;
}
在命令行界面通过protoc.exe生成java代码,例如:protoc.exe --java_out=.\src .\SubscribeReq.proto(SubscribeResp.proto同理,具体路径以.proto文件的实际存放位置为准):
输出目录为protoc.exe所在目录下的src文件夹,源文件就是相应的.proto文件。执行之后可以在src目录下看到按指定包结构生成的java文件。
将里面的java文件复制到项目的目录下。如果项目没有导入protobuf-java-2.5.0.jar包会报错。
3、ProtoBuf的编解码使用方法
import java.util.ArrayList;
import java.util.List;
import com.google.protobuf.InvalidProtocolBufferException;
/**
 * Demonstrates a Protobuf encode/decode round trip using the generated
 * {@code SubscribeReqProto.SubscribeReq} message.
 */
public class TestSubscribeReqProto {

    /**
     * Serializes a request into its Protobuf binary form.
     *
     * @param req the message to serialize
     * @return the bytes returned by {@code req.toByteArray()}
     */
    private static byte[] encode(SubscribeReqProto.SubscribeReq req) {
        return req.toByteArray();
    }

    /**
     * Parses a request back from its Protobuf binary form.
     *
     * @param body bytes previously produced by {@link #encode}
     * @return the parsed message
     * @throws InvalidProtocolBufferException if {@code parseFrom} rejects the bytes
     */
    private static SubscribeReqProto.SubscribeReq decode(byte[] body)
            throws InvalidProtocolBufferException {
        return SubscribeReqProto.SubscribeReq.parseFrom(body);
    }

    /**
     * Builds a sample request with all four fields populated.
     *
     * @return the built sample message
     */
    private static SubscribeReqProto.SubscribeReq createSubscribeReq() {
        SubscribeReqProto.SubscribeReq.Builder builder =
                SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(1);
        builder.setUserName("test");
        builder.setProductName("Netty book");
        // The schema declares address as a single string, so exactly one value is set;
        // the previously built (and never used) address list has been removed.
        builder.setAddress("HangZhou XiHu");
        return builder.build();
    }

    /**
     * Encodes a sample request, decodes it again, and prints both messages
     * together with the result of comparing them.
     *
     * @param args unused
     * @throws InvalidProtocolBufferException if decoding the encoded bytes fails
     */
    public static void main(String[] args) throws InvalidProtocolBufferException {
        SubscribeReqProto.SubscribeReq req = createSubscribeReq();
        System.out.println("Before encode : " + req.toString());
        SubscribeReqProto.SubscribeReq req2 = decode(encode(req));
        // Print the decoded copy (req2), not the original, so the output actually
        // shows what came out of the round trip.
        System.out.println("After decode : " + req2.toString());
        System.out.println("Assert equal : -->" + req2.equals(req));
    }
}
三、Netty使用Protobuf进行编解码
Netty服务端代码如下:
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
/**
 * Netty server that receives Protobuf-encoded {@code SubscribeReq} messages and
 * hands them to {@code SubReqServerHandler}.
 */
public class SubReqServer {

    /**
     * Binds the server to the given port and blocks until the server channel closes.
     * Exceptions raised while binding or waiting are printed instead of rethrown, and
     * both event loop groups are shut down before the method returns.
     *
     * @param port TCP port to listen on
     * @throws Exception declared on the signature; failures inside the try block are caught here
     */
    public void bind(int port) throws Exception {
        EventLoopGroup acceptorGroup = new NioEventLoopGroup();
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap bootstrap = new ServerBootstrap();
            bootstrap.group(acceptorGroup, ioGroup)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(newChildInitializer());
            // Wait for the bind to finish, then park until the server channel is closed.
            ChannelFuture bindFuture = bootstrap.bind(port).sync();
            bindFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            acceptorGroup.shutdownGracefully();
            ioGroup.shutdownGracefully();
        }
    }

    /**
     * Creates the initializer that installs the Protobuf codec chain on every accepted channel.
     * The varint32 frame decoder is added ahead of the Protobuf decoder, and the length
     * prepender ahead of the Protobuf encoder; the business handler comes last.
     */
    private static ChannelInitializer<SocketChannel> newChildInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new ProtobufVarint32FrameDecoder())
                        .addLast(new ProtobufDecoder(
                                SubscribeReqProto.SubscribeReq.getDefaultInstance()))
                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                        .addLast(new ProtobufEncoder())
                        .addLast(new SubReqServerHandler());
            }
        };
    }

    /**
     * Starts the server on port 8888, or on the port given as the first argument.
     *
     * @param args optional; {@code args[0]} is parsed as the port number
     * @throws Exception if the port argument is not a valid integer
     */
    public static void main(String[] args) throws Exception {
        boolean hasPortArgument = args != null && args.length > 0;
        int port = hasPortArgument ? Integer.parseInt(args[0]) : 8888;
        new SubReqServer().bind(port);
    }
}
服务端网络I/O事件处理类:
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Server-side handler: logs subscribe requests from the user "test" and answers
 * every request with a {@code SubscribeResp}.
 */
// NOTE(review): overriding channelRead on ChannelHandlerAdapter looks like the Netty 5.0 alpha
// API; on Netty 4.x the inbound callbacks are on ChannelInboundHandlerAdapter — confirm version.
public class SubReqServerHandler extends ChannelHandlerAdapter {

    /**
     * Handles one decoded request: prints it when the user name equals "test"
     * (case-insensitive), then writes and flushes a response carrying the same request id.
     *
     * @param ctx channel context used to send the response
     * @param msg the inbound message, cast to {@code SubscribeReqProto.SubscribeReq}
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        SubscribeReqProto.SubscribeReq request = (SubscribeReqProto.SubscribeReq) msg;
        boolean fromTestUser = "test".equalsIgnoreCase(request.getUserName());
        if (fromTestUser) {
            System.out.println(
                    "Service accept client subscribe req : [" + request.toString() + "]");
        }
        // The response is sent for every request, not only for the logged ones.
        ctx.writeAndFlush(resp(request.getSubReqID()));
    }

    /**
     * Builds the fixed success response for the given request id.
     *
     * @param requestId id copied into the response's {@code subReqID} field
     * @return response with respCode "0" and a fixed description
     */
    private SubscribeRespProto.SubscribeResp resp(int requestId) {
        return SubscribeRespProto.SubscribeResp.newBuilder()
                .setSubReqID(requestId)
                .setRespCode("0")
                .setDesc("Netty book order succeed, 3 days later, sent to the designated address")
                .build();
    }

    /**
     * Prints the failure and closes the channel.
     *
     * @param ctx   channel context whose channel is closed
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
Netty客户端代码:
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
/**
 * Netty client that connects to the subscribe server and receives Protobuf-encoded
 * {@code SubscribeResp} messages through {@code SubReqClientHandler}.
 */
public class SubReqClient {

    /**
     * Connects to {@code host:port} and blocks until the channel closes.
     * Exceptions raised while connecting or waiting are printed instead of rethrown,
     * and the event loop group is shut down before the method returns.
     *
     * @param port remote TCP port
     * @param host remote host name or address
     * @throws Exception declared on the signature; failures inside the try block are caught here
     */
    public void connect(int port, String host) throws Exception {
        EventLoopGroup ioGroup = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(ioGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(newChannelInitializer());
            // Wait for the connection to be established, then park until the channel is closed.
            ChannelFuture connectFuture = bootstrap.connect(host, port).sync();
            connectFuture.channel().closeFuture().sync();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            ioGroup.shutdownGracefully();
        }
    }

    /**
     * Creates the initializer that installs the Protobuf codec chain on the client channel.
     * The varint32 frame decoder is added ahead of the Protobuf decoder, and the length
     * prepender ahead of the Protobuf encoder; the business handler comes last.
     */
    private static ChannelInitializer<SocketChannel> newChannelInitializer() {
        return new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline()
                        .addLast(new ProtobufVarint32FrameDecoder())
                        .addLast(new ProtobufDecoder(
                                SubscribeRespProto.SubscribeResp.getDefaultInstance()))
                        .addLast(new ProtobufVarint32LengthFieldPrepender())
                        .addLast(new ProtobufEncoder())
                        .addLast(new SubReqClientHandler());
            }
        };
    }

    /**
     * Connects to 127.0.0.1 on port 8888, or on the port given as the first argument.
     *
     * @param args optional; {@code args[0]} is parsed as the port number
     * @throws Exception if the port argument is not a valid integer
     */
    public static void main(String[] args) throws Exception {
        boolean hasPortArgument = args != null && args.length > 0;
        int port = hasPortArgument ? Integer.parseInt(args[0]) : 8888;
        new SubReqClient().connect(port, "127.0.0.1");
    }
}
客户端网络I/O事件处理类
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
/**
 * Client-side handler: sends ten subscribe requests once the channel becomes active
 * and prints every response it receives.
 */
// NOTE(review): overriding channelActive/channelRead on ChannelHandlerAdapter looks like the
// Netty 5.0 alpha API; on Netty 4.x these are on ChannelInboundHandlerAdapter — confirm version.
public class SubReqClientHandler extends ChannelHandlerAdapter {

    /** Creates a handler with no state. */
    public SubReqClientHandler() {
    }

    /**
     * Queues requests with ids 0 through 9 and flushes them with a single flush call.
     *
     * @param ctx channel context used for writing
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        int requestId = 0;
        while (requestId < 10) {
            ctx.write(subReq(requestId));
            requestId++;
        }
        ctx.flush();
    }

    /**
     * Builds a subscribe request with fixed user, product and address values.
     *
     * @param requestId value stored in the request's {@code subReqID} field
     * @return the built request
     */
    private SubscribeReqProto.SubscribeReq subReq(int requestId) {
        return SubscribeReqProto.SubscribeReq.newBuilder()
                .setSubReqID(requestId)
                .setUserName("test")
                .setProductName("Netty 权威指南")
                .setAddress("南京市江宁区牛首山风景区")
                .build();
    }

    /**
     * Prints the received message.
     *
     * @param ctx channel context (unused)
     * @param msg the inbound message, printed via string concatenation
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        System.out.println("Receive server response : [" + msg + "]");
    }

    /**
     * Flushes the context once the current read batch is complete.
     *
     * @param ctx channel context to flush
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    /**
     * Prints the failure and closes the channel.
     *
     * @param ctx   channel context whose channel is closed
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}
ProtobufDecoder仅仅负责解码,它不支持读半包,所以在ProtobufDecoder前面一定要有能够处理半包的解码器,有三种方式可以选择:
1)使用Netty提供的ProtobufVarint32FrameDecoder,它可以处理半包;
2)继承Netty提供的通用半包解码器LengthFieldBasedFrameDecoder;
3)继承ByteToMessageDecoder类,自己处理半包消息。
参考文献:《Netty权威指南》