Netty之编码解码技术(四)
转:https://blog.****.net/qq_18603599/article/details/80768403
上一章介绍了netty编程中经常遇到的TCP的粘包和拆包的问题以及解决方案,今天介绍netty基于网络通信的编解码知识,主要有以下知识点.
1 编码器
2 解码器
3 netty自带的编解码器
4 代码实例演示常用的编解码器
5 自定义编解码器
首先接单介绍一下,序列化和反序列化大概念,在网络传输通信中,会发生两种数据转换的操作,一种是把消息对象转换成字节码,这种是序列化,还有一种是要把字节码对象再转换成消息对象,称为反序列化.和netty的对应关系,序列化对应的是编码过程,反序列化对应的解码过程,当然这种框架有很多,比如Kryo,json,protobuf等,今天主要学习一下netty里面的实现.下面就分开来说.
Encoder:编码器
& 消息对象编码成消息对象:MessageToMessageEncoder,netty的实现子类有如下所示:
Base64Encoder,ProtobufEncoder,RedisEncoder,StringEncoder
& 消息对象编码成字节MessageToByteEncoder,netty的实现子类如下所示:
MarshallingEncoder,ObjectEncoder
Decoder:解码器
& 解码字节到消息:ByteToMessageDecoder,对应的netty子类如下所示
DelimiterBasedFrameDecoder,FixedLengthFrameDecoder,LengthFieldBasedFrameDecoder,LineBasedFrameDecoder,RedisDecoder
& 解码消息到消息:MessageToMessageDecoder:对应的netty子类如下所示
Base64Decoder,ProtobufDecoder,StringDecoder
总结类关系
Encoder实际继承与ChannelOutboundHandlerAdapter,Decoder实际继承与ChannelInboundHandlerAdapter,后者其实都是继承与ChannelHandlerAdapter.上面的理论知识就介绍完了,下面直接通过代码演示一些常用的编解码器是如何使用的,分类讲解
使用Jboss Marshalling来实现编解码功能:模拟请求和响应之间的关系
首先看一下代码结构:
接下来分析每个类的代码,首先完成Jboss的编解码工厂实现,主要有两个方法,一个是解码,一个是编码
package encoder_decoder.masharlling; import io.netty.handler.codec.marshalling.DefaultMarshallerProvider; import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider; import io.netty.handler.codec.marshalling.MarshallerProvider; import io.netty.handler.codec.marshalling.MarshallingDecoder; import io.netty.handler.codec.marshalling.MarshallingEncoder; import io.netty.handler.codec.marshalling.UnmarshallerProvider; import org.jboss.marshalling.MarshallerFactory; import org.jboss.marshalling.Marshalling; import org.jboss.marshalling.MarshallingConfiguration; /** * @ 创建人 贾红平 * @ 创建时间 2018/6/24 * @ 功能描述 调用jboss marshalling实现对象的编码和解码功能 */ public class MarshallingCodeCFactory { /** * 创建Jboss Marshalling解码器MarshallingDecoder * * @return */ public static MarshallingDecoder buildMarshallingDecoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //解码提供者 UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration); //获取一个解码器 MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024); return decoder; } /** * 创建Jboss Marshalling编码器MarshallingEncoder * * @return */ public static MarshallingEncoder buildMarshallingEncoder() { final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial"); final MarshallingConfiguration configuration = new MarshallingConfiguration(); configuration.setVersion(5); //编码提供者 MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration); //获取一个编码器 MarshallingEncoder encoder = new MarshallingEncoder(provider); return encoder; } }
定义两个JavaBean用来存储用户的请求和响应实体信息:如下所示
package pojo; import java.io.Serializable; /** * @ 创建人 贾红平 * @ 创建时间 2018/6/24 * @ 功能描述 保存请求参数 */ public class RequestBean implements Serializable { /** * 默认的***ID */ private static final long serialVersionUID = 1L; private int subReqID; private String userName; private String productName; private String phoneNumber; private String address; /** * @return the subReqID */ public final int getSubReqID() { return subReqID; } /** * @param subReqID * the subReqID to set */ public final void setSubReqID(int subReqID) { this.subReqID = subReqID; } /** * @return the userName */ public final String getUserName() { return userName; } /** * @param userName * the userName to set */ public final void setUserName(String userName) { this.userName = userName; } /** * @return the productName */ public final String getProductName() { return productName; } /** * @param productName * the productName to set */ public final void setProductName(String productName) { this.productName = productName; } /** * @return the phoneNumber */ public final String getPhoneNumber() { return phoneNumber; } /** * @param phoneNumber * the phoneNumber to set */ public final void setPhoneNumber(String phoneNumber) { this.phoneNumber = phoneNumber; } /** * @return the address */ public final String getAddress() { return address; } /** * @param address * the address to set */ public final void setAddress(String address) { this.address = address; } @Override public String toString() { return "RequestBean [subReqID=" + subReqID + ", userName=" + userName + ", productName=" + productName + ", phoneNumber=" + phoneNumber + ", address=" + address + "]"; } }
保存响应信息:
package pojo; import java.io.Serializable; /** * @ 创建人 贾红平 * @ 创建时间 2018/6/24 * @ 功能描述 保存响应参数 */ public class ResponseBean implements Serializable { /** * 默认序列ID */ private static final long serialVersionUID = 1L; private int subReqID; private int respCode; private String desc; /** * @return the subReqID */ public final int getSubReqID() { return subReqID; } /** * @param subReqID * the subReqID to set */ public final void setSubReqID(int subReqID) { this.subReqID = subReqID; } /** * @return the respCode */ public final int getRespCode() { return respCode; } /** * @param respCode * the respCode to set */ public final void setRespCode(int respCode) { this.respCode = respCode; } /** * @return the desc */ public final String getDesc() { return desc; } /** * @param desc * the desc to set */ public final void setDesc(String desc) { this.desc = desc; } @Override public String toString() { return "ResponseBean [subReqID=" + subReqID + ", respCode=" + respCode + ", desc=" + desc + "]"; } }
定义业务处理的handler,循环向服务端发送请求对象信息,使用编码器来编码对象消息成字节码,同时获取服务端响应的信息,使用解码器来获取结果.
package encoder_decoder.masharlling.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import pojo.RequestBean; import java.util.Random; /** * @ 创建人 贾红平 * @ 创建时间 2018/6/24 * @ 功能描述 client handler */ public class RequestClientHandler extends ChannelInboundHandlerAdapter{ /** * 客户端连接上服务端之后会调用此方法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i <5 ; i++) { System.out.println("Client really send data to Server:"+generatorRequestBean(i)); ctx.writeAndFlush(generatorRequestBean(i)); } } /** * 构建要发送给服务端的数据 * @param i * @return */ private RequestBean generatorRequestBean(int i){ String[] unames ={"jhp","bruce","lyb"}; RequestBean req = new RequestBean(); req.setAddress("南京市浦口区天润城2街区47栋1单元"); req.setPhoneNumber("13803456728"); req.setProductName("人工只能AI实践"); req.setSubReqID(i); req.setUserName(unames[new Random().nextInt(3)]); return req; } /** * 客户端读取服务端写入的数据 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(); System.out.println("Receive server response : [" + msg + "]"); } /** * 连接或者发送读取数据出现异常 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { super.exceptionCaught(ctx, cause); } }
使用客户端发送请求
package encoder_decoder.masharlling.client; import encoder_decoder.masharlling.MarshallingCodeCFactory; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; /** * @ 创建人 贾红平 * @ 创建时间 2018/6/24 * @ 功能描述 客户端 */ public class RequestClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { //添加jboss的解码器 ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); //添加Jboss的编码器 ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); ch.pipeline().addLast(new RequestClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 当代客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { } } new RequestClient().connect(port, "127.0.0.1"); } }
编写服务端处理的handler,接受客户端的数据,并写回数据给客户端
package encoder_decoder.masharlling.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import pojo.RequestBean; import pojo.ResponseBean; /** * @ 创建人 贾红平 * @ 创建时间 2018/6/24 * @ 功能描述 服务端处理业务的handler */ public class RequestServerHandler extends ChannelInboundHandlerAdapter { /** * 服务端连接到客户端戳发该函数调用 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("服务端连接到客户度:"+ctx.channel().remoteAddress()); } /** * 服务端接收客户端的数据 并处理完之后写入处理结果给客户端 * @param ctx * @param msg * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RequestBean requestBean = (RequestBean)msg; if (requestBean.getUserName().equals("jhp")){ System.out.println("Service accept client subscrib req : [" + requestBean.toString() + "]"); ctx.writeAndFlush(genertorResponseBean(requestBean.getSubReqID())); } } /** * 构建响应信息 * @param id * @return */ private ResponseBean genertorResponseBean(int id){ ResponseBean resp = new ResponseBean(); resp.setSubReqID(id); resp.setRespCode(0); resp.setDesc("这本书籍是介绍和ai相关的实践项目集合"); return resp; } /** * 发生异常 打印信息 关闭所有链路 * @param ctx * @param cause * @throws Exception */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } }
再看服务端监听客户端并处理:
package encoder_decoder.masharlling.server; import encoder_decoder.masharlling.MarshallingCodeCFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * @ 创建人 贾红平 * @ 创建时间 2018/6/24 * @ 功能描述 服务端 */ public class RequestServer { /** * 绑定端口号 进行业务处理 * @param port * @throws Exception */ public void bind(int port) throws Exception { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); ch.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //ch.pipeline().addLast(new StringEncoder()); //ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new RequestServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new RequestServer().bind(port); } }
ok,看一下最终的效果:
服务端的效果:
客户端的效果:
请求和响应完全是OK,且内容也是正确的,代表编解码使用的没有问题。接下来在看第二种编解码使用
基于ObjectEncoder和ObjectDecoder的使用:
看一下代码结构:
因为很多内容都是一样的,这里就直接上代码并把有区别的地方用红色标注出来,其它就不多说了,首先看客户端的功能:
package encoder_decoder.searizable.client; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import pojo.RequestBean; /** * @author jiahp * @function 处理序列化的IO线程 */ public class SeariazableClientHandler extends ChannelInboundHandlerAdapter { public SeariazableClientHandler() { } public void channelActive(ChannelHandlerContext ctx) { for (int i = 0; i < 4; i++) { ctx.writeAndFlush(subReq(i)); } } private RequestBean subReq(int i) { RequestBean req = new RequestBean(); req.setAddress("南京市雨花台区软件大道101号华为基地"); req.setPhoneNumber("138xxxxxxxxx"); req.setProductName("Netty 最佳实践和原理分析"); req.setSubReqID(i); req.setUserName("JHP"); return req; } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Receive server response : [" + msg + "]"); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
客户端代码:
package encoder_decoder.searizable.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; /** * 处理client * @author jiahp */ public class SearizableClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { //netty自带的解码实现 ch.pipeline().addLast(new ObjectDecoder(1024, ClassResolvers.cacheDisabled(this.getClass().getClassLoader()))); //netty自带的编码实现 ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new SeariazableClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 当代客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new SearizableClient().connect(port, "127.0.0.1"); } }
再看服务端相关的:
package encoder_decoder.searizable.server; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import pojo.RequestBean; import pojo.ResponseBean; /** * @author jiahp * @function 处理服务端的IO线程 */ @Sharable public class SeariableServerHandler extends ChannelInboundHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RequestBean req = (RequestBean) msg; if ("JHP".equalsIgnoreCase(req.getUserName())) { System.out.println("Service accept client subscrib req : [" + req.toString() + "]"); ctx.writeAndFlush(resp(req.getSubReqID())); } } private ResponseBean resp(int subReqID) { ResponseBean resp = new ResponseBean(); resp.setSubReqID(subReqID); resp.setRespCode(0); resp.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); return resp; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
服务器端:
package encoder_decoder.searizable.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.serialization.ClassResolvers; import io.netty.handler.codec.serialization.ObjectDecoder; import io.netty.handler.codec.serialization.ObjectEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * @author jiahp * @function 处理服务端 */ public class SeariableServer { public void bind(int port) throws Exception { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 100) .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { //netty自带的解码实现 ch.pipeline().addLast(new ObjectDecoder(1024 * 1024, ClassResolvers.weakCachingConcurrentResolver(this.getClass().getClassLoader()))); //netty自带的编码实现 ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new SeariableServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new SeariableServer().bind(port); } }
OK,代码写完了 看一下效果如何:
客户端接受服务端返回的效果:
服务端接受客户端的数据并解析,写入结果:
第二种常用的也说完了,接下里要介绍的是第三种,很多大数据框架也用到了,比如hbase,hadoop等都会用到,就是
基于google protobuf的协议,只不过这个使用起来稍微复杂点,简单介绍一下大概步骤:
& 首先需要安装protobuf在电脑上,window和linux安装方式不一样,具体可以百度
& 安装好之后使用对用的生成程序生成指定的javabean对应的protobuf文件
& 程序中使用基于protobuf的文件获取java对象并使用:
直接上代码:
proto包就是根据javabean生成对应的Proto文件,就是请求和响应,直接看生成之后的代码
/** * */ package encoder_decoder.protobuf.proto; /** * 生成请求的protobuf */ public final class RequestProto { private RequestProto() {} public static void registerAllExtensions( com.google.protobuf.ExtensionRegistry registry) { } public interface RequestProtoOrBuilder extends com.google.protobuf.MessageOrBuilder { // required int32 subReqID = 1; /** * <code>required int32 subReqID = 1;</code> */ boolean hasSubReqID(); /** * <code>required int32 subReqID = 1;</code> */ int getSubReqID(); // required string userName = 2; /** * <code>required string userName = 2;</code> */ boolean hasUserName(); /** * <code>required string userName = 2;</code> */ String getUserName(); /** * <code>required string userName = 2;</code> */ com.google.protobuf.ByteString getUserNameBytes(); // required string productName = 3; /** * <code>required string productName = 3;</code> */ boolean hasProductName(); /** * <code>required string productName = 3;</code> */ String getProductName(); /** * <code>required string productName = 3;</code> */ com.google.protobuf.ByteString getProductNameBytes(); // repeated string address = 4; /** * <code>repeated string address = 4;</code> */ java.util.List<String> getAddressList(); /** * <code>repeated string address = 4;</code> */ int getAddressCount(); /** * <code>repeated string address = 4;</code> */ String getAddress(int index); /** * <code>repeated string address = 4;</code> */ com.google.protobuf.ByteString getAddressBytes(int index); } /** * Protobuf type {@code netty.SubscribeReq} */ public static final class RequestBean extends com.google.protobuf.GeneratedMessage implements RequestProtoOrBuilder { private RequestBean(com.google.protobuf.GeneratedMessage.Builder<?> builder) { super(builder); this.unknownFields = builder.getUnknownFields(); } private RequestBean(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } private static final RequestBean defaultInstance; public static RequestBean getDefaultInstance() { return defaultInstance; } public RequestBean getDefaultInstanceForType() { return defaultInstance; } private final com.google.protobuf.UnknownFieldSet unknownFields; @Override public final com.google.protobuf.UnknownFieldSet getUnknownFields() { return this.unknownFields; } private RequestBean( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { initFields(); int mutable_bitField0_ = 0; com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(); try { boolean done = false; while (!done) { int tag = input.readTag(); switch (tag) { case 0: done = true; break; default: { if (!parseUnknownField(input, unknownFields, extensionRegistry, tag)) { done = true; } break; } case 8: { bitField0_ |= 0x00000001; subReqID_ = input.readInt32(); break; } case 18: { bitField0_ |= 0x00000002; userName_ = input.readBytes(); break; } case 26: { bitField0_ |= 0x00000004; productName_ = input.readBytes(); break; } case 34: { if (!((mutable_bitField0_ & 0x00000008) == 0x00000008)) { address_ = new com.google.protobuf.LazyStringArrayList(); mutable_bitField0_ |= 0x00000008; } address_.add(input.readBytes()); break; } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { throw e.setUnfinishedMessage(this); } catch (java.io.IOException e) { throw new com.google.protobuf.InvalidProtocolBufferException( e.getMessage()).setUnfinishedMessage(this); } finally { if (((mutable_bitField0_ & 0x00000008) == 0x00000008)) { address_ = new com.google.protobuf.UnmodifiableLazyStringList(address_); } this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { return RequestProto.internal_static_netty_SubscribeReq_descriptor; } protected FieldAccessorTable internalGetFieldAccessorTable() { return RequestProto.internal_static_netty_SubscribeReq_fieldAccessorTable .ensureFieldAccessorsInitialized(RequestBean.class,RequestProto.RequestBean.Builder.class); } public static com.google.protobuf.Parser<RequestBean> PARSER = new com.google.protobuf.AbstractParser<RequestBean>() { public RequestBean parsePartialFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return new RequestBean(input, extensionRegistry); } }; @Override public com.google.protobuf.Parser<RequestBean> getParserForType() { return PARSER; } private int bitField0_; // required int32 subReqID = 1; public static final int SUBREQID_FIELD_NUMBER = 1; private int subReqID_; /** * <code>required int32 subReqID = 1;</code> */ public boolean hasSubReqID() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * <code>required int32 subReqID = 1;</code> */ public int getSubReqID() { return subReqID_; } // required string userName = 2; public static final int USERNAME_FIELD_NUMBER = 2; private Object userName_; /** * <code>required string userName = 2;</code> */ public boolean hasUserName() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** * <code>required string userName = 2;</code> */ public String getUserName() { Object ref = userName_; if (ref instanceof String) { return (String) ref; } else { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { userName_ = s; } return s; } } /** * <code>required string userName = 2;</code> */ public com.google.protobuf.ByteString getUserNameBytes() { Object ref = userName_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (String) ref); userName_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } // required string productName = 3; public static final int PRODUCTNAME_FIELD_NUMBER = 3; private Object productName_; /** * <code>required string productName = 3;</code> */ public boolean hasProductName() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** * <code>required string productName = 3;</code> */ public String getProductName() { Object ref = productName_; if (ref instanceof String) { return (String) ref; } else { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { productName_ = s; } return s; } } /** * <code>required string productName = 3;</code> */ public com.google.protobuf.ByteString getProductNameBytes() { Object ref = productName_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (String) ref); productName_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } // repeated string address = 4; public static final int ADDRESS_FIELD_NUMBER = 4; private com.google.protobuf.LazyStringList address_; /** * <code>repeated string address = 4;</code> */ public java.util.List<String> getAddressList() { return address_; } /** * <code>repeated string address = 4;</code> */ public int getAddressCount() { return address_.size(); } /** * <code>repeated string address = 4;</code> */ public String getAddress(int index) { return address_.get(index); } /** * <code>repeated string address = 4;</code> */ public com.google.protobuf.ByteString getAddressBytes(int index) { return address_.getByteString(index); } private void initFields() { subReqID_ = 0; userName_ = ""; productName_ = ""; address_ = com.google.protobuf.LazyStringArrayList.EMPTY; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; if (!hasSubReqID()) { memoizedIsInitialized = 0; return false; } if (!hasUserName()) { memoizedIsInitialized = 0; return false; } if (!hasProductName()) { memoizedIsInitialized = 0; return false; } memoizedIsInitialized = 1; return true; } public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeInt32(1, subReqID_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeBytes(2, getUserNameBytes()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(3, getProductNameBytes()); } for (int i = 0; i < address_.size(); i++) { output.writeBytes(4, address_.getByteString(i)); } getUnknownFields().writeTo(output); } private int memoizedSerializedSize = -1; public int getSerializedSize() { int size = memoizedSerializedSize; if (size != -1) return size; size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream .computeInt32Size(1, subReqID_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream .computeBytesSize(2, getUserNameBytes()); } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream .computeBytesSize(3, getProductNameBytes()); } { int dataSize = 0; for (int i = 0; i < address_.size(); i++) { dataSize += com.google.protobuf.CodedOutputStream .computeBytesSizeNoTag(address_.getByteString(i)); } size += dataSize; size += 1 * getAddressList().size(); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; } private static final long serialVersionUID = 0L; @Override protected Object writeReplace() throws java.io.ObjectStreamException { return super.writeReplace(); } public static RequestProto.RequestBean parseFrom( com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } public static RequestProto.RequestBean parseFrom( com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } public static RequestProto.RequestBean parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } public static RequestProto.RequestBean parseFrom( byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } public static RequestProto.RequestBean parseFrom(java.io.InputStream input) throws java.io.IOException { return PARSER.parseFrom(input); } public static RequestProto.RequestBean parseFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseFrom(input, extensionRegistry); } public static RequestProto.RequestBean parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return PARSER.parseDelimitedFrom(input); } public static RequestProto.RequestBean parseDelimitedFrom( java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseDelimitedFrom(input, extensionRegistry); } public static RequestProto.RequestBean parseFrom( com.google.protobuf.CodedInputStream input) throws java.io.IOException { return PARSER.parseFrom(input); } public static RequestProto.RequestBean parseFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseFrom(input, extensionRegistry); } public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } public static Builder newBuilder(RequestProto.RequestBean prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } @Override protected Builder newBuilderForType( BuilderParent parent) { Builder builder = new Builder(parent); return builder; } /** * Protobuf type {@code netty.SubscribeReq} */ public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder<Builder> implements RequestProto.RequestProtoOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { return RequestProto.internal_static_netty_SubscribeReq_descriptor; } protected FieldAccessorTable internalGetFieldAccessorTable() { return RequestProto.internal_static_netty_SubscribeReq_fieldAccessorTable .ensureFieldAccessorsInitialized( RequestProto.RequestBean.class, RequestProto.RequestBean.Builder.class); } // Construct using com.phei.netty.codec.protobuf.SubscribeReqProto.SubscribeReq.newBuilder() private Builder() { maybeForceBuilderInitialization(); } private Builder( BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { } } private static Builder create() { return new Builder(); } public Builder clear() { super.clear(); subReqID_ = 0; bitField0_ = (bitField0_ & ~0x00000001); userName_ = ""; bitField0_ = (bitField0_ & ~0x00000002); productName_ = ""; bitField0_ = (bitField0_ & ~0x00000004); address_ = com.google.protobuf.LazyStringArrayList.EMPTY; bitField0_ = (bitField0_ & ~0x00000008); return this; } public Builder clone() { return create().mergeFrom(buildPartial()); } public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { return RequestProto.internal_static_netty_SubscribeReq_descriptor; } public RequestProto.RequestBean getDefaultInstanceForType() { return RequestProto.RequestBean.getDefaultInstance(); } public RequestProto.RequestBean build() { RequestProto.RequestBean result = buildPartial(); if (!result.isInitialized()) { throw newUninitializedMessageException(result); } return result; } public RequestProto.RequestBean buildPartial() { RequestProto.RequestBean result = new RequestProto.RequestBean(this); int from_bitField0_ = bitField0_; int to_bitField0_ = 0; if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } result.subReqID_ = subReqID_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } result.userName_ = userName_; if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } result.productName_ = productName_; if (((bitField0_ & 0x00000008) == 0x00000008)) { address_ = new com.google.protobuf.UnmodifiableLazyStringList( address_); bitField0_ = (bitField0_ & ~0x00000008); } result.address_ = address_; result.bitField0_ = to_bitField0_; onBuilt(); return result; } public Builder mergeFrom(com.google.protobuf.Message other) { if (other instanceof RequestProto.RequestBean) { return mergeFrom((RequestProto.RequestBean)other); } else { super.mergeFrom(other); return this; } } public Builder mergeFrom(RequestProto.RequestBean other) { if (other == RequestProto.RequestBean.getDefaultInstance()) return this; if (other.hasSubReqID()) { setSubReqID(other.getSubReqID()); } if (other.hasUserName()) { bitField0_ |= 0x00000002; userName_ = other.userName_; onChanged(); } if (other.hasProductName()) { bitField0_ |= 0x00000004; productName_ = other.productName_; onChanged(); } if (!other.address_.isEmpty()) { if (address_.isEmpty()) { address_ = other.address_; bitField0_ = (bitField0_ & ~0x00000008); } else { ensureAddressIsMutable(); address_.addAll(other.address_); } onChanged(); } this.mergeUnknownFields(other.getUnknownFields()); return this; } public final boolean isInitialized() { if (!hasSubReqID()) { return false; } if (!hasUserName()) { return false; } if (!hasProductName()) { return false; } return true; } public Builder mergeFrom( com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { RequestProto.RequestBean parsedMessage = null; try { parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); } catch (com.google.protobuf.InvalidProtocolBufferException e) { parsedMessage = (RequestProto.RequestBean) e.getUnfinishedMessage(); throw e; } finally { if (parsedMessage != null) { mergeFrom(parsedMessage); } } return this; } private int bitField0_; // required int32 subReqID = 1; private int subReqID_ ; /** * <code>required int32 subReqID = 1;</code> */ public boolean hasSubReqID() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * <code>required int32 subReqID = 1;</code> */ public int getSubReqID() { return subReqID_; } /** * <code>required int32 subReqID = 1;</code> */ public Builder setSubReqID(int value) { bitField0_ |= 0x00000001; subReqID_ = value; onChanged(); return this; } /** * <code>required int32 subReqID = 1;</code> */ public Builder clearSubReqID() { bitField0_ = (bitField0_ & ~0x00000001); subReqID_ = 0; onChanged(); return this; } // required string userName = 2; private Object userName_ = ""; /** * <code>required string userName = 2;</code> */ public boolean hasUserName() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** * <code>required string userName = 2;</code> */ public String getUserName() { Object ref = userName_; if (!(ref instanceof String)) { String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); userName_ = s; return s; } else { return (String) ref; } } /** * <code>required string userName = 2;</code> */ public com.google.protobuf.ByteString getUserNameBytes() { Object ref = userName_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (String) ref); userName_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** * <code>required string userName = 2;</code> */ public Builder setUserName( String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000002; userName_ = value; onChanged(); return this; } /** * <code>required string userName = 2;</code> */ public Builder clearUserName() { bitField0_ = (bitField0_ & ~0x00000002); userName_ = getDefaultInstance().getUserName(); onChanged(); return this; } /** * <code>required string userName = 2;</code> */ public Builder setUserNameBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000002; userName_ = value; onChanged(); return this; } // required string productName = 3; private Object productName_ = ""; /** * <code>required string productName = 3;</code> */ public boolean hasProductName() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** * <code>required string productName = 3;</code> */ public String getProductName() { Object ref = productName_; if (!(ref instanceof String)) { String s = ((com.google.protobuf.ByteString) ref) .toStringUtf8(); productName_ = s; return s; } else { return (String) ref; } } /** * <code>required string productName = 3;</code> */ public com.google.protobuf.ByteString getProductNameBytes() { Object ref = productName_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8( (String) ref); productName_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** * <code>required string productName = 3;</code> */ public Builder setProductName( String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; productName_ = value; onChanged(); return this; } /** * <code>required string productName = 3;</code> */ public Builder clearProductName() { bitField0_ = (bitField0_ & ~0x00000004); productName_ = getDefaultInstance().getProductName(); onChanged(); return this; } /** * <code>required string productName = 3;</code> */ public Builder setProductNameBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; productName_ = value; onChanged(); return this; } // repeated string address = 4; private com.google.protobuf.LazyStringList address_ = com.google.protobuf.LazyStringArrayList.EMPTY; private void ensureAddressIsMutable() { if (!((bitField0_ & 0x00000008) == 0x00000008)) { address_ = new com.google.protobuf.LazyStringArrayList(address_); bitField0_ |= 0x00000008; } } /** * <code>repeated string address = 4;</code> */ public java.util.List<String> getAddressList() { return java.util.Collections.unmodifiableList(address_); } /** * <code>repeated string address = 4;</code> */ public int getAddressCount() { return address_.size(); } /** * <code>repeated string address = 4;</code> */ public String getAddress(int index) { return address_.get(index); } /** * <code>repeated string address = 4;</code> */ public com.google.protobuf.ByteString getAddressBytes(int index) { return address_.getByteString(index); } /** * <code>repeated string address = 4;</code> */ public Builder setAddress( int index, String value) { if (value == null) { throw new NullPointerException(); } ensureAddressIsMutable(); address_.set(index, value); onChanged(); return this; } /** * <code>repeated string address = 4;</code> */ public Builder addAddress( String value) { if (value == null) { throw new NullPointerException(); } ensureAddressIsMutable(); address_.add(value); onChanged(); return this; } /** * <code>repeated string address = 4;</code> */ public Builder addAllAddress( Iterable<String> values) { ensureAddressIsMutable(); super.addAll(values, address_); onChanged(); return this; } /** * <code>repeated string address = 4;</code> */ public Builder clearAddress() { address_ = com.google.protobuf.LazyStringArrayList.EMPTY; bitField0_ = (bitField0_ & ~0x00000008); onChanged(); return this; } /** * <code>repeated string address = 4;</code> */ public Builder addAddressBytes( com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } ensureAddressIsMutable(); address_.add(value); onChanged(); return this; } // @@protoc_insertion_point(builder_scope:netty.SubscribeReq) } static { defaultInstance = new RequestBean(true); defaultInstance.initFields(); } // @@protoc_insertion_point(class_scope:netty.SubscribeReq) } private static com.google.protobuf.Descriptors.Descriptor internal_static_netty_SubscribeReq_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_netty_SubscribeReq_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { return descriptor; } private static com.google.protobuf.Descriptors.FileDescriptor descriptor; static { String[] descriptorData = { "\n\030netty/SubscribeReq.proto\022\005netty\"X\n\014Sub" + "scribeReq\022\020\n\010subReqID\030\001 \002(\005\022\020\n\010userName\030" + "\002 \002(\t\022\023\n\013productName\030\003 \002(\t\022\017\n\007address\030\004 " + "\003(\tB2\n\035com.phei.netty.codec.protobufB\021Su" + "bscribeReqProto" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors( com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; internal_static_netty_SubscribeReq_descriptor = getDescriptor().getMessageTypes().get(0); internal_static_netty_SubscribeReq_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_netty_SubscribeReq_descriptor, new String[] { "SubReqID", "UserName", "ProductName", "Address", }); return null; } }; com.google.protobuf.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, new com.google.protobuf.Descriptors.FileDescriptor[] { }, assigner); } // @@protoc_insertion_point(outer_class_scope) }
响应的proto文件:
// Generated by the protocol buffer compiler. DO NOT EDIT! // source: netty/ResponseBean.proto package encoder_decoder.protobuf.proto; public final class ResponseProto { private ResponseProto() { } public static void registerAllExtensions(com.google.protobuf.ExtensionRegistry registry) { } public interface ResponseBeanOrBuilder extends com.google.protobuf.MessageOrBuilder { // required int32 subReqID = 1; /** * <code>required int32 subReqID = 1;</code> */ boolean hasSubReqID(); /** * <code>required int32 subReqID = 1;</code> */ int getSubReqID(); // required int32 respCode = 2; /** * <code>required int32 respCode = 2;</code> */ boolean hasRespCode(); /** * <code>required int32 respCode = 2;</code> */ int getRespCode(); // required string desc = 3; /** * <code>required string desc = 3;</code> */ boolean hasDesc(); /** * <code>required string desc = 3;</code> */ String getDesc(); /** * <code>required string desc = 3;</code> */ com.google.protobuf.ByteString getDescBytes(); } /** * Protobuf type {@code netty.ResponseBean} */ public static final class ResponseBean extends com.google.protobuf.GeneratedMessage implements ResponseBeanOrBuilder { // Use ResponseBean.newBuilder() to construct. private ResponseBean(com.google.protobuf.GeneratedMessage.Builder<?> builder) { super(builder); this.unknownFields = builder.getUnknownFields(); } private ResponseBean(boolean noInit) { this.unknownFields = com.google.protobuf.UnknownFieldSet.getDefaultInstance(); } private static final ResponseBean defaultInstance; public static ResponseBean getDefaultInstance() { return defaultInstance; } public ResponseBean getDefaultInstanceForType() { return defaultInstance; } private final com.google.protobuf.UnknownFieldSet unknownFields; @Override public final com.google.protobuf.UnknownFieldSet getUnknownFields() { return this.unknownFields; } private ResponseBean(com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { initFields(); int mutable_bitField0_ = 0; com.google.protobuf.UnknownFieldSet.Builder unknownFields = com.google.protobuf.UnknownFieldSet.newBuilder(); try { boolean done = false; while (!done) { int tag = input.readTag(); switch (tag) { case 0: done = true; break; default: { if (!parseUnknownField(input, unknownFields, extensionRegistry, tag)) { done = true; } break; } case 8: { bitField0_ |= 0x00000001; subReqID_ = input.readInt32(); break; } case 16: { bitField0_ |= 0x00000002; respCode_ = input.readInt32(); break; } case 26: { bitField0_ |= 0x00000004; desc_ = input.readBytes(); break; } } } } catch (com.google.protobuf.InvalidProtocolBufferException e) { throw e.setUnfinishedMessage(this); } catch (java.io.IOException e) { throw new com.google.protobuf.InvalidProtocolBufferException(e.getMessage()).setUnfinishedMessage(this); } finally { this.unknownFields = unknownFields.build(); makeExtensionsImmutable(); } } public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { return ResponseProto.internal_static_netty_ResponseBean_descriptor; } protected FieldAccessorTable internalGetFieldAccessorTable() { return ResponseProto.internal_static_netty_ResponseBean_fieldAccessorTable.ensureFieldAccessorsInitialized( ResponseProto.ResponseBean.class, ResponseProto.ResponseBean.Builder.class); } public static com.google.protobuf.Parser<ResponseBean> PARSER = new com.google.protobuf.AbstractParser<ResponseBean>() { public ResponseBean parsePartialFrom(com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return new ResponseBean(input, extensionRegistry); } }; @Override public com.google.protobuf.Parser<ResponseBean> getParserForType() { return PARSER; } private int bitField0_; // required int32 subReqID = 1; public static final int SUBREQID_FIELD_NUMBER = 1; private int subReqID_; /** * <code>required int32 subReqID = 1;</code> */ public boolean hasSubReqID() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * <code>required int32 subReqID = 1;</code> */ public int getSubReqID() { return subReqID_; } // required int32 respCode = 2; public static final int RESPCODE_FIELD_NUMBER = 2; private int respCode_; /** * <code>required int32 respCode = 2;</code> */ public boolean hasRespCode() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** * <code>required int32 respCode = 2;</code> */ public int getRespCode() { return respCode_; } // required string desc = 3; public static final int DESC_FIELD_NUMBER = 3; private Object desc_; /** * <code>required string desc = 3;</code> */ public boolean hasDesc() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** * <code>required string desc = 3;</code> */ public String getDesc() { Object ref = desc_; if (ref instanceof String) { return (String) ref; } else { com.google.protobuf.ByteString bs = (com.google.protobuf.ByteString) ref; String s = bs.toStringUtf8(); if (bs.isValidUtf8()) { desc_ = s; } return s; } } /** * <code>required string desc = 3;</code> */ public com.google.protobuf.ByteString getDescBytes() { Object ref = desc_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((String) ref); desc_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } private void initFields() { subReqID_ = 0; respCode_ = 0; desc_ = ""; } private byte memoizedIsInitialized = -1; public final boolean isInitialized() { byte isInitialized = memoizedIsInitialized; if (isInitialized != -1) return isInitialized == 1; if (!hasSubReqID()) { memoizedIsInitialized = 0; return false; } if (!hasRespCode()) { memoizedIsInitialized = 0; return false; } if (!hasDesc()) { memoizedIsInitialized = 0; return false; } memoizedIsInitialized = 1; return true; } public void writeTo(com.google.protobuf.CodedOutputStream output) throws java.io.IOException { getSerializedSize(); if (((bitField0_ & 0x00000001) == 0x00000001)) { output.writeInt32(1, subReqID_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { output.writeInt32(2, respCode_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { output.writeBytes(3, getDescBytes()); } getUnknownFields().writeTo(output); } private int memoizedSerializedSize = -1; public int getSerializedSize() { int size = memoizedSerializedSize; if (size != -1) return size; size = 0; if (((bitField0_ & 0x00000001) == 0x00000001)) { size += com.google.protobuf.CodedOutputStream.computeInt32Size(1, subReqID_); } if (((bitField0_ & 0x00000002) == 0x00000002)) { size += com.google.protobuf.CodedOutputStream.computeInt32Size(2, respCode_); } if (((bitField0_ & 0x00000004) == 0x00000004)) { size += com.google.protobuf.CodedOutputStream.computeBytesSize(3, getDescBytes()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; return size; } private static final long serialVersionUID = 0L; @Override protected Object writeReplace() throws java.io.ObjectStreamException { return super.writeReplace(); } public static ResponseProto.ResponseBean parseFrom(com.google.protobuf.ByteString data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } public static ResponseProto.ResponseBean parseFrom(com.google.protobuf.ByteString data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } public static ResponseProto.ResponseBean parseFrom(byte[] data) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data); } public static ResponseProto.ResponseBean parseFrom(byte[] data, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws com.google.protobuf.InvalidProtocolBufferException { return PARSER.parseFrom(data, extensionRegistry); } public static ResponseProto.ResponseBean parseFrom(java.io.InputStream input) throws java.io.IOException { return PARSER.parseFrom(input); } public static ResponseProto.ResponseBean parseFrom(java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseFrom(input, extensionRegistry); } public static ResponseProto.ResponseBean parseDelimitedFrom(java.io.InputStream input) throws java.io.IOException { return PARSER.parseDelimitedFrom(input); } public static ResponseProto.ResponseBean parseDelimitedFrom(java.io.InputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseDelimitedFrom(input, extensionRegistry); } public static ResponseProto.ResponseBean parseFrom(com.google.protobuf.CodedInputStream input) throws java.io.IOException { return PARSER.parseFrom(input); } public static ResponseProto.ResponseBean parseFrom(com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { return PARSER.parseFrom(input, extensionRegistry); } public static Builder newBuilder() { return Builder.create(); } public Builder newBuilderForType() { return newBuilder(); } public static Builder newBuilder(ResponseProto.ResponseBean prototype) { return newBuilder().mergeFrom(prototype); } public Builder toBuilder() { return newBuilder(this); } @Override protected Builder newBuilderForType(BuilderParent parent) { Builder builder = new Builder(parent); return builder; } /** * Protobuf type {@code netty.ResponseBean} */ public static final class Builder extends com.google.protobuf.GeneratedMessage.Builder<Builder> implements ResponseProto.ResponseBeanOrBuilder { public static final com.google.protobuf.Descriptors.Descriptor getDescriptor() { return ResponseProto.internal_static_netty_ResponseBean_descriptor; } protected FieldAccessorTable internalGetFieldAccessorTable() { return ResponseProto.internal_static_netty_ResponseBean_fieldAccessorTable.ensureFieldAccessorsInitialized( ResponseProto.ResponseBean.class, ResponseProto.ResponseBean.Builder.class); } // Construct using ResponseProto.ResponseBean.newBuilder() private Builder() { maybeForceBuilderInitialization(); } private Builder(BuilderParent parent) { super(parent); maybeForceBuilderInitialization(); } private void maybeForceBuilderInitialization() { if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { } } private static Builder create() { return new Builder(); } public Builder clear() { super.clear(); subReqID_ = 0; bitField0_ = (bitField0_ & ~0x00000001); respCode_ = 0; bitField0_ = (bitField0_ & ~0x00000002); desc_ = ""; bitField0_ = (bitField0_ & ~0x00000004); return this; } public Builder clone() { return create().mergeFrom(buildPartial()); } public com.google.protobuf.Descriptors.Descriptor getDescriptorForType() { return ResponseProto.internal_static_netty_ResponseBean_descriptor; } public ResponseProto.ResponseBean getDefaultInstanceForType() { return ResponseProto.ResponseBean.getDefaultInstance(); } public ResponseProto.ResponseBean build() { ResponseProto.ResponseBean result = buildPartial(); if (!result.isInitialized()) { throw newUninitializedMessageException(result); } return result; } public ResponseProto.ResponseBean buildPartial() { ResponseProto.ResponseBean result = new ResponseProto.ResponseBean(this); int from_bitField0_ = bitField0_; int to_bitField0_ = 0; if (((from_bitField0_ & 0x00000001) == 0x00000001)) { to_bitField0_ |= 0x00000001; } result.subReqID_ = subReqID_; if (((from_bitField0_ & 0x00000002) == 0x00000002)) { to_bitField0_ |= 0x00000002; } result.respCode_ = respCode_; if (((from_bitField0_ & 0x00000004) == 0x00000004)) { to_bitField0_ |= 0x00000004; } result.desc_ = desc_; result.bitField0_ = to_bitField0_; onBuilt(); return result; } public Builder mergeFrom(com.google.protobuf.Message other) { if (other instanceof ResponseProto.ResponseBean) { return mergeFrom((ResponseProto.ResponseBean) other); } else { super.mergeFrom(other); return this; } } public Builder mergeFrom(ResponseProto.ResponseBean other) { if (other == ResponseProto.ResponseBean.getDefaultInstance()) return this; if (other.hasSubReqID()) { setSubReqID(other.getSubReqID()); } if (other.hasRespCode()) { setRespCode(other.getRespCode()); } if (other.hasDesc()) { bitField0_ |= 0x00000004; desc_ = other.desc_; onChanged(); } this.mergeUnknownFields(other.getUnknownFields()); return this; } public final boolean isInitialized() { if (!hasSubReqID()) { return false; } if (!hasRespCode()) { return false; } if (!hasDesc()) { return false; } return true; } public Builder mergeFrom(com.google.protobuf.CodedInputStream input, com.google.protobuf.ExtensionRegistryLite extensionRegistry) throws java.io.IOException { ResponseProto.ResponseBean parsedMessage = null; try { parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); } catch (com.google.protobuf.InvalidProtocolBufferException e) { parsedMessage = (ResponseProto.ResponseBean) e.getUnfinishedMessage(); throw e; } finally { if (parsedMessage != null) { mergeFrom(parsedMessage); } } return this; } private int bitField0_; // required int32 subReqID = 1; private int subReqID_; /** * <code>required int32 subReqID = 1;</code> */ public boolean hasSubReqID() { return ((bitField0_ & 0x00000001) == 0x00000001); } /** * <code>required int32 subReqID = 1;</code> */ public int getSubReqID() { return subReqID_; } /** * <code>required int32 subReqID = 1;</code> */ public Builder setSubReqID(int value) { bitField0_ |= 0x00000001; subReqID_ = value; onChanged(); return this; } /** * <code>required int32 subReqID = 1;</code> */ public Builder clearSubReqID() { bitField0_ = (bitField0_ & ~0x00000001); subReqID_ = 0; onChanged(); return this; } // required int32 respCode = 2; private int respCode_; /** * <code>required int32 respCode = 2;</code> */ public boolean hasRespCode() { return ((bitField0_ & 0x00000002) == 0x00000002); } /** * <code>required int32 respCode = 2;</code> */ public int getRespCode() { return respCode_; } /** * <code>required int32 respCode = 2;</code> */ public Builder setRespCode(int value) { bitField0_ |= 0x00000002; respCode_ = value; onChanged(); return this; } /** * <code>required int32 respCode = 2;</code> */ public Builder clearRespCode() { bitField0_ = (bitField0_ & ~0x00000002); respCode_ = 0; onChanged(); return this; } // required string desc = 3; private Object desc_ = ""; /** * <code>required string desc = 3;</code> */ public boolean hasDesc() { return ((bitField0_ & 0x00000004) == 0x00000004); } /** * <code>required string desc = 3;</code> */ public String getDesc() { Object ref = desc_; if (!(ref instanceof String)) { String s = ((com.google.protobuf.ByteString) ref).toStringUtf8(); desc_ = s; return s; } else { return (String) ref; } } /** * <code>required string desc = 3;</code> */ public com.google.protobuf.ByteString getDescBytes() { Object ref = desc_; if (ref instanceof String) { com.google.protobuf.ByteString b = com.google.protobuf.ByteString.copyFromUtf8((String) ref); desc_ = b; return b; } else { return (com.google.protobuf.ByteString) ref; } } /** * <code>required string desc = 3;</code> */ public Builder setDesc(String value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; desc_ = value; onChanged(); return this; } /** * <code>required string desc = 3;</code> */ public Builder clearDesc() { bitField0_ = (bitField0_ & ~0x00000004); desc_ = getDefaultInstance().getDesc(); onChanged(); return this; } /** * <code>required string desc = 3;</code> */ public Builder setDescBytes(com.google.protobuf.ByteString value) { if (value == null) { throw new NullPointerException(); } bitField0_ |= 0x00000004; desc_ = value; onChanged(); return this; } // @@protoc_insertion_point(builder_scope:netty.ResponseBean) } static { defaultInstance = new ResponseBean(true); defaultInstance.initFields(); } // @@protoc_insertion_point(class_scope:netty.ResponseBean) } private static com.google.protobuf.Descriptors.Descriptor internal_static_netty_ResponseBean_descriptor; private static com.google.protobuf.GeneratedMessage.FieldAccessorTable internal_static_netty_ResponseBean_fieldAccessorTable; public static com.google.protobuf.Descriptors.FileDescriptor getDescriptor() { return descriptor; } private static com.google.protobuf.Descriptors.FileDescriptor descriptor; static { String[] descriptorData = { "\n\031netty/ResponseBean.proto\022\005netty\"A\n\rSu" + "bscribeResp\022\020\n\010subReqID\030\001 \002(\005\022\020\n\010respCod" + "e\030\002 \002(\005\022\014\n\004desc\030\003 \002(\tB3\n\035com.phei.netty." + "codec.protobufB\022ResponseProto" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { public com.google.protobuf.ExtensionRegistry assignDescriptors(com.google.protobuf.Descriptors.FileDescriptor root) { descriptor = root; internal_static_netty_ResponseBean_descriptor = getDescriptor().getMessageTypes().get(0); internal_static_netty_ResponseBean_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable(internal_static_netty_ResponseBean_descriptor, new String[] { "SubReqID", "RespCode", "Desc", }); return null; } }; com.google.protobuf.Descriptors.FileDescriptor.internalBuildGeneratedFileFrom(descriptorData, new com.google.protobuf.Descriptors.FileDescriptor[] {}, assigner); } // @@protoc_insertion_point(outer_class_scope) }
接下来看一下客户端的handler:
package encoder_decoder.protobuf.client; import encoder_decoder.protobuf.proto.RequestProto; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.ArrayList; import java.util.List; /** * @author jiahp * @function 处理客户端的io线程 */ public class ProtobufClientHandler extends ChannelInboundHandlerAdapter { public ProtobufClientHandler() { } public void channelActive(ChannelHandlerContext ctx) { for (int i = 0; i < 10; i++) { ctx.writeAndFlush(subReq(i)); } } private RequestProto.RequestBean subReq(int i) {
//基于生成的proto文件获取对象 RequestProto.RequestBean.Builder builder = RequestProto.RequestBean.newBuilder(); builder.setSubReqID(i); builder.setUserName("JHP"); builder.setProductName("Netty Book For Protobuf"); List<String> address = new ArrayList(); address.add("NanJing YuHuaTai"); address.add("BeiJing LiuLiChang"); address.add("ShenZhen HongShuLin"); builder.addAllAddress(address); return builder.build(); } public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("Receive server response : [" + msg + "]"); } public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close(); } }
客户端发送请求:
package encoder_decoder.protobuf.client; import encoder_decoder.protobuf.proto.RequestProto; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; /** * 处理protobuf的client */ public class ProtobufClient { public void connect(int port, String host) throws Exception { // 配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { //添加netty自带的protobuf解码实现 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(RequestProto.RequestBean.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); //添加netty自带的protobuf编码实现 ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new ProtobufClientHandler()); } }); // 发起异步连接操作 ChannelFuture f = b.connect(host, port).sync(); // 当代客户端链路关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放NIO线程组 group.shutdownGracefully(); } } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new ProtobufClient().connect(port, "127.0.0.1"); } }
再看对应的服务端处理handler和服务端处理客户端的请求:
package encoder_decoder.protobuf.server; import encoder_decoder.protobuf.proto.RequestProto; import encoder_decoder.protobuf.proto.ResponseProto; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; /** * 处理client的handler * @ author jiahp */ @Sharable public class ProtobufServerHandler extends ChannelHandlerAdapter { public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RequestProto.RequestBean req = (RequestProto.RequestBean) msg; if ("JHP".equalsIgnoreCase(req.getUserName())) { System.out.println("Service accept client subscribe req : [" + req.toString() + "]"); ctx.writeAndFlush(resp(req.getSubReqID())); } } private ResponseProto.ResponseBean resp(int subReqID) {
//基于protobuf生成的文件获取响应对象 ResponseProto.ResponseBean.Builder builder = ResponseProto.ResponseBean.newBuilder(); builder.setSubReqID(subReqID); builder.setRespCode(0); builder.setDesc("Netty book order succeed, 3 days later, sent to the designated address"); return builder.build(); } public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { cause.printStackTrace(); ctx.close();// 发生异常,关闭链路 } }
服务端处理:
package encoder_decoder.protobuf.server; import encoder_decoder.masharlling.server.RequestServerHandler; import encoder_decoder.protobuf.proto.RequestProto; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.protobuf.ProtobufDecoder; import io.netty.handler.codec.protobuf.ProtobufEncoder; import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder; import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; /** * @author jiahp * @function 处理protobuf的server */ public class ProtobufServer { public void bind(int port) throws Exception { // 配置服务端的NIO线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 100).handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { //添加netty自带的解码实现 ch.pipeline().addLast(new ProtobufVarint32FrameDecoder()); ch.pipeline().addLast(new ProtobufDecoder(RequestProto.RequestBean.getDefaultInstance())); ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender()); //添加netty自带的编码实现 ch.pipeline().addLast(new ProtobufEncoder()); ch.pipeline().addLast(new RequestServerHandler()); } }); // 绑定端口,同步等待成功 ChannelFuture f = b.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出,释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws Exception { int port = 8080; if (args != null && args.length > 0) { try { port = Integer.valueOf(args[0]); } catch (NumberFormatException e) { // 采用默认值 } } new ProtobufServer().bind(port); } }
可以看一下测试效果:
以及响应结果:
到目前为止都是使用netty里面提供的相关编解码器,但是实际业务中有时候需要开发者自定义编解码完成特定的功能,最后就介绍一下如何实现自定义编解码功能(没有使用编解码器的情况消息都是bytebuf),
实现自定义编解码功能完成Double类型转换:代码结构
首先实现自定义编码器:double数值转换字节码
package encoder_decoder.custom.encoder; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; /** * @ author jiahp * @ function 自定义编码器 */ public class DoubleToByteEncoder extends MessageToByteEncoder<Double> { @Override protected void encode(ChannelHandlerContext ctx, Double value, ByteBuf out) throws Exception { System.out.println("custom encoder msg is: " + value); out.writeDouble(value); } }
对应的解码器:字节码转换为double类型数值
package encoder_decoder.custom.decoder; import java.util.List; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; /** * @ author jiahp * @ function 自定义解码器 */ public class ByteToDoubleDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { //需要做检查 可读字节一定要大于4 if (in.readableBytes() >= 4) { double value = in.readDouble(); System.out.println("custom decoder msg is: " + value); out.add(value); } } }
看看客户端相关代码:
package encoder_decoder.custom.client; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; /** * @Author 18011618 * @Description 客户端处理的handlerss * @Date 14:32 2018/6/25 * @Modify By */ public class CustomClientHandler extends ChannelInboundHandlerAdapter { /** * 客户端连接上服务端之后会调用此方法 * @param ctx * @throws Exception */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i <2 ; i++) { ctx.writeAndFlush(i*0.01); } } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { Double body = (Double) msg; System.out.println("Client :" + body.toString()); } finally { ReferenceCountUtil.release(msg); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
和客户端:
package encoder_decoder.custom.client; import encoder_decoder.custom.decoder.ByteToDoubleDecoder; import encoder_decoder.custom.encoder.DoubleToByteEncoder; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * @Author jiahp * @Description 客户端 * @Date 14:32 2018/6/25 * @Modify By */ public class CustomClient { /** * 连接服务器 * * @param port * @param host * @throws Exception */ public void connect(int port, String host) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ClientChannelHandler());// ChannelFuture f = b.connect(host, port).sync(); f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } /** * 网络事件处理器 */ private class ClientChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 增加自定义的编码器和解码器 ch.pipeline().addLast(new DoubleToByteEncoder()); ch.pipeline().addLast(new ByteToDoubleDecoder()); // 客户端的处理器 ch.pipeline().addLast(new CustomClientHandler()); } } public static void main(String[] args) throws Exception { new CustomClient().connect(8080, "127.0.0.1"); } }
再看对应的服务端handler:
package encoder_decoder.custom.server; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; /** * @Author 18011618 * @Description * @Date 14:32 2018/6/25 * @Modify By */ public class CustomServerHandler extends ChannelInboundHandlerAdapter{ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //默认的Msg是bytebuf类型 需要进行转换操作 //现在自定义编解码器就可以直接进行强转 Double body = (Double) msg; System.out.println("Client :" + body.toString()); ctx.writeAndFlush(11.44); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); } }
对应的服务端:
package encoder_decoder.custom.server; import encoder_decoder.custom.decoder.ByteToDoubleDecoder; import encoder_decoder.custom.encoder.DoubleToByteEncoder; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * @Author jiahp * @Description * @Date 14:32 2018/6/25 * @Modify By */ public class CustomServer { public void bind(int port) throws Exception { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class)// .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_BACKLOG, 1024) .option(ChannelOption.SO_SNDBUF, 32 * 1024) .option(ChannelOption.SO_RCVBUF, 32 * 1024) .option(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChildChannelHandler()); ChannelFuture f = serverBootstrap.bind(port).sync(); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出 释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } /** * channel的初始化 */ private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // 增加自定义的编码器和解码器 ch.pipeline().addLast(new DoubleToByteEncoder()); ch.pipeline().addLast(new ByteToDoubleDecoder()); // 服务端的处理器 ch.pipeline().addLast(new CustomServerHandler()); } } public static void main(String[] args) throws Exception { int port = 8080; new CustomServer().bind(port); } }
最后看一下运行效果:
和
上面就实现了一个简单的自定义编解码器,功能虽然很简单,但是实际业务中只不过逻辑可能比较复杂,但步骤和思路基本是一致的,在后面还会介绍如何使用自定义编码+自定义协议完成更复杂的实例.
最后加上项目pom.xml的配置文件:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.suning.netty</groupId> <artifactId>netty_code</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.6.Final</version> </dependency> <!--注意这里要手动加上jboss-marshalling-serial.jar,否则会出现运行的时候服务端是接收不到数据的--> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>1.4.0.Final</version> </dependency> <dependency> <groupId>org.jboss.marshalling</groupId> <artifactId>jboss-marshalling-serial</artifactId> <version>1.4.0.Final</version> </dependency> <!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java --> <dependency> <groupId>com.google.protobuf</groupId> <artifactId>protobuf-java</artifactId> <version>2.5.0</version> </dependency> </dependencies> </project>