Hhadoop-2.7.0中HDFS写文件源码分析
一、综述
HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode、DataNode、DFSClient等众多角色的分工与合作。
首先上一段代码,客户端是如何写文件的:
- Configuration conf = new Configuration();
- FileSystem fs = FileSystem.get(conf);
- Path file = new Path("demo.txt");
- FSDataOutputStream outStream = fs.create(file);
- out.write("Welcome to HDFS Java API !!!".getBytes("UTF-8"));
- outStream.close();
总体来说,最简单的HDFS写文件大体流程如下:
1、客户端获取文件系统实例FileSyStem,并通过其create()方法获取文件系统输出流outputStream;
1.1、首先会联系名字节点NameNode,通过ClientProtocol.create()RPC调用,在名字节点上创建文件元数据,并获取文件状态FileStatus;
1.2、通过文件状态FileStatus构造文件系统输出流outputStream;
2、通过文件系统输出流outputStream写入数据;
2.1、首次写入会首先向名字节点申请数据块,名字节点能够掌握集群DataNode整体状况,分配数据块后,连同DataNode列表信息返回给客户端;
2.2、客户端采用流式管道的方式写入数据节点列表中的第一个DataNode,并由列表中的前一个DataNode将数据转发给后面一个DataNode;
2.3、确认数据包由DataNode经过管道依次返回给上游DataNode和客户端;
2.4、写满一个数据块后,向名字节点提交一个数据;
2.5、再次重复2.1-2.4过程;
3、向名字节点提交文件(complete file),即告知名字节点文件已写完,然后关闭文件系统输出流outputStream等释放资源。
可以看出,在不考虑异常等的情况下,上述过程还是比较复杂的。本文,我将着重阐述下HDFS写数据时,客户端是如何实现的,关于NameNode、DataNode等的配合等,后续文章将陆续推出,敬请关注!
二、实现分析
我们将带着以下问题来分析客户端写入数据过程:
1、如何获取数据输出流?
2、如何通过数据输出流写入数据?
3、数据输出流关闭时都做了什么?
4、如果发生异常怎么办?即如何容错?
(一)如何获取数据输出流?
HDFS客户端获取数据流是一个复杂的过程,流程图如下:
以DistributedFileSystem为例,create()是其入口方法,DistributedFileSystem内部封装了一个DFS的客户端,如下:
- DFSClient dfs;
- this.dfs = new DFSClient(uri, conf, statistics);
而create()方法就是通过这个文件系统客户端dfs获取数据输出流的,如下:
- @Override
- public FSDataOutputStream create(final Path f, final FsPermission permission,
- final EnumSet<CreateFlag> cflags, final int bufferSize,
- final short replication, final long blockSize, final Progressable progress,
- final ChecksumOpt checksumOpt) throws IOException {
- statistics.incrementWriteOps(1);
- Path absF = fixRelativePart(f);
- return new FileSystemLinkResolver<FSDataOutputStream>() {
- /*
- * 创建文件系统数据输出流
- */
- @Override
- public FSDataOutputStream doCall(final Path p)
- throws IOException, UnresolvedLinkException {
- // 调用create()方法创建文件,并获取文件系统输出流
- final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
- cflags, replication, blockSize, progress, bufferSize,
- checksumOpt);
- return dfs.createWrappedOutputStream(dfsos, statistics);
- }
- @Override
- public FSDataOutputStream next(final FileSystem fs, final Path p)
- throws IOException {
- return fs.create(p, permission, cflags, bufferSize,
- replication, blockSize, progress, checksumOpt);
- }
- }.resolve(this, absF);
- }
- // 为create构建一个数据输出流
- final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
- src, masked, flag, createParent, replication, blockSize, progress,
- buffersize, dfsClientConf.createChecksum(checksumOpt),
- getFavoredNodesStr(favoredNodes));
- // 开启文件租约
- beginFileLease(result.getFileId(), result);
- return result;
- /**
- * 为创建文件构造一个新的输出流
- */
- static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
- FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
- short replication, long blockSize, Progressable progress, int buffersize,
- DataChecksum checksum, String[] favoredNodes) throws IOException {
- TraceScope scope =
- dfsClient.getPathTraceScope("newStreamForCreate", src);
- try {
- HdfsFileStatus stat = null;
- // Retry the create if we get a RetryStartFileException up to a maximum
- // number of times
- boolean shouldRetry = true;
- int retryCount = CREATE_RETRY_COUNT;
- while (shouldRetry) {
- shouldRetry = false;
- try {
- // 首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态
- stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
- new EnumSetWritable<CreateFlag>(flag), createParent, replication,
- blockSize, SUPPORTED_CRYPTO_VERSIONS);
- break;
- } catch (RemoteException re) {
- IOException e = re.unwrapRemoteException(
- AccessControlException.class,
- DSQuotaExceededException.class,
- FileAlreadyExistsException.class,
- FileNotFoundException.class,
- ParentNotDirectoryException.class,
- NSQuotaExceededException.class,
- RetryStartFileException.class,
- SafeModeException.class,
- UnresolvedPathException.class,
- SnapshotAccessControlException.class,
- UnknownCryptoProtocolVersionException.class);
- if (e instanceof RetryStartFileException) {
- if (retryCount > 0) {
- shouldRetry = true;
- retryCount--;
- } else {
- throw new IOException("Too many retries because of encryption" +
- " zone operations", e);
- }
- } else {
- throw e;
- }
- }
- }
- Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
- // 构造一个数据输出流
- final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
- flag, progress, checksum, favoredNodes);
- // 启动数据输出流
- out.start();
- return out;
- } finally {
- scope.close();
- }
- }
1、首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态HdfsFileStatus;
2、构造一个数据输出流;
3、启动数据输出流。
上述连接NameNode节点创建文件的过程中,如果发生瞬时错误,会充分利用重试机制,增加系统容错性。DFSClient中nameNode的Create()方法,实际上是调用的是客户端与名字节点间的RPC--ClientProtocol的create()方法,该方法的作用即是在NameNode上创建一个空文件,并返回文件状态。文件状态主要包括以下信息:
- // 文件路径
- private final byte[] path; // local name of the inode that's encoded in java UTF8
- // 符号连接
- private final byte[] symlink; // symlink target encoded in java UTF8 or null
- private final long length;// 文件长度
- private final boolean isdir;// 是否为目录
- private final short block_replication;// 数据块副本数
- private final long blocksize;// 数据块大小
- private final long modification_time;// 修改时间
- private final long access_time;// 访问时间
- private final FsPermission permission;// 权限
- private final String owner;// 文件所有者
- private final String group;// 文件所属组
- private final long fileId;// 文件ID
- /** Construct a new output stream for creating a file. */
- private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
- EnumSet<CreateFlag> flag, Progressable progress,
- DataChecksum checksum, String[] favoredNodes) throws IOException {
- this(dfsClient, src, progress, stat, checksum);
- this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
- // 计算数据包块大小
- computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
- // 构造数据流对象
- streamer = new DataStreamer(stat, null);
- if (favoredNodes != null && favoredNodes.length != 0) {
- streamer.setFavoredNodes(favoredNodes);
- }
- }
- private synchronized void start() {
- streamer.start();
- }
中间为什么会有计算数据包块大小这一步呢?原来,数据的发送是通过一个个数据包发送出去的,而不是通过数据块发送的。设想下,如果按照一个数据块(默认128M)大小发送数据,合理吗?至于数据包大小是如何确定的,我们后续再讲。
(二)如何通过数据输出流写入数据?
下面,该看看如何通过数据输出流写入数据了。要解决这个问题,首先分析下DFSOutputStream和DataStreamer是什么。
1、DFSOutputStream
DFSOutputStream是分布式文件系统输出流,它内部封装了两个队列:发送数据包队列和确认数据包队列,如下:
- // 发送数据包队列
- private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
- // 确认数据包队列
- private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
2、DataStreamer
DataStreamer是一个后台工作线程,它负责在数据流管道中往DataNode发送数据包。它从NameNode申请获取一个新的数据块ID和数据块位置,然后开始往DataNode的管道写入流式数据包。每个数据包都有一个***sequence number。当一个数据块所有的数据包被发送出去,并且每个数据包的确认信息acks被接收到的话,DataStreamer关闭当前数据块,然后再向NameNode申请下一个数据块。
所以,才会有上述发送数据包和确认数据包这两个队列。
DataStreamer内部有很多变量,大体如下:
- // streamer关闭标志位
- private volatile boolean streamerClosed = false;
- // 扩展块,它的长度是已经确认ack的bytes大小
- private ExtendedBlock block; // its length is number of bytes acked
- private Token<BlockTokenIdentifier> accessToken;
- // 数据输出流
- private DataOutputStream blockStream;
- // 数据输入流:即回复流
- private DataInputStream blockReplyStream;
- // 响应处理器
- private ResponseProcessor response = null;
- // 当前块的数据块列表
- private volatile DatanodeInfo[] nodes = null; // list of targets for current block
- // 存储类型
- private volatile StorageType[] storageTypes = null;
- // 存储ID
- private volatile String[] storageIDs = null;
- // 需要排除的节点
- private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
- CacheBuilder.newBuilder()
- .expireAfterWrite(
- dfsClient.getConf().excludedNodesCacheExpiry,
- TimeUnit.MILLISECONDS)
- .removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
- @Override
- public void onRemoval(
- RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
- DFSClient.LOG.info("Removing node " +
- notification.getKey() + " from the excluded nodes list");
- }
- })
- .build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
- @Override
- public DatanodeInfo load(DatanodeInfo key) throws Exception {
- return key;
- }
- });
- // 优先节点
- private String[] favoredNodes;
- // 是否存在错误
- volatile boolean hasError = false;
- volatile int errorIndex = -1;
- // Restarting node index
- // 从哪个节点重试的索引
- AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
- private long restartDeadline = 0; // Deadline of DN restart
- // 当前数据块构造阶段
- private BlockConstructionStage stage; // block construction stage
- // 已发送数据大小
- private long bytesSent = 0; // number of bytes that've been sent
- private final boolean isLazyPersistFile;
- /** Nodes have been used in the pipeline before and have failed. */
- private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
- /** The last ack sequence number before pipeline failure. */
- // 管道pipeline失败前的最后一个确认包***
- private long lastAckedSeqnoBeforeFailure = -1;
- // 管道恢复次数
- private int pipelineRecoveryCount = 0;
- /** Has the current block been hflushed? */
- // 当前数据块是否已被Hflushed
- private boolean isHflushed = false;
- /** Append on an existing block? */
- // 是否需要在现有块上append
- private final boolean isAppend;
1、BlockConstructionStage stage
当前数据块构造阶段。针对create()这种写入 来说,开始时默认是BlockConstructionStage.PIPELINE_SETUP_CREATE,即管道初始化时需要向NameNode申请数据块及所在数据节点的状态,这个很容易理解。有了数据块和其所在数据节点所在列表,才能形成管道列表不是?在数据流传输过程中,即一个数据块写入的过程中,虽然有多次数据包写入,但状态始终为DATA_STREAMING,即正在流式写入的阶段。而当发生异常时,则是PIPELINE_SETUP_STREAMING_RECOVERY状态,即需要从流式数据中进行恢复,如果一个数据块写满,则会进入下一个周期,PIPELINE_SETUP_CREATE->DATA_STREAMING,最后数据全部写完后,状态会变成PIPELINE_CLOSE,并且如果发生异常的话,会有一个特殊状态对应,即PIPELINE_CLOSE_RECOVERY。而append开始时则是对应的状态PIPELINE_SETUP_APPEND及异常状态PIPELINE_SETUP_APPEND_RECOVERY,其它则一致。
2、volatile boolean hasError = false
这个状态位用来标记数据写入过程中,是否存在错误,方便进行容错。
3、ResponseProcessor response
响应处理器。这个也是后台工作线程,它会处理来自DataNode回复流中的确认包,确认数据是否发送成功,如果成功,将确认包从确认数据包队列中移除,否则进行容错处理。
create()模式下的DataStreamer构造比较简单,如下:
- private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
- isAppend = false;
- isLazyPersistFile = isLazyPersist(stat);
- this.block = block;
- stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
- }
我们首先看下DataStreamer是如何发送数据的。上面讲到过,DFSOutputStream中包括两个队列:发送数据包队列和确认数据包队列。这类似于两个生产者消--费者模型。针对发送数据包队列,外部写入者为生产者,DataStreamer为消费者。外部持续写入数据至发送数据包队列,DataStreamer则从中消费数据,判断是否需要申请数据块,然后写入数据节点流式管道。而确认数据包队列,DataStreamer为生产者,ResponseProcessor为消费者。首先,确认数据包队列数据的产生,是DataStreamer发送数据给DataNode后,从发送数据包队列挪过来的,而当ResponseProcessor线程确认接收到数据节点的ack确认包后,再从数据确认队列中删除。
关于ResponseProcessor线程,稍后再讲。
数据写入过程之DataStreamer
首先看DataStreamer的run()方法,它会在数据流没有关闭,且dfs客户端正在运行的情况下,一直循环,循环内处理的大体流程如下:
1、如果遇到一个错误(hasErro),且响应器尚未关闭,关闭响应器,使之join等待;
2、如果有DataNode相关IO错误,先预先处理,初始化一些管道和流的信息,并决定外部是否等待,等待意即可以进行容错处理,不等待则数目错误比较严重,无法进行容错处理:这里还判断了errorIndex标志位和restartingNodeIndex的大小,意思是是否是由某个具体数据节点引起的错误,如果是的话,这种错误理论上是可以处理的;
3、没有数据时,等待一个数据包发送:等待的条件是:当前流没有关闭(!streamerClosed)、没有错误(hasError)、dfs客户端正在 运行(dfsClient.clientRunning )、dataQueue队列大小为0,且当前阶段不是DATA_STREAMING,或者在需要sleep(doSleep)或者上次发包距离本次时间未超过阈值的情况下为DATA_STREAMING
意思是各种标记为正常,数据流处于正常发送的过程或者可控的非正常发送过程中,可控表现在状态位doSleep,即上传错误检查中认为理论上可以进行修复,但是需要sleep已完成recovery的初始化,或者距离上次发送未超过时间的阈值等。
4、如果数据流关闭、存在错误、客户端正常运行标志位异常时,执行continue:这个应该是对容错等的处理,让程序及时响应错误;
5、获取将要发送的数据包:
如果数据发送队列为空,构造一个心跳包;否则,取出队列中第一个元素,即待发送数据包。
6、如果当前阶段是PIPELINE_SETUP_CREATE,申请数据块,设置pipeline,初始化数据流:append的setup阶段则是通过setupPipelineForAppendOrRecovery()方法完成的,并同样会初始化数据流;
7、获取数据块中的上次数据位置lastByteOffsetInBlock,如果超过数据块大小,报错;
8、 如果是数据块的最后一个包:等待所有的数据包被确认,即等待datanodes的确认包acks,如果数据流关闭,或者数据节点IO存在错误,或者客户端不再正常运行,continue,设置阶段为pipeline关闭
9、发送数据包:将数据包从dataQueue队列挪至ackQueue队列,通知dataQueue的所有等待者,将数据写入远端的DataNode节点,并flush,如果发生异常,尝试标记主要的数据节点错误,方便容错处理;
10、更新已发送数据大小:可以看出,数据包中存储了其在数据块中的位置LastByteOffsetBlock,也就标记了已经发送数据的总大小;
11、数据块写满了吗?如果是最后一个数据块,等待确认包,调用endBlock()方法结束一个数据块 ;
如果上述流程发生错误,hasError标志位设置为true,并且如果不是一个DataNode引起的原因,流关闭标志设置为true。
最后,没有数据需要发送,或者发生致命错误的情况下,调用closeInternal()方法关闭内部资源。
客户端实现PFSPacket
一、简介
HDFS在数据传输过程中,针对数据块Block,不是整个block进行传输的,而是将block切分成一个个的数据包进行传输。而DFSPacket就是HDFS数据传输过程中对数据包的抽象。
二、实现
HDFS客户端在往DataNodes节点写数据时,会以数据包packet的形式写入,且每个数据包包含一个包头,n个连续的校验和数据块checksum chunks和n个连续的实际数据块 actual data chunks,每个校验和数据块对应一个实际数据块,被用来做数据校验,防止数据传输过程中因网络原因等发生的数据丢包。
DFSPacket内数据的逻辑组织形式如下:
DFSPacket的物理实现如下:
FSPacket在内部持有一个数据缓冲区buf,类型为byte[]
buf用来按顺序存储三类数据,header、checksum chunks、data chunks,分别对应上面的header区域、cccc…cccc区域和dddd…dddd区域
header、checksum chunks和data chunks都是提前分配好的,灰色代表已经写入数据区域,白色代表可以写入数据区域
Header是数据包的头部,它是在后续数据写完后才添加到数据包的头部。因为Header中包含了数据长度等信息,需要在数据写完后进行计算,故头部信息最后生成。Header内部封装了一个Protobuf对象,持有数据在Block中的位置offsetInBlock、数据包***seqno、是否为Block的最后一个数据包lastPacketInBlock、数据长度dataLen等信息,Header在写入DFSPacket中时,会在序列化Protobuf对象的前面追加一个数据长度大小和protobuf序列化大小,方便DataNode等进行解析。
DFSPacket内部有四个指针,分别为
1、checksumStart:标记数据校验和区域起始位置
2、checksumPos:标记数据校验和区域当前写入位置
3、dataStart:标记真实数据区域起始位置
4、dataPos:标记真实数据区域当前写入位置
数据包是按照一组组数据块写入的,先写校验和数据块,再写真实数据块,然后再写下一组校验和数据块和真实数据块,最后再写上header头部信息,至此整个数据包写完。
每个DFSPacket都对应一个***seqno,还存储了数据在数据块中的位置offsetInBlock、数据包中的数据块(chunks)数量numChunks、数据包中的最大数据块数maxChunks、是否为block中最后一个数据包lastPacketInBlock等信息。
三、源码分析
(一)初始化
DFSPacket的初始化分为以下几步:
1、首先计算缓冲区数据大小
1.1、首先,计算writePacketSize,即写包大小
这个是系统配置参数决定的。该大小默认是认为包含头部信息的,意即客户端自己指定的数据包大小,但是实际大小还需要后续计算得到。writePacketSize取自参数dfs.client-write-packet-size,表示客户端写入数据时数据包大小,默认为64*1024,即64KB
1.2、其次,计算bytesPerChecksum,即每多少数据计算校验和
这个是通过DataChecksum实例checksum的getBytesPerChecksum()方法得到的,如下:
- public int getBytesPerChecksum() {
- return bytesPerChecksum;
- }
- DataChecksum dataChecksum = DataChecksum.newDataChecksum(
- myOpt.getChecksumType(),
- myOpt.getBytesPerChecksum());
1.3、计算数据包body大小
bodySize = writePacketSize- PacketHeader.PKT_MAX_HEADER_LEN
最大头部PacketHeader.PKT_MAX_HEADER_LEN大小是一个合理的预估值,它是通过模拟构造一个protobuf对象,然后序列化成byte[]数组后,再加上一个固定的大小(Ints.BYTES + Shorts.BYTES);
Int所占区域用来存放数据包实际数据(含校验和,即除头部区域外的)大小,Short所占区域用来存放header protobuf对象序列化的大小,头部所占区域剩余的地方就是存放头部信息byte[];
1.4、计算chunkSize大小
chunkSize = bytesPerChecksum + getChecksumSize(),getChecksumSize()是获取校验和的大小,chunkSize意思是包含数据校验和块、真实数据块的大小
1.5、计算每个包能包含的块数
chunkSize=Math.max(bodySize/chunkSize, 1),最小为1;
1.6、计算缓冲区内数据大小:
packetSize = chunkSize*chunksPerPacket
chunkSize表示块大小,chunksPerPacket表示每个数据包由多少数据块
1.7、实际申请的缓冲区大小还要加上头部Header的最大大小
bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize
2、申请缓存区数组
3、构造DFSPacket实例,确定各指针位置、其它指标等
2和3代码如下:
- /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
- /**
- * 创建一个数据包
- */
- private DFSPacket createPacket(int packetSize, int chunksPerPkt, long offsetInBlock,
- long seqno, boolean lastPacketInBlock) throws InterruptedIOException {
- final byte[] buf;
- final int bufferSize = PacketHeader.PKT_MAX_HEADER_LEN + packetSize;
- try {
- buf = byteArrayManager.newByteArray(bufferSize);
- } catch (InterruptedException ie) {
- final InterruptedIOException iioe = new InterruptedIOException(
- "seqno=" + seqno);
- iioe.initCause(ie);
- throw iioe;
- }
- return new DFSPacket(buf, chunksPerPkt, offsetInBlock, seqno,
- getChecksumSize(), lastPacketInBlock);
- }
(二)写数据至缓冲区
写数据的过程:
1、 先写入一个校验和块;
2、 再写入一个真实数据块;
3、 块数增1;
4、 重复1-3,写入后续数据块组;
写数据是在DFSOutputStream中触发的,代码如下:
- // 写入校验和
- currentPacket.writeChecksum(checksum, ckoff, cklen);
- // 写入数据
- currentPacket.writeData(b, offset, len);
- // 增加块数目
- currentPacket.incNumChunks();
- // 迭代累加bytesCurBlock
- bytesCurBlock += len;
DataPacket的实现也比较简单,代码如下(有注释):
- /**
- * Write data to this packet.
- * 往包内写入数据
- *
- * @param inarray input array of data
- * @param off the offset of data to write
- * @param len the length of data to write
- * @throws ClosedChannelException
- */
- synchronized void writeData(byte[] inarray, int off, int len)
- throws ClosedChannelException {
- // 检测缓冲区
- checkBuffer();
- // 检测数据当前位置后如果 写入len个字节,是否会超过缓冲区大小
- if (dataPos + len > buf.length) {
- throw new BufferOverflowException();
- }
- // 数据拷贝:从数据当前位置处起开始存放len个字节
- System.arraycopy(inarray, off, buf, dataPos, len);
- // 数据当前位置累加len,指针向后移动
- dataPos += len;
- }
- /**
- * Write checksums to this packet
- * 往包内写入校验和
- *
- * @param inarray input array of checksums
- * @param off the offset of checksums to write
- * @param len the length of checksums to write
- * @throws ClosedChannelException
- */
- synchronized void writeChecksum(byte[] inarray, int off, int len)
- throws ClosedChannelException {
- // 检测缓冲区
- checkBuffer();
- // 校验数据校验和长度
- if (len == 0) {
- return;
- }
- // 根据当前校验和位置和即将写入的数据大小,判断是否超过数据起始位置处,即是否越界
- if (checksumPos + len > dataStart) {
- throw new BufferOverflowException();
- }
- // 数据拷贝:从校验和当前位置处起开始存放len个字节
- System.arraycopy(inarray, off, buf, checksumPos, len);
- // 数据校验和当前位置累加len
- checksumPos += len;
- }
- /**
- * increase the number of chunks by one
- * 增加数据块(chunk)数目
- */
- synchronized void incNumChunks(){
- numChunks++;
- }
(三)缓冲区数据flush到输出流
发送数据过程:
1、 计算数据包的数据长度;
2、 生成头部header信息:一个protobuf对象;
3、 整理缓冲区,去除校验和块区域和真实数据块区域间的空隙;
4、 添加头部信息到缓冲区:从校验和块区域起始往前计算头部信息的起始位置;
5、 将缓冲区数据写入到输出流。
逻辑比较简单,代码如下:
- /**
- * Write the full packet, including the header, to the given output stream.
- * 将整个数据包写入到指定流,包含头部header
- *
- * @param stm
- * @throws IOException
- */
- synchronized void writeTo(DataOutputStream stm) throws IOException {
- // 检测缓冲区
- checkBuffer();
- // 计算数据长度
- final int dataLen = dataPos - dataStart;
- // 计算校验和长度
- final int checksumLen = checksumPos - checksumStart;
- // 计算整个包的数据长度(数据长度+校验和长度+固定长度4)
- final int pktLen = HdfsConstants.BYTES_IN_INTEGER + dataLen + checksumLen;
- // 构造数据包包头信息(protobuf对象)
- PacketHeader header = new PacketHeader(
- pktLen, offsetInBlock, seqno, lastPacketInBlock, dataLen, syncBlock);
- if (checksumPos != dataStart) {// 如果校验和数据当前位置不等于数据起始处,挪动校验和数据以填补空白
- // 这个可能在最后一个数据包或者一个hflush/hsyn调用时发生
- // Move the checksum to cover the gap. This can happen for the last
- // packet or during an hflush/hsync call.
- System.arraycopy(buf, checksumStart, buf,
- dataStart - checksumLen , checksumLen);
- // 重置checksumPos、checksumStart
- checksumPos = dataStart;
- checksumStart = checksumPos - checksumLen;
- }
- // 计算header的起始位置:数据块校验和起始处减去序列化后的头部大小
- final int headerStart = checksumStart - header.getSerializedSize();
- // 做一些必要的确保
- assert checksumStart + 1 >= header.getSerializedSize();
- assert headerStart >= 0;
- assert headerStart + header.getSerializedSize() == checksumStart;
- // Copy the header data into the buffer immediately preceding the checksum
- // data.
- // 将header数据写入缓冲区。header是用protobuf序列化的
- System.arraycopy(header.getBytes(), 0, buf, headerStart,
- header.getSerializedSize());
- // corrupt the data for testing.
- // 测试用
- if (DFSClientFaultInjector.get().corruptPacket()) {
- buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
- }
- // Write the now contiguous full packet to the output stream.
- // 写入当前整个连续的packet至输出流
- // 从header起始处,写入长度为头部大小、校验和长度、数据长度的总和
- stm.write(buf, headerStart, header.getSerializedSize() + checksumLen + dataLen);
- // undo corruption.
- // 测试用
- if (DFSClientFaultInjector.get().uncorruptPacket()) {
- buf[headerStart+header.getSerializedSize() + checksumLen + dataLen-1] ^= 0xff;
- }
- }
如果长时间没有数据传输,在输出流未关闭的情况下,客户端会发送心跳包给数据节点,心跳包是DataPacket的一种特殊实现,它通过数据包***为-1来进行特殊标识,如下:
- public static final long HEART_BEAT_SEQNO = -1L;
- /**
- * Check if this packet is a heart beat packet
- * 判断该包释放为心跳包
- *
- * @return true if the sequence number is HEART_BEAT_SEQNO
- */
- boolean isHeartbeatPacket() {
- / 心跳包的***均为-1
- return seqno == HEART_BEAT_SEQNO;
- }
- /**
- * For heartbeat packets, create buffer directly by new byte[]
- * since heartbeats should not be blocked.
- */
- private DFSPacket createHeartbeatPacket() throws InterruptedIOException {
- final byte[] buf = new byte[PacketHeader.PKT_MAX_HEADER_LEN];
- return new DFSPacket(buf, 0, 0, DFSPacket.HEART_BEAT_SEQNO,
- getChecksumSize(), false);
- }