使用Protocol Buffers入门四步骤
Protocol Buffers(简称protobuf)是谷歌的一项技术,用于将结构化的数据序列化、反序列化,经常用于网络传输。
protobuf是谷歌的Protocol Buffers的简称,用于结构化数据和字节码之间互相转换(序列化、反序列化,即实现从结构体转换为字节流(编码,向LIS发送消息时使用)以及从字节流转换为结构体(解码,从LIS接收消息时使用)的功能。),一般应用于网络传输,可支持多种编程语言。
这货实际上类似于XML生成和解析,但protobuf的效率高于XML,不过protobuf生成的是字节码,可读性比XML差。类似的还有json、Java的Serializable等。
protobuf支持各种语言。本文以Java为例,简单介绍protobuf如何使用。其他语言使用方法类似。
首先需要下载:
http://download.****.net/download/xiao__gui/7586617
解压后有两个文件:protobuf-java-2.5.0.jar和protoc.exe。
protobuf-java-2.5.0.jar即protobuf所需要的jar包,如果用maven的话可以无视这个文件;
protoc.exe是protobuf代码生成工具。
第一步:定义数据结构
首先要定义protobuf的数据结构,这里要写一个.proto文件。这个文件有点类似于定义一个类。例如定义一个Person,保存文件PersonMsg.proto(注意文件名和里面的message名不要一样)。
- message Person {
- // ID(必需)
- required int32 id = 1;
- // 姓名(必需)
- required string name = 2;
- // email(可选)
- optional string email = 3;
- // 朋友(集合)
- repeated string friends = 4;
- }
上面的1、2、3、4是unique numbered tag,是一个唯一标识。
上面的例子中定义了一个非常简单的数据结构,当然还可以定义更复杂的结构,这里不再讨论,具体可以看官方文档。
第二步:protoc.exe生成Java代码
使用文件protoc.exe,cmd命令行运行:
protoc.exe --java_out=E:\Java PersonMsg.proto
输入文件是PersonMsg.proto,也就是定义数据结构的文件;输出文件夹是E:\java,将java文件生成在E:\java中。运行命令成功后会生成PersonMsg.java:
在Eclipse中创建一个项目,将java文件拷贝到项目中。项目中需要引入protobuf-java-2.5.0.jar包。如果是maven项目则加入:
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>2.5.0</version>
- </dependency>
第三步:序列化
第四步:反序列化
一般来说,序列化和反序列化是分开的。例如网络传输,由一方将数据序列化后发送给另一方来接收并解析,序列化发送和接收反序列化并不在一起。但是下面为了例子简单将二者写在同一程序中。
- import java.io.ByteArrayInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
- import java.util.List;
- public class Main {
- public static void main(String[] args) throws IOException {
- // 按照定义的数据结构,创建一个Person
- PersonMsg.Person.Builder personBuilder = PersonMsg.Person.newBuilder();
- personBuilder.setId(1);
- personBuilder.setName("叉叉哥");
- personBuilder.setEmail("[email protected]");
- personBuilder.addFriends("Friend A");
- personBuilder.addFriends("Friend B");
- PersonMsg.Person xxg = personBuilder.build();
- // 将数据写到输出流,如网络输出流,这里就用ByteArrayOutputStream来代替
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- xxg.writeTo(output);
- // -------------- 分割线:上面是发送方,将数据序列化后发送 ---------------
- byte[] byteArray = output.toByteArray();
- // -------------- 分割线:下面是接收方,将数据接收后反序列化 ---------------
- // 接收到流并读取,如网络输入流,这里用ByteArrayInputStream来代替
- ByteArrayInputStream input = new ByteArrayInputStream(byteArray);
- // 反序列化
- PersonMsg.Person xxg2 = PersonMsg.Person.parseFrom(input);
- System.out.println("ID:" + xxg2.getId());
- System.out.println("name:" + xxg2.getName());
- System.out.println("email:" + xxg2.getEmail());
- System.out.println("friend:");
- List<String> friends = xxg2.getFriendsList();
- for(String friend : friends) {
- System.out.println(friend);
- }
- }
- }
作者:叉叉哥 转载请注明出处:http://blog.****.net/xiao__gui/article/details/36643949
protobuf是谷歌的Protocol Buffers的简称,用于结构化数据和字节码之间互相转换(序列化、反序列化),一般应用于网络传输,可支持多种编程语言。
protobuf如何使用这里不再介绍,本文主要介绍在MINA、Netty、Twisted中如何使用protobuf,不了解protobuf的同学可以去参考我的另一篇博文。
在前面的一篇博文中,有介绍到一种用一个固定为4字节的前缀Header来指定Body的字节数的一种消息分割方式,在这里同样要使用到。只是其中Body的内容不再是字符串,而是protobuf字节码。
在处理业务逻辑时,肯定不希望还要对数据进行序列化和反序列化,而是希望直接操作一个对象,那么就需要有相应的编码器和解码器,将序列化和反序列化的逻辑写在编码器和解码器中。有关编码器和解码器的实现,上一篇博文中有介绍。
Netty包中已经自带针对protobuf的编码器和解码器,那么就不用再自己去实现了。而MINA、Twisted还需要自己去实现protobuf的编码器和解码器。
这里定义一个protobuf数据结构,用于描述一个学生的信息,保存为StudentMsg.proto文件:
- message Student {
- // ID
- required int32 id = 1;
- // 姓名
- required string name = 2;
- optional string email = 3;
- // 朋友
- repeated string friends = 4;
- }
用StudentMsg.proto分别生成Java和Python代码,将代码加入到相应的项目中。生成的代码就不再贴上来了。
下面分别介绍在Netty、MINA、Twisted如何使用protobuf来传输Student信息。
Netty:
Netty自带protobuf的编码器和解码器,分别是ProtobufEncoder和ProtobufDecoder。需要注意的是,ProtobufEncoder和ProtobufDecoder只负责protobuf的序列化和反序列化,而处理消息Header前缀和消息分割的还需要LengthFieldBasedFrameDecoder和LengthFieldPrepender。LengthFieldBasedFrameDecoder即用于解析消息Header前缀,根据Header中指定的Body字节数截取Body,LengthFieldPrepender用于在wirte消息时在消息前面添加一个Header前缀来指定Body字节数。
- public class TcpServer {
- public static void main(String[] args) throws InterruptedException {
- EventLoopGroup bossGroup = new NioEventLoopGroup();
- EventLoopGroup workerGroup = new NioEventLoopGroup();
- try {
- ServerBootstrap b = new ServerBootstrap();
- b.group(bossGroup, workerGroup)
- .channel(NioServerSocketChannel.class)
- .childHandler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch)
- throws Exception {
- ChannelPipeline pipeline = ch.pipeline();
- // 负责通过4字节Header指定的Body长度将消息切割
- pipeline.addLast("frameDecoder",
- new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
- // 负责将frameDecoder处理后的完整的一条消息的protobuf字节码转成Student对象
- pipeline.addLast("protobufDecoder",
- new ProtobufDecoder(StudentMsg.Student.getDefaultInstance()));
- // 负责将写入的字节码加上4字节Header前缀来指定Body长度
- pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
- // 负责将Student对象转成protobuf字节码
- pipeline.addLast("protobufEncoder", new ProtobufEncoder());
- pipeline.addLast(new TcpServerHandler());
- }
- });
- ChannelFuture f = b.bind(8080).sync();
- f.channel().closeFuture().sync();
- } finally {
- workerGroup.shutdownGracefully();
- bossGroup.shutdownGracefully();
- }
- }
- }
处理事件时,接收和发送的参数直接就是Student对象:
- public class TcpServerHandler extends ChannelInboundHandlerAdapter {
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) {
- // 读取客户端传过来的Student对象
- StudentMsg.Student student = (StudentMsg.Student) msg;
- System.out.println("ID:" + student.getId());
- System.out.println("Name:" + student.getName());
- System.out.println("Email:" + student.getEmail());
- System.out.println("Friends:");
- List<String> friends = student.getFriendsList();
- for(String friend : friends) {
- System.out.println(friend);
- }
- // 新建一个Student对象传到客户端
- StudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();
- builder.setId(9);
- builder.setName("服务器");
- builder.setEmail("[email protected]");
- builder.addFriends("X");
- builder.addFriends("Y");
- StudentMsg.Student student2 = builder.build();
- ctx.writeAndFlush(student2);
- }
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
- cause.printStackTrace();
- ctx.close();
- }
- }
MINA:
在MINA中没有针对protobuf的编码器和解码器,但是可以自己实现一个功能和Netty一样的编码器和解码器。
编码器:
- public class MinaProtobufEncoder extends ProtocolEncoderAdapter {
- @Override
- public void encode(IoSession session, Object message,
- ProtocolEncoderOutput out) throws Exception {
- StudentMsg.Student student = (StudentMsg.Student) message;
- byte[] bytes = student.toByteArray(); // Student对象转为protobuf字节码
- int length = bytes.length;
- IoBuffer buffer = IoBuffer.allocate(length + 4);
- buffer.putInt(length); // write header
- buffer.put(bytes); // write body
- buffer.flip();
- out.write(buffer);
- }
- }
解码器:
- public class MinaProtobufDecoder extends CumulativeProtocolDecoder {
- @Override
- protected boolean doDecode(IoSession session, IoBuffer in,
- ProtocolDecoderOutput out) throws Exception {
- // 如果没有接收完Header部分(4字节),直接返回false
- if (in.remaining() < 4) {
- return false;
- } else {
- // 标记开始位置,如果一条消息没传输完成则返回到这个位置
- in.mark();
- // 读取header部分,获取body长度
- int bodyLength = in.getInt();
- // 如果body没有接收完整,直接返回false
- if (in.remaining() < bodyLength) {
- in.reset(); // IoBuffer position回到原来标记的地方
- return false;
- } else {
- byte[] bodyBytes = new byte[bodyLength];
- in.get(bodyBytes); // 读取body部分
- StudentMsg.Student student = StudentMsg.Student.parseFrom(bodyBytes); // 将body中protobuf字节码转成Student对象
- out.write(student); // 解析出一条消息
- return true;
- }
- }
- }
- }
MINA服务器加入protobuf的编码器和解码器:
- public class TcpServer {
- public static void main(String[] args) throws IOException {
- IoAcceptor acceptor = new NioSocketAcceptor();
- // 指定protobuf的编码器和解码器
- acceptor.getFilterChain().addLast("codec",
- new ProtocolCodecFilter(new MinaProtobufEncoder(), new MinaProtobufDecoder()));
- acceptor.setHandler(new TcpServerHandle());
- acceptor.bind(new InetSocketAddress(8080));
- }
- }
这样,在处理业务逻辑时,就和Netty一样了:
- public class TcpServerHandle extends IoHandlerAdapter {
- @Override
- public void exceptionCaught(IoSession session, Throwable cause)
- throws Exception {
- cause.printStackTrace();
- }
- @Override
- public void messageReceived(IoSession session, Object message)
- throws Exception {
- // 读取客户端传过来的Student对象
- StudentMsg.Student student = (StudentMsg.Student) message;
- System.out.println("ID:" + student.getId());
- System.out.println("Name:" + student.getName());
- System.out.println("Email:" + student.getEmail());
- System.out.println("Friends:");
- List<String> friends = student.getFriendsList();
- for(String friend : friends) {
- System.out.println(friend);
- }
- // 新建一个Student对象传到客户端
- StudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();
- builder.setId(9);
- builder.setName("服务器");
- builder.setEmail("[email protected]");
- builder.addFriends("X");
- builder.addFriends("Y");
- StudentMsg.Student student2 = builder.build();
- session.write(student2);
- }
- }
Twisted:
在Twisted中,首先定义一个ProtobufProtocol类,继承Protocol类,充当编码器和解码器。处理业务逻辑的TcpServerHandle类再继承ProtobufProtocol类,调用或重写ProtobufProtocol提供的方法。
- # -*- coding:utf-8 –*-
- from struct import pack, unpack
- from twisted.internet.protocol import Factory
- from twisted.internet.protocol import Protocol
- from twisted.internet import reactor
- import StudentMsg_pb2
- # protobuf编码、解码器
- class ProtobufProtocol(Protocol):
- # 用于暂时存放接收到的数据
- _buffer = b""
- def dataReceived(self, data):
- # 上次未处理的数据加上本次接收到的数据
- self._buffer = self._buffer + data
- # 一直循环直到新的消息没有接收完整
- while True:
- # 如果header接收完整
- if len(self._buffer) >= 4:
- # header部分,按大字节序转int,获取body长度
- length, = unpack(">I", self._buffer[0:4])
- # 如果body接收完整
- if len(self._buffer) >= 4 + length:
- # body部分,protobuf字节码
- packet = self._buffer[4:4 + length]
- # protobuf字节码转成Student对象
- student = StudentMsg_pb2.Student()
- student.ParseFromString(packet)
- # 调用protobufReceived传入Student对象
- self.protobufReceived(student)
- # 去掉_buffer中已经处理的消息部分
- self._buffer = self._buffer[4 + length:]
- else:
- break;
- else:
- break;
- def protobufReceived(self, student):
- raise NotImplementedError
- def sendProtobuf(self, student):
- # Student对象转为protobuf字节码
- data = student.SerializeToString()
- # 添加Header前缀指定protobuf字节码长度
- self.transport.write(pack(">I", len(data)) + data)
- # 逻辑代码
- class TcpServerHandle(ProtobufProtocol):
- # 实现ProtobufProtocol提供的protobufReceived
- def protobufReceived(self, student):
- # 将接收到的Student输出
- print 'ID:' + str(student.id)
- print 'Name:' + student.name
- print 'Email:' + student.email
- print 'Friends:'
- for friend in student.friends:
- print friend
- # 创建一个Student并发送给客户端
- student2 = StudentMsg_pb2.Student()
- student2.id = 9
- student2.name = '服务器'.decode('UTF-8') # 中文需要转成UTF-8字符串
- student2.email = '[email protected]'
- student2.friends.append('X')
- student2.friends.append('Y')
- self.sendProtobuf(student2)
- factory = Factory()
- factory.protocol = TcpServerHandle
- reactor.listenTCP(8080, factory)
- reactor.run()
下面是Java编写的一个客户端测试程序:
- public class TcpClient {
- public static void main(String[] args) throws IOException {
- Socket socket = null;
- DataOutputStream out = null;
- DataInputStream in = null;
- try {
- socket = new Socket("localhost", 8080);
- out = new DataOutputStream(socket.getOutputStream());
- in = new DataInputStream(socket.getInputStream());
- // 创建一个Student传给服务器
- StudentMsg.Student.Builder builder = StudentMsg.Student.newBuilder();
- builder.setId(1);
- builder.setName("客户端");
- builder.setEmail("[email protected]");
- builder.addFriends("A");
- builder.addFriends("B");
- StudentMsg.Student student = builder.build();
- byte[] outputBytes = student.toByteArray(); // Student转成字节码
- out.writeInt(outputBytes.length); // write header
- out.write(outputBytes); // write body
- out.flush();
- // 获取服务器传过来的Student
- int bodyLength = in.readInt(); // read header
- byte[] bodyBytes = new byte[bodyLength];
- in.readFully(bodyBytes); // read body
- StudentMsg.Student student2 = StudentMsg.Student.parseFrom(bodyBytes); // body字节码解析成Student
- System.out.println("Header:" + bodyLength);
- System.out.println("Body:");
- System.out.println("ID:" + student2.getId());
- System.out.println("Name:" + student2.getName());
- System.out.println("Email:" + student2.getEmail());
- System.out.println("Friends:");
- List<String> friends = student2.getFriendsList();
- for(String friend : friends) {
- System.out.println(friend);
- }
- } finally {
- // 关闭连接
- in.close();
- out.close();
- socket.close();
- }
- }
- }
用客户端分别测试上面三个TCP服务器:
服务器输出:
ID:1
Name:客户端
Email:[email protected]
Friends:
A
B
客户端输出:
Header:32
Body:
ID:9
Name:服务器
Email:[email protected]
Friends:
X
Y
作者:叉叉哥 转载请注明出处:http://blog.****.net/xiao__gui/article/details/38864961