ZooKeeper Source Code Analysis -- Serializers and Deserializers
Preface:
1. Serializers are an indispensable part of most common libraries, whether JSON, protobuf, or the jute design used in ZooKeeper. By reading through a serializer and its deserializer, we can extract engineering techniques and design ideas that are worth summarizing.
2. The serializer and deserializer under the apache jute package use a design that decentralizes parsing logic into the individual classes, which is worth learning. Concretely:
- Each bean's serialization and deserialization logic lives inside the bean's own class definition and is exposed through a common interface. The code moves from centralized to decentralized management, with the interface extracted to provide a unified entry point for callers.
Main text:
1. For a serializer and deserializer, if a DSL is defined, fixed boilerplate code can be generated from it, saving the time of writing that code by hand. This idea of auto-generating template code is implemented in serialization tools such as protobuf, and the jute package implements a simple version of the same feature. Looking at the package names, we can see that jute ships code generators for several languages: by reading the zookeeper.jute DSL file, it can generate structural code for Java, C, C#, and others. The DSL syntax and the concrete code-generation steps are not covered in detail here.
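To give a feel for the DSL, the Id record quoted later in this article corresponds to a definition along the following lines. This is a sketch in the style of zookeeper.jute, not the verbatim file; see the real file in the ZooKeeper source tree for the full set of modules and records.

```
module org.apache.zookeeper.data {
    class Id {
        ustring scheme;
        ustring id;
    }
}
```

Each `class` in the DSL becomes a generated bean implementing Record in each target language.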
2. Looking at the serializer itself, the core boils down to two things: input and output. Let's see how the jute package implements them.
```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.jute;

import java.io.IOException;
import java.util.List;
import java.util.TreeMap;

/**
 * Interface that all the serializers have to implement.
 */
public interface OutputArchive {
    void writeByte(byte b, String tag) throws IOException;
    void writeBool(boolean b, String tag) throws IOException;
    void writeInt(int i, String tag) throws IOException;
    void writeLong(long l, String tag) throws IOException;
    void writeFloat(float f, String tag) throws IOException;
    void writeDouble(double d, String tag) throws IOException;
    void writeString(String s, String tag) throws IOException;
    void writeBuffer(byte[] buf, String tag) throws IOException;
    void writeRecord(Record r, String tag) throws IOException;
    void startRecord(Record r, String tag) throws IOException;
    void endRecord(Record r, String tag) throws IOException;
    void startVector(List<?> v, String tag) throws IOException;
    void endVector(List<?> v, String tag) throws IOException;
    void startMap(TreeMap<?, ?> v, String tag) throws IOException;
    void endMap(TreeMap<?, ?> v, String tag) throws IOException;
}
```
OutputArchive is the serializer interface, through which data is written out. From the interface we can see that basic types such as bool, int, long, and float can be serialized directly, while composite types are written out through the Record interface.
Let's look at the definition of this Record interface:
```java
/* Apache License 2.0 header omitted (same as above) */
package org.apache.jute;

import java.io.IOException;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * Interface that is implemented by generated classes.
 */
@InterfaceAudience.Public
public interface Record {
    void serialize(OutputArchive archive, String tag) throws IOException;
    void deserialize(InputArchive archive, String tag) throws IOException;
}
```
The Record interface defines a serialize method and a deserialize method, which take an OutputArchive and an InputArchive respectively. This design is quite clever: as we will see, the OutputArchive passes a reference to itself into the record, and the two call back and forth to carry out serialization, dispersing what would otherwise be centralized logic across the individual classes. This mutual-call design is well worth learning from.
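The mutual-call pattern can be sketched in isolation. The following is a hypothetical, stripped-down imitation of the jute interfaces (MiniOutputArchive, MiniRecord, and the Point bean are all invented names, not part of jute), written only to show the double dispatch: the caller hands a record to the archive, and the archive hands itself back to the record.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical, stripped-down versions of the jute interfaces.
interface MiniOutputArchive {
    void writeInt(int i, String tag) throws IOException;
    void writeRecord(MiniRecord r, String tag) throws IOException;
}

interface MiniRecord {
    // The record receives the archive and drives its own serialization.
    void serialize(MiniOutputArchive a, String tag) throws IOException;
}

class MiniBinaryArchive implements MiniOutputArchive {
    private final DataOutputStream out;

    MiniBinaryArchive(DataOutputStream out) {
        this.out = out;
    }

    public void writeInt(int i, String tag) throws IOException {
        out.writeInt(i); // 4 bytes, big-endian
    }

    // The archive hands itself back to the record: the bean decides
    // what to write, the archive decides how bytes hit the stream.
    public void writeRecord(MiniRecord r, String tag) throws IOException {
        r.serialize(this, tag);
    }
}

class Point implements MiniRecord {
    final int x, y;

    Point(int x, int y) {
        this.x = x;
        this.y = y;
    }

    public void serialize(MiniOutputArchive a, String tag) throws IOException {
        a.writeInt(x, "x"); // the bean knows its own field order
        a.writeInt(y, "y");
    }
}

public class Main {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        MiniOutputArchive archive = new MiniBinaryArchive(new DataOutputStream(buf));
        archive.writeRecord(new Point(1, 2), "point");
        System.out.println(buf.size()); // two 4-byte ints -> prints 8
    }
}
```

Neither side knows the other's concrete class; each talks only to an interface, which is exactly what lets generated beans and hand-written archives evolve independently.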
Having seen the interfaces, let's move on to the implementations. A commonly used implementation of OutputArchive is BinaryOutputArchive, which uses a byte buffer as the serialization carrier and writes objects directly as binary data through an OutputStream.
```java
/* Apache License 2.0 header omitted (same as above) */
package org.apache.jute;

import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.TreeMap;

public class BinaryOutputArchive implements OutputArchive {

    private ByteBuffer bb = ByteBuffer.allocate(1024);
    private DataOutput out;

    public static BinaryOutputArchive getArchive(OutputStream strm) {
        return new BinaryOutputArchive(new DataOutputStream(strm));
    }

    /**
     * Creates a new instance of BinaryOutputArchive.
     */
    public BinaryOutputArchive(DataOutput out) {
        this.out = out;
    }

    public void writeByte(byte b, String tag) throws IOException {
        out.writeByte(b);
    }

    public void writeBool(boolean b, String tag) throws IOException {
        out.writeBoolean(b);
    }

    public void writeInt(int i, String tag) throws IOException {
        out.writeInt(i);
    }

    public void writeLong(long l, String tag) throws IOException {
        out.writeLong(l);
    }

    public void writeFloat(float f, String tag) throws IOException {
        out.writeFloat(f);
    }

    public void writeDouble(double d, String tag) throws IOException {
        out.writeDouble(d);
    }

    /**
     * create our own char encoder to utf8. This is faster
     * than string.getBytes(UTF8).
     *
     * @param s the string to encode into utf8
     * @return utf8 byte sequence.
     */
    private ByteBuffer stringToByteBuffer(CharSequence s) {
        bb.clear();
        final int len = s.length();
        for (int i = 0; i < len; i++) {
            if (bb.remaining() < 3) {
                ByteBuffer n = ByteBuffer.allocate(bb.capacity() << 1);
                bb.flip();
                n.put(bb);
                bb = n;
            }
            char c = s.charAt(i);
            if (c < 0x80) {
                bb.put((byte) c);
            } else if (c < 0x800) {
                bb.put((byte) (0xc0 | (c >> 6)));
                bb.put((byte) (0x80 | (c & 0x3f)));
            } else {
                bb.put((byte) (0xe0 | (c >> 12)));
                bb.put((byte) (0x80 | ((c >> 6) & 0x3f)));
                bb.put((byte) (0x80 | (c & 0x3f)));
            }
        }
        bb.flip();
        return bb;
    }

    public void writeString(String s, String tag) throws IOException {
        if (s == null) {
            writeInt(-1, "len");
            return;
        }
        ByteBuffer bb = stringToByteBuffer(s);
        writeInt(bb.remaining(), "len");
        out.write(bb.array(), bb.position(), bb.limit());
    }

    public void writeBuffer(byte[] barr, String tag) throws IOException {
        if (barr == null) {
            out.writeInt(-1);
            return;
        }
        out.writeInt(barr.length);
        out.write(barr);
    }

    public void writeRecord(Record r, String tag) throws IOException {
        r.serialize(this, tag);
    }

    public void startRecord(Record r, String tag) throws IOException {
    }

    public void endRecord(Record r, String tag) throws IOException {
    }

    public void startVector(List<?> v, String tag) throws IOException {
        if (v == null) {
            writeInt(-1, tag);
            return;
        }
        writeInt(v.size(), tag);
    }

    public void endVector(List<?> v, String tag) throws IOException {
    }

    public void startMap(TreeMap<?, ?> v, String tag) throws IOException {
        writeInt(v.size(), tag);
    }

    public void endMap(TreeMap<?, ?> v, String tag) throws IOException {
    }
}
```
BinaryOutputArchive is quite short; the serialization logic for complex objects is in fact distributed across the individual bean classes, each of which implements the Record interface to expose its own serialization method.
First, the factory method:
```java
public static BinaryOutputArchive getArchive(OutputStream strm) {
    return new BinaryOutputArchive(new DataOutputStream(strm));
}
```
It wraps the OutputStream in a DataOutputStream. Here is the documentation for DataOutputStream:
```java
package java.io;

/**
 * A data output stream lets an application write primitive Java data
 * types to an output stream in a portable way. An application can
 * then use a data input stream to read the data back in.
 *
 * @author  unascribed
 * @see     java.io.DataInputStream
 * @since   1.0
 */
public class DataOutputStream extends FilterOutputStream implements DataOutput {
    /**
     * The number of bytes written to the data output stream so far.
     * If this counter overflows, it will be wrapped to Integer.MAX_VALUE.
     */
    protected int written;

    // omitted
}
```
DataOutputStream is a JDK-provided utility for writing primitive Java types to a stream in a portable binary format; its limitation is that it only handles primitive values, and a DataInputStream can read back exactly the data a DataOutputStream wrote. From this we can see that jute builds the serialization of complex beans on top of this primitive-type facility that ships with Java. Performance is accordingly good: compared with parsers such as JSON, which must build a syntax tree, this approach should be considerably faster.
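The round trip described above can be seen directly with the JDK classes alone; the snippet below uses only standard java.io types, writing a few primitives and reading them back in the same order.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class Main {
    public static void main(String[] args) throws IOException {
        // Write primitives in the JDK's portable binary format...
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(42);
        out.writeLong(7L);
        out.writeUTF("zk");

        // ...and read them back, in the same order, with DataInputStream.
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(in.readInt());  // prints 42
        System.out.println(in.readLong()); // prints 7
        System.out.println(in.readUTF());  // prints zk
    }
}
```

Because there are no field names or type markers on the wire, reader and writer must agree on the field order, which is exactly what the generated serialize/deserialize pairs guarantee.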
BinaryOutputArchive's primitive-type writes need no further comment, as they simply delegate to DataOutputStream. Let's look instead at how a complex bean is written out. The core methods are:
```java
public void writeRecord(Record r, String tag) throws IOException {
    r.serialize(this, tag);
}

public void startRecord(Record r, String tag) throws IOException {
}

public void endRecord(Record r, String tag) throws IOException {
}
```
Clearly, writeRecord delegates to the Record interface, passing the archive's own reference back in (typed as the OutputArchive interface). Now let's see who calls writeRecord, and how. Here is a representative example:
```java
// omitted
protected OutputArchive leaderOs;

/** the protocol version of the leader */
bufferedOutput = new BufferedOutputStream(sock.getOutputStream());
leaderOs = BinaryOutputArchive.getArchive(bufferedOutput);
// omitted

void writePacketNow(QuorumPacket pp, boolean flush) throws IOException {
    synchronized (leaderOs) {
        if (pp != null) {
            messageTracker.trackSent(pp.getType());
            leaderOs.writeRecord(pp, "packet");
        }
        if (flush) {
            bufferedOutput.flush();
        }
    }
}
```
This makes it clear: after constructing a BinaryOutputArchive, the caller simply invokes its writeRecord method to serialize the object out to the socket. QuorumPacket is a template object generated from the zookeeper.jute DSL file; let's take a brief look at its generated definition:
```java
// File generated by hadoop record compiler. Do not edit.
/* Apache License 2.0 header omitted (same as above) */
package org.apache.zookeeper.server.quorum;

import org.apache.jute.*;
import org.apache.jute.Record; // JDK14 needs explicit import due to * with java.lang.Record
import org.apache.yetus.audience.InterfaceAudience;

@InterfaceAudience.Public
public class QuorumPacket implements Record {

    private int type;
    private long zxid;
    private byte[] data;
    private java.util.List<org.apache.zookeeper.data.Id> authinfo;

    public QuorumPacket() {
    }

    public QuorumPacket(
            int type,
            long zxid,
            byte[] data,
            java.util.List<org.apache.zookeeper.data.Id> authinfo) {
        this.type = type;
        this.zxid = zxid;
        this.data = data;
        this.authinfo = authinfo;
    }

    // getters and setters omitted

    public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(this, tag);
        a_.writeInt(type, "type");
        a_.writeLong(zxid, "zxid");
        a_.writeBuffer(data, "data");
        {
            a_.startVector(authinfo, "authinfo");
            if (authinfo != null) {
                int len1 = authinfo.size();
                for (int vidx1 = 0; vidx1 < len1; vidx1++) {
                    org.apache.zookeeper.data.Id e1 =
                        (org.apache.zookeeper.data.Id) authinfo.get(vidx1);
                    a_.writeRecord(e1, "e1");
                }
            }
            a_.endVector(authinfo, "authinfo");
        }
        a_.endRecord(this, tag);
    }

    public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(tag);
        type = a_.readInt("type");
        zxid = a_.readLong("zxid");
        data = a_.readBuffer("data");
        {
            Index vidx1 = a_.startVector("authinfo");
            if (vidx1 != null) {
                authinfo = new java.util.ArrayList<org.apache.zookeeper.data.Id>();
                for (; !vidx1.done(); vidx1.incr()) {
                    org.apache.zookeeper.data.Id e1 = new org.apache.zookeeper.data.Id();
                    a_.readRecord(e1, "e1");
                    authinfo.add(e1);
                }
            }
            a_.endVector("authinfo");
        }
        a_.endRecord(tag);
    }

    public void write(java.io.DataOutput out) throws java.io.IOException {
        BinaryOutputArchive archive = new BinaryOutputArchive(out);
        serialize(archive, "");
    }

    public void readFields(java.io.DataInput in) throws java.io.IOException {
        BinaryInputArchive archive = new BinaryInputArchive(in);
        deserialize(archive, "");
    }

    // toString, compareTo, equals, hashCode, signature omitted
}
```
From the earlier analysis we know that OutputArchive delegates to the Record interface implemented by the concrete entity class, calling its serialize method. Here is that method's concrete implementation in the bean:
```java
public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(this, tag);
    a_.writeInt(type, "type");
    a_.writeLong(zxid, "zxid");
    a_.writeBuffer(data, "data");
    {
        a_.startVector(authinfo, "authinfo");
        if (authinfo != null) {
            int len1 = authinfo.size();
            for (int vidx1 = 0; vidx1 < len1; vidx1++) {
                org.apache.zookeeper.data.Id e1 =
                    (org.apache.zookeeper.data.Id) authinfo.get(vidx1);
                a_.writeRecord(e1, "e1");
            }
        }
        a_.endVector(authinfo, "authinfo");
    }
    a_.endRecord(this, tag);
}

public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
    a_.startRecord(tag);
    type = a_.readInt("type");
    zxid = a_.readLong("zxid");
    data = a_.readBuffer("data");
    {
        Index vidx1 = a_.startVector("authinfo");
        if (vidx1 != null) {
            authinfo = new java.util.ArrayList<org.apache.zookeeper.data.Id>();
            for (; !vidx1.done(); vidx1.incr()) {
                org.apache.zookeeper.data.Id e1 = new org.apache.zookeeper.data.Id();
                a_.readRecord(e1, "e1");
                authinfo.add(e1);
            }
        }
        a_.endVector("authinfo");
    }
    a_.endRecord(tag);
}
```
Its serialization simply calls back into the OutputArchive passed in from outside, i.e. the BinaryOutputArchive implementation seen earlier, to write out primitive values, lengths, byte[] contents, and so on. The notable part is the list field: some array metadata must be written out, which is done through the startVector method. Its implementation in BinaryOutputArchive is also quite simple:
```java
public void startVector(List<?> v, String tag) throws IOException {
    if (v == null) {
        writeInt(-1, tag);
        return;
    }
    writeInt(v.size(), tag);
}
```
It just writes out the length of the list so that deserialization can read it back later. The elements of the authinfo list are of type org.apache.zookeeper.data.Id, itself a composite object, whose structure is as follows:
```java
@InterfaceAudience.Public
public class Id implements Record {

    private String scheme;
    private String id;

    public Id() {
    }

    public Id(String scheme, String id) {
        this.scheme = scheme;
        this.id = id;
    }

    public String getScheme() {
        return scheme;
    }

    public void setScheme(String m_) {
        scheme = m_;
    }

    public String getId() {
        return id;
    }

    public void setId(String m_) {
        id = m_;
    }

    public void serialize(OutputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(this, tag);
        a_.writeString(scheme, "scheme");
        a_.writeString(id, "id");
        a_.endRecord(this, tag);
    }

    public void deserialize(InputArchive a_, String tag) throws java.io.IOException {
        a_.startRecord(tag);
        scheme = a_.readString("scheme");
        id = a_.readString("id");
        a_.endRecord(tag);
    }

    // remaining methods omitted
}
```
Id is likewise a template bean generated from the DSL definition, and as a composite object it also implements the Record interface. When this nested object needs to be serialized, we again go through OutputArchive's
void writeRecord(Record r, String tag) throws IOException;
which delegates the work back to Id itself. Through this pattern of mutual calls and mutual delegation, the whole serialization flow is completed. The deserialization flow likewise lives in the bean itself, in the deserialize method, and is equally clear.
Deserialization follows a very similar flow and is not covered in detail here. One point worth noting: deserializing lists relies on the Index interface:
```java
/**
 * Interface that acts as an iterator for deserializing maps.
 * The deserializer returns an instance that the record uses to
 * read vectors and maps. An example of usage is as follows:
 *
 * <code>
 * Index idx = startVector(...);
 * while (!idx.done()) {
 *   .... // read element of a vector
 *   idx.incr();
 * }
 * </code>
 */
public interface Index {
    boolean done();
    void incr();
}
```
This Index is what drives the deserialization of the two iterable containers, lists and maps.
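The length-prefix protocol and the Index iterator fit together as sketched below. This is a hypothetical, simplified imitation (BinaryIndex and the class names are invented for illustration; jute's real BinaryInputArchive works along similar lines but differs in detail): the writer emits a length prefix followed by the elements, and the reader wraps that prefix in an Index that counts elements down.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class Main {
    // Same shape as jute's Index interface.
    interface Index {
        boolean done();
        void incr();
    }

    // Hypothetical binary-format Index: counts down the length prefix.
    static class BinaryIndex implements Index {
        private int remaining;

        BinaryIndex(int n) {
            remaining = n;
        }

        public boolean done() {
            return remaining <= 0;
        }

        public void incr() {
            remaining--;
        }
    }

    public static void main(String[] args) throws IOException {
        // Writer side: length prefix, then the elements (what startVector does).
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        List<Integer> src = Arrays.asList(10, 20, 30);
        out.writeInt(src.size());           // startVector: write the length
        for (int v : src) {
            out.writeInt(v);                // then each element
        }

        // Reader side: startVector returns an Index over the length prefix.
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        Index idx = new BinaryIndex(in.readInt());
        List<Integer> dst = new ArrayList<>();
        for (; !idx.done(); idx.incr()) {
            dst.add(in.readInt());
        }
        System.out.println(dst); // prints [10, 20, 30]
    }
}
```

Hiding the termination condition behind done()/incr() is what lets the same generated loop work for any archive format, binary or otherwise.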
Summary:
ZooKeeper's serializer and deserializer are ultimately built on DataOutputStream, the JDK utility that maps primitive types onto a byte stream. On top of that, the apache jute package uses a clever design that lets complex objects serialize themselves, completing the serializer as a whole. Its design of passing references back and forth between archive and record is well worth learning from.
Design idea: distribute the work across classes.
The classes that do the concrete work are spread out and coordinated by a central manager. The manager provides the simple shared methods, while the complex work is delegated to the individual classes, all managed and controlled uniformly through an interface.