Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(1)
一、综述
HDFS写文件是整个Hadoop中最为复杂的流程之一,它涉及到HDFS中NameNode、DataNode、DFSClient等众多角色的分工与合作。
首先上一段代码,客户端是如何写文件的:
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
Path file = new Path("demo.txt");
FSDataOutputStream outStream = fs.create(file);
out.write("Welcome to HDFS Java API !!!".getBytes("UTF-8"));
outStream.close();
只有简单的6行代码,客户端封装的如此简洁,各组件间的RPC调用、异常处理、容错等均对客户端透明。
总体来说,最简单的HDFS写文件大体流程如下:
1、客户端获取文件系统实例FileSyStem,并通过其create()方法获取文件系统输出流outputStream;
1.1、首先会联系名字节点NameNode,通过ClientProtocol.create()RPC调用,在名字节点上创建文件元数据,并获取文件状态FileStatus;
1.2、通过文件状态FileStatus构造文件系统输出流outputStream;
2、通过文件系统输出流outputStream写入数据;
2.1、首次写入会首先向名字节点申请数据块,名字节点能够掌握集群DataNode整体状况,分配数据块后,连同DataNode列表信息返回给客户端;
2.2、客户端采用流式管道的方式写入数据节点列表中的第一个DataNode,并由列表中的前一个DataNode将数据转发给后面一个DataNode;
2.3、确认数据包由DataNode经过管道依次返回给上游DataNode和客户端;
2.4、写满一个数据块后,向名字节点提交一个数据;
2.5、再次重复2.1-2.4过程;
3、向名字节点提交文件(complete file),即告知名字节点文件已写完,然后关闭文件系统输出流outputStream等释放资源。
可以看出,在不考虑异常等的情况下,上述过程还是比较复杂的。本文,我将着重阐述下HDFS写数据时,客户端是如何实现的,关于NameNode、DataNode等的配合等,后续文章将陆续推出,敬请关注!
二、实现分析
我们将带着以下问题来分析客户端写入数据过程:
1、如何获取数据输出流?
2、如何通过数据输出流写入数据?
3、数据输出流关闭时都做了什么?
4、如果发生异常怎么办?即如何容错?
(一)如何获取数据输出流?
HDFS客户端获取数据流是一个复杂的过程,流程图如下:
以DistributedFileSystem为例,create()是其入口方法,DistributedFileSystem内部封装了一个DFS的客户端,如下:
DFSClient dfs;
在DistributedFileSystem的初始化方法initialize()中,会构造这个文件系统客户端,如下:
this.dfs = new DFSClient(uri, conf, statistics);
而create()方法就是通过这个文件系统客户端dfs获取数据输出流的,如下:
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
final EnumSet<CreateFlag> cflags, final int bufferSize,
final short replication, final long blockSize, final Progressable progress,
final ChecksumOpt checksumOpt) throws IOException {
statistics.incrementWriteOps(1);
Path absF = fixRelativePart(f);
return new FileSystemLinkResolver<FSDataOutputStream>() {
/*
* 创建文件系统数据输出流
*/
@Override
public FSDataOutputStream doCall(final Path p)
throws IOException, UnresolvedLinkException {
// 调用create()方法创建文件,并获取文件系统输出流
final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
cflags, replication, blockSize, progress, bufferSize,
checksumOpt);
return dfs.createWrappedOutputStream(dfsos, statistics);
}
@Override
public FSDataOutputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.create(p, permission, cflags, bufferSize,
replication, blockSize, progress, checksumOpt);
}
}.resolve(this, absF);
}
FileSystemLinkResolver是一个文件系统链接解析器(抽象类),我们待会再分析它,这里只要知道,该抽象类实例化后会通过resolve()方法--doCall()方法得到数据输出流即可。接着往下DFSClient的create()方法,省略部分代码,如下:
// 为create构建一个数据输出流
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
buffersize, dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes));
// 开启文件租约
beginFileLease(result.getFileId(), result);
return result;
实际上,它又通过DFSOutputStream的newStreamForCreate()方法来获取数据输出流,并开启文件租约。租约的内容我们后续再讲,继续看下如何获取文件输出流的,如下:
/**
* 为创建文件构造一个新的输出流
*/
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress, int buffersize,
DataChecksum checksum, String[] favoredNodes) throws IOException {
TraceScope scope =
dfsClient.getPathTraceScope("newStreamForCreate", src);
try {
HdfsFileStatus stat = null;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
while (shouldRetry) {
shouldRetry = false;
try {
// 首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<CreateFlag>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS);
break;
} catch (RemoteException re) {
IOException e = re.unwrapRemoteException(
AccessControlException.class,
DSQuotaExceededException.class,
FileAlreadyExistsException.class,
FileNotFoundException.class,
ParentNotDirectoryException.class,
NSQuotaExceededException.class,
RetryStartFileException.class,
SafeModeException.class,
UnresolvedPathException.class,
SnapshotAccessControlException.class,
UnknownCryptoProtocolVersionException.class);
if (e instanceof RetryStartFileException) {
if (retryCount > 0) {
shouldRetry = true;
retryCount--;
} else {
throw new IOException("Too many retries because of encryption" +
" zone operations", e);
}
} else {
throw e;
}
}
}
Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
// 构造一个数据输出流
final DFSOutputStream out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
// 启动数据输出流
out.start();
return out;
} finally {
scope.close();
}
}
大体可以分为三步:
1、首先,通过DFSClient中nameNode的Create()方法,在HDFS文件系统名字节点中创建一个文件,并返回文件状态HdfsFileStatus;
2、构造一个数据输出流;
3、启动数据输出流。
上述连接NameNode节点创建文件的过程中,如果发生瞬时错误,会充分利用重试机制,增加系统容错性。DFSClient中nameNode的Create()方法,实际上是调用的是客户端与名字节点间的RPC--ClientProtocol的create()方法,该方法的作用即是在NameNode上创建一个空文件,并返回文件状态。文件状态主要包括以下信息:
// 文件路径
private final byte[] path; // local name of the inode that's encoded in java UTF8
// 符号连接
private final byte[] symlink; // symlink target encoded in java UTF8 or null
private final long length;// 文件长度
private final boolean isdir;// 是否为目录
private final short block_replication;// 数据块副本数
private final long blocksize;// 数据块大小
private final long modification_time;// 修改时间
private final long access_time;// 访问时间
private final FsPermission permission;// 权限
private final String owner;// 文件所有者
private final String group;// 文件所属组
private final long fileId;// 文件ID
继续看如何构造一个数据输出流,实际上它是通过构造DFSOutputStream实例获取的,而DFSOutputStream的构造方法如下:
/** Construct a new output stream for creating a file. */
private DFSOutputStream(DFSClient dfsClient, String src, HdfsFileStatus stat,
EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes) throws IOException {
this(dfsClient, src, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
// 计算数据包块大小
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
// 构造数据流对象
streamer = new DataStreamer(stat, null);
if (favoredNodes != null && favoredNodes.length != 0) {
streamer.setFavoredNodes(favoredNodes);
}
}
首先计算数据包块大小,然后构造数据流对象,后续就依靠这个数据流对象来通过管道发送流式数据。接下来便是启动数据输出流,如下:
private synchronized void start() {
streamer.start();
}
很简单,实际上也就是启动数据流对象,通过这个数据流对象实现数据的发送。
中间为什么会有计算数据包块大小这一步呢?原来,数据的发送是通过一个个数据包发送出去的,而不是通过数据块发送的。设想下,如果按照一个数据块(默认128M)大小发送数据,合理吗?至于数据包大小是如何确定的,我们后续再讲。
(二)如何通过数据输出流写入数据?
下面,该看看如何通过数据输出流写入数据了。要解决这个问题,首先分析下DFSOutputStream和DataStreamer是什么。
1、DFSOutputStream
DFSOutputStream是分布式文件系统输出流,它内部封装了两个队列:发送数据包队列和确认数据包队列,如下:
// 发送数据包队列
private final LinkedList<DFSPacket> dataQueue = new LinkedList<DFSPacket>();
// 确认数据包队列
private final LinkedList<DFSPacket> ackQueue = new LinkedList<DFSPacket>();
客户端写入的数据,会addLast入发送数据包队列dataQueue,然后交给DataStreamer处理。2、DataStreamer
DataStreamer是一个后台工作线程,它负责在数据流管道中往DataNode发送数据包。它从NameNode申请获取一个新的数据块ID和数据块位置,然后开始往DataNode的管道写入流式数据包。每个数据包都有一个***sequence number。当一个数据块所有的数据包被发送出去,并且每个数据包的确认信息acks被接收到的话,DataStreamer关闭当前数据块,然后再向NameNode申请下一个数据块。
所以,才会有上述发送数据包和确认数据包这两个队列。
DataStreamer内部有很多变量,大体如下:
// streamer关闭标志位
private volatile boolean streamerClosed = false;
// 扩展块,它的长度是已经确认ack的bytes大小
private ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
// 数据输出流
private DataOutputStream blockStream;
// 数据输入流:即回复流
private DataInputStream blockReplyStream;
// 响应处理器
private ResponseProcessor response = null;
// 当前块的数据块列表
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
// 存储类型
private volatile StorageType[] storageTypes = null;
// 存储ID
private volatile String[] storageIDs = null;
// 需要排除的节点
private final LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
CacheBuilder.newBuilder()
.expireAfterWrite(
dfsClient.getConf().excludedNodesCacheExpiry,
TimeUnit.MILLISECONDS)
.removalListener(new RemovalListener<DatanodeInfo, DatanodeInfo>() {
@Override
public void onRemoval(
RemovalNotification<DatanodeInfo, DatanodeInfo> notification) {
DFSClient.LOG.info("Removing node " +
notification.getKey() + " from the excluded nodes list");
}
})
.build(new CacheLoader<DatanodeInfo, DatanodeInfo>() {
@Override
public DatanodeInfo load(DatanodeInfo key) throws Exception {
return key;
}
});
// 优先节点
private String[] favoredNodes;
// 是否存在错误
volatile boolean hasError = false;
volatile int errorIndex = -1;
// Restarting node index
// 从哪个节点重试的索引
AtomicInteger restartingNodeIndex = new AtomicInteger(-1);
private long restartDeadline = 0; // Deadline of DN restart
// 当前数据块构造阶段
private BlockConstructionStage stage; // block construction stage
// 已发送数据大小
private long bytesSent = 0; // number of bytes that've been sent
private final boolean isLazyPersistFile;
/** Nodes have been used in the pipeline before and have failed. */
private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
/** The last ack sequence number before pipeline failure. */
// 管道pipeline失败前的最后一个确认包***
private long lastAckedSeqnoBeforeFailure = -1;
// 管道恢复次数
private int pipelineRecoveryCount = 0;
/** Has the current block been hflushed? */
// 当前数据块是否已被Hflushed
private boolean isHflushed = false;
/** Append on an existing block? */
// 是否需要在现有块上append
private final boolean isAppend;
有很多比较简单,不再赘述。这里只讲解几个比较重要的:
1、BlockConstructionStage stage
当前数据块构造阶段。针对create()这种写入 来说,开始时默认是BlockConstructionStage.PIPELINE_SETUP_CREATE,即管道初始化时需要向NameNode申请数据块及所在数据节点的状态,这个很容易理解。有了数据块和其所在数据节点所在列表,才能形成管道列表不是?在数据流传输过程中,即一个数据块写入的过程中,虽然有多次数据包写入,但状态始终为DATA_STREAMING,即正在流式写入的阶段。而当发生异常时,则是PIPELINE_SETUP_STREAMING_RECOVERY状态,即需要从流式数据中进行恢复,如果一个数据块写满,则会进入下一个周期,PIPELINE_SETUP_CREATE->DATA_STREAMING,最后数据全部写完后,状态会变成PIPELINE_CLOSE,并且如果发生异常的话,会有一个特殊状态对应,即PIPELINE_CLOSE_RECOVERY。而append开始时则是对应的状态PIPELINE_SETUP_APPEND及异常状态PIPELINE_SETUP_APPEND_RECOVERY,其它则一致。
2、volatile boolean hasError = false
这个状态位用来标记数据写入过程中,是否存在错误,方便进行容错。
3、ResponseProcessor response
响应处理器。这个也是后台工作线程,它会处理来自DataNode回复流中的确认包,确认数据是否发送成功,如果成功,将确认包从确认数据包队列中移除,否则进行容错处理。
create()模式下的DataStreamer构造比较简单,如下:
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
isAppend = false;
isLazyPersistFile = isLazyPersist(stat);
this.block = block;
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
isAppend设置为false,即不是append写入,BlockConstructionStage默认为PIPELINE_SETUP_CREATE,即需要向NameNode写入数据块。
我们首先看下DataStreamer是如何发送数据的。上面讲到过,DFSOutputStream中包括两个队列:发送数据包队列和确认数据包队列。这类似于两个生产者消--费者模型。针对发送数据包队列,外部写入者为生产者,DataStreamer为消费者。外部持续写入数据至发送数据包队列,DataStreamer则从中消费数据,判断是否需要申请数据块,然后写入数据节点流式管道。而确认数据包队列,DataStreamer为生产者,ResponseProcessor为消费者。首先,确认数据包队列数据的产生,是DataStreamer发送数据给DataNode后,从发送数据包队列挪过来的,而当ResponseProcessor线程确认接收到数据节点的ack确认包后,再从数据确认队列中删除。
关于ResponseProcessor线程,稍后再讲。
数据写入过程之DataStreamer
首先看DataStreamer的run()方法,它会在数据流没有关闭,且dfs客户端正在运行的情况下,一直循环,循环内处理的大体流程如下:
1、如果遇到一个错误(hasErro),且响应器尚未关闭,关闭响应器,使之join等待;
2、如果有DataNode相关IO错误,先预先处理,初始化一些管道和流的信息,并决定外部是否等待,等待意即可以进行容错处理,不等待则数目错误比较严重,无法进行容错处理:这里还判断了errorIndex标志位和restartingNodeIndex的大小,意思是是否是由某个具体数据节点引起的错误,如果是的话,这种错误理论上是可以处理的;
3、没有数据时,等待一个数据包发送:等待的条件是:当前流没有关闭(!streamerClosed)、没有错误(hasError)、dfs客户端正在 运行(dfsClient.clientRunning )、dataQueue队列大小为0,且当前阶段不是DATA_STREAMING,或者在需要sleep(doSleep)或者上次发包距离本次时间未超过阈值的情况下为DATA_STREAMING
意思是各种标记为正常,数据流处于正常发送的过程或者可控的非正常发送过程中,可控表现在状态位doSleep,即上传错误检查中认为理论上可以进行修复,但是需要sleep已完成recovery的初始化,或者距离上次发送未超过时间的阈值等。
4、如果数据流关闭、存在错误、客户端正常运行标志位异常时,执行continue:这个应该是对容错等的处理,让程序及时响应错误;
5、获取将要发送的数据包:
如果数据发送队列为空,构造一个心跳包;否则,取出队列中第一个元素,即待发送数据包。
6、如果当前阶段是PIPELINE_SETUP_CREATE,申请数据块,设置pipeline,初始化数据流:append的setup阶段则是通过setupPipelineForAppendOrRecovery()方法完成的,并同样会初始化数据流;
7、获取数据块中的上次数据位置lastByteOffsetInBlock,如果超过数据块大小,报错;
8、 如果是数据块的最后一个包:等待所有的数据包被确认,即等待datanodes的确认包acks,如果数据流关闭,或者数据节点IO存在错误,或者客户端不再正常运行,continue,设置阶段为pipeline关闭
9、发送数据包:将数据包从dataQueue队列挪至ackQueue队列,通知dataQueue的所有等待者,将数据写入远端的DataNode节点,并flush,如果发生异常,尝试标记主要的数据节点错误,方便容错处理;
10、更新已发送数据大小:可以看出,数据包中存储了其在数据块中的位置LastByteOffsetBlock,也就标记了已经发送数据的总大小;
11、数据块写满了吗?如果是最后一个数据块,等待确认包,调用endBlock()方法结束一个数据块 ;
如果上述流程发生错误,hasError标志位设置为true,并且如果不是一个DataNode引起的原因,流关闭标志设置为true。
最后,没有数据需要发送,或者发生致命错误的情况下,调用closeInternal()方法关闭内部资源。
未完待续,请关注《Hhadoop-2.7.0中HDFS写文件源码分析(二):客户端实现(2)》。
三、代码分析