Using Google ProtoBuf

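I. Introduction

Google Protobuf is a codec commonly used with Netty. With Protobuf, a data structure is defined once and can then be used everywhere, across languages: the code generator emits source code for each supported language. Processes built against different versions of the data structure can still exchange data, which gives forward compatibility. Compared with Java's built-in serialization API, Protobuf offers higher performance and is well suited to passing data between programs written in different languages.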

II. Environment Setup

1. Downloading the packages

To use Protobuf on Windows, download the Windows build of the compiler. The GitHub release page is:

https://github.com/protocolbuffers/protobuf/releases/tag/v2.5.0

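If you have access to it, you can also download it from Google's official site.

Generated Java code references classes in protobuf-java-2.5.0.jar. The Maven page for that artifact is: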

https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java/2.5.0

Copy the downloaded protobuf-java-2.5.0.jar into the project's lib directory, then add it to the project through Build Path.

2. Generating Java files with protoc.exe

Unzip the downloaded protoc-2.5.0-win32.zip. It contains two files, one of which is protoc.exe.

protoc.exe generates code from .proto files, such as the SubscribeReq.proto and SubscribeResp.proto files below:

// SubscribeReq.proto
package netty;
option java_package="com.test.netty.chapter08";
option java_outer_classname="SubscribeReqProto";
message SubscribeReq{
	required int32 subReqID=1;
	required string userName=2;
	required string productName=3;
	required string address=4;
}

// SubscribeResp.proto
package netty;
option java_package="com.test.netty.chapter08";
option java_outer_classname="SubscribeRespProto";
message SubscribeResp{
	required int32 subReqID=1;
	required string respCode=2;
	required string desc=3;
}
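
To make the role of the field numbers (=1, =2, ...) in the definitions above more concrete, here is a minimal plain-Java sketch of how a SubscribeReq could be laid out on the wire. It is a hand-rolled illustration, not the code protoc generates: the class WireFormatSketch and its helper methods are hypothetical names, and it assumes non-negative int32 values only. Each field is written as a key, (fieldNumber << 3) | wireType, followed by its value.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Hand-rolled illustration of the protobuf wire format; not the generated code. */
public class WireFormatSketch {

    // Writes a base-128 varint: 7 payload bits per byte, high bit set when more bytes follow.
    // Assumption: value is non-negative (real protobuf sign-extends negative int32 values).
    public static void writeVarint(ByteArrayOutputStream out, int value) {
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
    }

    // Wire type 0 (varint): key, then the value as a varint.
    public static void writeInt32Field(ByteArrayOutputStream out, int fieldNumber, int value) {
        writeVarint(out, (fieldNumber << 3) | 0);
        writeVarint(out, value);
    }

    // Wire type 2 (length-delimited): key, byte length, then the UTF-8 bytes.
    public static void writeStringField(ByteArrayOutputStream out, int fieldNumber, String value) {
        byte[] utf8 = value.getBytes(StandardCharsets.UTF_8);
        writeVarint(out, (fieldNumber << 3) | 2);
        writeVarint(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // Encodes the four SubscribeReq fields in field-number order.
    public static byte[] encodeSubscribeReq(int subReqID, String userName,
                                            String productName, String address) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeInt32Field(out, 1, subReqID);
        writeStringField(out, 2, userName);
        writeStringField(out, 3, productName);
        writeStringField(out, 4, address);
        return out.toByteArray();
    }

    // Formats bytes as space-separated two-digit hex values.
    public static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            sb.append(String.format("%02x ", b));
        }
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        byte[] bytes = encodeSubscribeReq(1, "test", "Netty book", "HangZhou XiHu");
        // The output starts with 08 01 12 04: key for field 1, value 1, key for field 2, length 4.
        System.out.println(bytes.length + " bytes: " + toHex(bytes));
    }
}
```

Field names never appear in the output, only field numbers, which is why renaming a field is harmless but renumbering it is not.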

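Generate the Java code by running protoc.exe from the command line, for example: protoc.exe --java_out=./src SubscribeReq.proto (and likewise for SubscribeResp.proto).

The output directory is the src folder under the directory that contains protoc.exe, and the source file is the corresponding .proto file. After the command runs, the package structure specified by java_package appears under src.

Copy the generated Java files into the project's source directory. If the project has not imported protobuf-java-2.5.0.jar, these files will not compile.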

3. Encoding and decoding with ProtoBuf

import com.google.protobuf.InvalidProtocolBufferException;

/**
 * Demonstrates ProtoBuf encoding and decoding.
 * Assumes this class is in the same package as the generated
 * SubscribeReqProto (com.test.netty.chapter08); otherwise import it.
 */
public class TestSubscribeReqProto
{
    // Serialize a SubscribeReq to a byte array
    private static byte[] encode(SubscribeReqProto.SubscribeReq req){
        return req.toByteArray();
    }
    
    // Deserialize a SubscribeReq from a byte array
    private static SubscribeReqProto.SubscribeReq decode(byte[] body)
    throws InvalidProtocolBufferException{
        return SubscribeReqProto.SubscribeReq.parseFrom(body);
    }
    
    // Build a SubscribeReq object
    private static SubscribeReqProto.SubscribeReq createSubscribeReq(){
        SubscribeReqProto.SubscribeReq.Builder builder=SubscribeReqProto.
            SubscribeReq.newBuilder();
        builder.setSubReqID(1);
        builder.setUserName("test");
        builder.setProductName("Netty book");
        // address is declared as a single "required string", so it holds one value
        builder.setAddress("HangZhou XiHu");
        return builder.build();
    }
    
    public static void main(String[] args)
    throws InvalidProtocolBufferException
    {
        SubscribeReqProto.SubscribeReq req=createSubscribeReq();
        System.out.println("Before encode : "+req.toString());
        SubscribeReqProto.SubscribeReq req2=decode(encode(req));
        // Print the decoded object (req2), not the original
        System.out.println("After decode : "+req2.toString());
        System.out.println("Assert equal : -->"+req2.equals(req));
    }
}

III. Using Protobuf as the codec in Netty

The Netty server code is as follows:

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class SubReqServer
{
    public void bind(int port)throws Exception{
        EventLoopGroup bossGroup=new NioEventLoopGroup();
        EventLoopGroup workerGroup=new NioEventLoopGroup();
        try
        {
            ServerBootstrap b=new ServerBootstrap();
            b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .option(ChannelOption.SO_BACKLOG, 100)
            .handler(new LoggingHandler(LogLevel.INFO))
            .childHandler(new ChannelInitializer<SocketChannel>()
            {
                @Override
                protected void initChannel(SocketChannel ch)
                    throws Exception
                {
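                    // Inbound: split the byte stream into frames by their varint32 length prefix
                    ch.pipeline().addLast(
                        new ProtobufVarint32FrameDecoder());
                    // Inbound: parse each complete frame into a SubscribeReq
                    ch.pipeline().addLast(
                        new ProtobufDecoder(
                            SubscribeReqProto.SubscribeReq.getDefaultInstance()));
                    // Outbound: prepend a varint32 length field to each encoded message
                    ch.pipeline().addLast(
                        new ProtobufVarint32LengthFieldPrepender());
                    // Outbound: serialize protobuf messages to bytes
                    ch.pipeline().addLast(
                        new ProtobufEncoder());
                    // Business logic
                    ch.pipeline().addLast(new SubReqServerHandler());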
                }
            });
            ChannelFuture f=b.bind(port).sync();
            f.channel().closeFuture().sync();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally{
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
    
    public static void main(String[] args)throws Exception
    {
        int port =8888;
        if (args!=null&&args.length>0)
        {
            port=Integer.valueOf(args[0]);
        }
        new SubReqServer().bind(port);
    }
}

The server-side handler for network I/O events:

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

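// ChannelHandlerAdapter exposes channelRead only in the Netty 5.0.0 alpha API;
// on Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class SubReqServerHandler extends ChannelHandlerAdapter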
{
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg)throws Exception{
        SubscribeReqProto.SubscribeReq req=(SubscribeReqProto.SubscribeReq)msg;
        if ("test".equalsIgnoreCase(req.getUserName()))
        {
            System.out.println("Service accept client subscribe req : ["+req.toString()+"]");
        }
        ctx.writeAndFlush(resp(req.getSubReqID()));
    }
    
    private SubscribeRespProto.SubscribeResp resp(int subReqID){
        SubscribeRespProto.SubscribeResp.Builder builder=
            SubscribeRespProto.SubscribeResp.newBuilder();
        builder.setSubReqID(subReqID);
        builder.setRespCode("0");
        builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address");
        return builder.build();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        cause.printStackTrace();
        ctx.close();
    }
}

The Netty client code:

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;

public class SubReqClient
{
    public void connect(int port,String host)throws Exception{
        EventLoopGroup g=new NioEventLoopGroup();
        try
        {
            Bootstrap b=new Bootstrap();
            b.group(g)
            .channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>()
            {

                @Override
                protected void initChannel(SocketChannel ch)
                    throws Exception
                {
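                    // Inbound: split the byte stream into frames by their varint32 length prefix
                    ch.pipeline().addLast(
                        new ProtobufVarint32FrameDecoder());
                    // Inbound: parse each complete frame into a SubscribeResp
                    ch.pipeline().addLast(
                        new ProtobufDecoder(
                            SubscribeRespProto.SubscribeResp.getDefaultInstance()));
                    // Outbound: prepend a varint32 length field to each encoded message
                    ch.pipeline().addLast(
                        new ProtobufVarint32LengthFieldPrepender());
                    // Outbound: serialize protobuf messages to bytes
                    ch.pipeline().addLast(
                        new ProtobufEncoder());
                    // Business logic
                    ch.pipeline().addLast(new SubReqClientHandler());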
                }
            });
            ChannelFuture f=b.connect(host, port).sync();
            f.channel().closeFuture().sync();
        }
        catch (Exception e)
        {
            e.printStackTrace();
        }
        finally{
            g.shutdownGracefully();
        }
    }
    
    public static void main(String[] args)throws Exception
    {
        int port =8888;
        if (args!=null&&args.length>0)
        {
            port=Integer.valueOf(args[0]);
        }
        new SubReqClient().connect(port, "127.0.0.1");
    }
}

The client-side handler for network I/O events:

import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;

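// ChannelHandlerAdapter exposes channelActive/channelRead only in the Netty 5.0.0 alpha API;
// on Netty 4.x, extend ChannelInboundHandlerAdapter instead.
public class SubReqClientHandler extends ChannelHandlerAdapter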
{
    public SubReqClientHandler(){
        
    }
    
    @Override
    public void channelActive(ChannelHandlerContext ctx){
        for (int i = 0; i < 10; i++ )
        {
            ctx.write(subReq(i));
        }
        ctx.flush();
    }
    
    private SubscribeReqProto.SubscribeReq subReq(int i){
        SubscribeReqProto.SubscribeReq.Builder builder=
            SubscribeReqProto.SubscribeReq.newBuilder();
        builder.setSubReqID(i);
        builder.setUserName("test");
        builder.setProductName("Netty 权威指南");
        builder.setAddress("南京市江宁区牛首山风景区");
        return builder.build();
    }
    
    @Override
    public void channelRead(ChannelHandlerContext ctx,Object msg){
        System.out.println("Receive server response : ["+msg+"]");
    }
    
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{
        ctx.flush();
    }
    
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){
        cause.printStackTrace();
        ctx.close();
    }
}

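ProtobufDecoder is responsible only for decoding. It cannot handle half packets (messages that arrive split across reads), so a decoder that can handle them must be placed before ProtobufDecoder in the pipeline. There are three options:

1) Use the ProtobufVarint32FrameDecoder provided by Netty, which handles half packets;

2) Extend LengthFieldBasedFrameDecoder, Netty's general-purpose half-packet decoder;

3) Extend ByteToMessageDecoder and handle half-packet messages yourself.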
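
To show what handling a half packet involves, here is a minimal plain-Java sketch of varint32 length-prefixed framing. It is not Netty's implementation: the class Varint32FramingSketch and its frame and feed methods are hypothetical names used only for illustration. The idea is the one the options above share: buffer incoming bytes, and emit a message body only once the whole length prefix and the whole body have arrived.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java sketch of varint32 length-prefixed framing; not Netty's implementation. */
public class Varint32FramingSketch {

    // Bytes received so far that do not yet form a complete frame.
    private ByteBuffer pending = ByteBuffer.allocate(0);

    // Sender side: prepend the body length as a base-128 varint.
    public static byte[] frame(byte[] body) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        int len = body.length;
        while ((len & ~0x7F) != 0) {
            out.write((len & 0x7F) | 0x80);
            len >>>= 7;
        }
        out.write(len);
        out.write(body, 0, body.length);
        return out.toByteArray();
    }

    // Receiver side: feed one chunk read from the socket and
    // return every frame body that is now complete.
    public List<byte[]> feed(byte[] chunk) {
        ByteBuffer merged = ByteBuffer.allocate(pending.remaining() + chunk.length);
        merged.put(pending).put(chunk);
        merged.flip();
        List<byte[]> frames = new ArrayList<>();
        while (true) {
            merged.mark();
            int length = readVarint32(merged);
            if (length < 0 || merged.remaining() < length) {
                merged.reset(); // half packet: keep the bytes and wait for more
                break;
            }
            byte[] body = new byte[length];
            merged.get(body);
            frames.add(body);
        }
        pending = merged.slice(); // unread remainder is kept for the next call
        return frames;
    }

    // Returns the decoded length, or -1 if the varint itself is still incomplete.
    private static int readVarint32(ByteBuffer buf) {
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            if (!buf.hasRemaining()) {
                return -1;
            }
            byte b = buf.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                if (result < 0) {
                    throw new IllegalStateException("negative frame length");
                }
                return result;
            }
        }
        throw new IllegalStateException("length prefix longer than 5 bytes");
    }

    public static void main(String[] args) {
        byte[] wire = frame("hello protobuf".getBytes(StandardCharsets.UTF_8));
        Varint32FramingSketch decoder = new Varint32FramingSketch();
        // Simulate TCP delivering one frame in two pieces.
        System.out.println(decoder.feed(Arrays.copyOfRange(wire, 0, 5)).size());           // 0: half packet
        System.out.println(decoder.feed(Arrays.copyOfRange(wire, 5, wire.length)).size()); // 1: frame complete
    }
}
```

In a real pipeline, prefer option 1 unless the peer uses a different length-field layout; the sketch only illustrates the buffering logic that a custom ByteToMessageDecoder would need.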

Reference: 《Netty权威指南》 (Netty: The Definitive Guide)