HDFS读流程之后, 继续阅读源码的写入流程, 大部分情况, 写入要比读取要复杂许多的(因为有错误恢复和追加写), 所以需要细心记录梳理
同样, 写入部分的源码也分Client, Namenode和Datanode三个部分来看, 先看看整体和Client的部分.
PS: 最后会做个串联全部写的Slide, 经过”总-分-总”的流程之后, 才能真正的把HDFS的写流程弄清楚
0x00. 整体流程
阅读写源码前, 也需要先清楚读开始提到的几个基本概念, 会经常用到, 这里不再复述, 之后讲技术内容, 尽量都会先提供一个自己参考/重画的概览图(总结), 先有个总体的了解, 再说具体(代码)细节.
同样先给一个我重画之后的写整体流程图,如下所示: (附原高清图地址, 有问题欢迎随时联系更新)

其中客户端核心在create()
和write()
两个API调用中, 大体步骤如下:
- 新写一个文件, 先调用
create()
方法底层去给NN发一个RPC请求, NN收到请求后会在FS的目录树对应路径添加一个空的新文件, 然后记在editLog, 创建完成后返回FSDataOutputStream
, 它调用DFSOutputStream
写数据
- 有了输出流对象之后, 核心就是调用
write()
方法, 在①中只是单纯创建空文件, 这里再开始申请新的数据块, 以及传输管道, 成功后会返回这个块的所有DN节点信息.
- 通过传输管道把要写入的数据切分为一个个packet(约64K)大小发送, 然后在多个DN节点之间依次顺序传输, 然后逆序返回ACK确认写成功, 最后客户端接收到正常ACK, 表示当前packet发送成功
- 如果DN接收完文件所有packet, 则会向NN汇报当前block信息
- 如果写满了一个block, client会接着再申请一个新的block, 直到所有数据传输完成
- 最后关闭流的时候, 送
complete()-rpc
给NN提交文件的所有block, 确认DN已都完成块汇报后, NN确认当前文件写完成.
0x01. 代码例子
接着来看常见的写数据的Client-API使用代码:
1 2 3 4 5 6 7 8 9 10 11 12
| FileSystem fs = FileSystem.get(new URI("hdfs://ip:port"), new Configuration());
FSDataOutputStream outputStream = fs.create(new Path("/test/writeFile"));
outputStream.write(new byte[4609]);
outputStream.writeBytes("Test write data");
outputStream.close();
|
可以看到, 类似读取流程, 从Client-API
层面来看是很简单的, 细节和对应发送的RPC都隐藏在封装下面. 接着看看
0x02. 写入准备
A. 整体流程
沿着刚才的大流程, 我们分几个部分一个个来说, 先来看看第一步的fs.create(path)
的整体流程. 核心就是发RPC获得新文件初始化两个对象, 比较简单:
B. 代码细节
备注: 为方便阅读, 所有方法抛出的IO异常都被默认略去, 所有日志打印/Trace都被省略, 低关联调用简化, 参数值均为HDFS3默认官方参数
FileSystem
是我们程序的入口, 它有许多create()
方法的重载, 只传入一个Path
对象, 实际已经带了一系列的默认参数, 实际调用如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
|
public abstract FSDataOutputStream create(Path f, FsPermission permission, // 文件权限 boolean overwrite, // 默认true (覆盖写) int bufferSize, short replication, long blockSize, Progressable progress);
@Override public FSDataOutputStream create(final Path f, params...) { Path absF = fixRelativePart(f); return new FileSystemLinkResolver<FSDataOutputStream>() { @Override public FSDataOutputStream doCall(final Path p) { final DFSOutputStream dfsos = dfs.create(params...); return dfs.createWrappedOutputStream(dfsos, statistics); } @Override public FSDataOutputStream next(final FileSystem fs, final Path p) { return fs.create(params...); }}.resolve(this, absF); }
public DFSOutputStream create(params..., // 之前介绍过的参数 InetSocketAddress[] favoredNodes, // NN首选节点列表,默认空 String ecPolicyName) { final FsPermission masked = applyUMask(permission); final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this, allParams...); beginFileLease(result.getFileId(), result); return result; }
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, params...) { try { HdfsFileStatus stat = null; boolean shouldRetry = true; int retryCount = CREATE_RETRY_COUNT; while (shouldRetry) { shouldRetry = false; try { stat = dfsClient.namenode.create(params..); break; } catch (RemoteException re) { IOException e = re.unwrapRemoteException(exceptions...); if (e instanceof RetryStartFileException) { if (retryCount > 0) { shouldRetry = true; retryCount--; } else { throwExcp("Too many retries because of encryption zone operations", e); }} else throw e; }}} final DFSOutputStream out;
if(stat.getErasureCodingPolicy() != null) { out = new DFSStripedOutputStream(params...); } else { out = new DFSOutputStream(params...); } out.getStreamer().start(); return out; } }
protected void computePacketChunkSize(int psize, int csize) { final int bodySize = psize - (PKT_LENGTHS_LEN + MAX_PROTO_SIZE); final int chunkSize = csize + getChecksumSize(); chunksPerPacket = Math.max(bodySize / chunkSize, 1); packetSize = chunkSize * chunksPerPacket; }
|
这是HDFS3.1.2
的Packet完整结构的图 (基于低版图修改, 低版个数和数值略有差别) :

附一下Packet的几个核心成员的默认值和含义:
名称 |
数值 (默认) |
含义 |
psize |
65536B (64K) |
packetSizeMax的意思. 理论上可达65536B (64K) |
csize |
512B |
每个chunk数据域的大小 |
checkSumSize |
4B |
每个chunk校验域的大小 |
chunkSize |
516B |
一个完整的chunk的大小 ( 数据域+ 校验域) |
PKT_LENGTHS_LEN |
6B |
PacketLength和HeaderLength之和 –> 4 + 2 |
MAX_PROTO_SIZE |
27B |
packet头中走protobuf序列化的最大占用(详见后文) |
bodySize |
65503B |
一个packet的数据域部分最大值 (也称packet data) |
chunksPerPacket |
126 |
每个packet可以承载多少个校验块(低版127个) |
packetSize |
65016 |
这是当前实际传输时每个packet的大小, 约63.5K |
经过上面的层层追溯, 从顶层FileSystem.create
到最后发送RPC请求, 以及初始化DataStreamer
这个后续传输数据的关键对象的脉络就很清楚了, 包括也能看出新版本HDFS在写文件的时候, 会根据是否开启EC, 进行不同的初始化操作.
上面是写入的准备工作, 获得了一个空的文件, 初始化了两个关键对象, 接下来看看写入的核心write()
方法.
0x03. 写入数据
写入过程比较复杂, 简单可以抽象成 “发送packet –> 响应packet” , 然后拆分为几个大步骤, 拆分为几个模块去讲, 整体结构如图: (基于书中原图修改, 修正了一些错误)

1. 创建Packet
Packet是Client和DN间数据传输的最小单位, 它可以按是否携带数据域分为两类:
- 普通数据包
- 特别功能包 (包括心跳包和空尾包)
先来看看普通数据包, 的数据结构和序列化部分有些变化, 所以基于之前的图重新画了一个新版本, 不然是对不上的

这里补充一下Packet头里经过PB序列化的字段含义: (每个字段的真实长度在PB中是类型长度+1)
名称 |
类型 |
长度 |
含义 |
offsetInBlock |
long |
8 |
packet 在 block 中的偏移量 |
seqNo |
long |
8 |
packet的编号,在同一个block中唯一 |
lastPacketInBlock |
bool |
1 |
是否为一个block 的最后一个packet |
dataLen |
int |
4 |
记录chunk data的实际总长度 (dataPosition-dataStart) |
syncBlock |
bool |
1 |
强制刷盘(写穿), 调用POSIX的O_SYNC标志 |
数据从IO流到Packet的过程需要清楚, 假设我们要写1025
个字节的一个文件, 它至少需要占据3个chunk, 那么过程如下:
- 首先packet头变长占据最多33字节, 那么实际写数据(checksum+data)是从33B的偏移开始, 比如写了一个4B的
checkSum1
- 然后紧接着写
chunk data1
, 它从(33 + 126 * 4) = 537
的偏移开始写, 写满512B
- 同样的方式写入第二个chunk, 现在还剩下1字节没有写入, 它占据第三个chunk, 把packet头中的
lastPacketInBlock
设置为true, 再写了1B的偏移量之后, 剩下511B的空余会补0么? (待确定)
- (补充) 虽然数据写完了, 但此时显然packet没有写满, 那从checkSum3到chunkData1之间还有一段未被使用过的缓冲区, 那么在发送Packet之前做的空缺检查(gap)会把chunkData整体前移,让checkSum3和chunkData1最终相邻(详见后文)
再来看看心跳包, 这里高低版本结构有所不同:
低版本1/2里是使用Packet
类的无参构造方法表示心跳包, HDFS3中改为Packet
类专供DN的BlockReceiver使用, 而客户端写数据专用DFSPacket
类, 它没有packet body, 只有packet header, 并通过seqNo
(-1)来标识自己为一个心跳包, 比较简单. (文中如无特殊说明, packet都是指的DFSPacket
)
上面看完了前置所需的知识, 了解了packet的构造, 下面结合代码来看看实际的调用过程:
之前我们通过create()
方法创建了一个新的空文件, 以及DFSOutputStream
这个输出流对象, 紧接着就是用它输出流去调用write()
方法了(继承自FSOutputSummer
), 它将一定大小的数据(比如1025B)写入输出流一个缓冲区中, 然后按上面的packet格式切分为多个数据包
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| @Override public synchronized void write(byte b[], int off, int len) { for (int n=0;n<len;n+=write1(b, off+n, len-n)) {} }
private int write1(byte b[], int off, int len) { if(count==0 && len>=buf.length) { final int length = buf.length; writeChecksumChunks(b, off, length); return length; } int bytesToCopy = buf.length-count; bytesToCopy = (len<bytesToCopy) ? len : bytesToCopy; System.arraycopy(b, off, buf, count, bytesToCopy); count += bytesToCopy; if (count == buf.length) { flushBuffer(); } return bytesToCopy; }
@Override protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum, int ckoff, int cklen) { writeChunkPrepare(len, ckoff, cklen);
currentPacket.writeChecksum(checksum, ckoff, cklen); currentPacket.writeData(b, offset, len); currentPacket.incNumChunks(); getStreamer().incBytesCurBlock(len);
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() || getStreamer().getBytesCurBlock() == blockSize) { enqueueCurrentPacket(); adjustChunkBoundary(); endBlock(); } }
|
到这里, 正常写流程的第一大步就差不多完成了, 接下来看更核心的发送过程, 发送过程重点讲DataStreamer
的run()
方法
2. 发送Packet
A. 整体结构
因为小于63.5K的数据不足发车要求, 这里假设写入128K的文件, 它应该占据三个packet, 但是只有前两个packet会入队发车, 最后一个在末班车才发.
满足1个packet大小的数据会被封装好, 然后Client依次发给DN, 这里Packet只是数据包的载体, 就像HTTP请求发数据, 除了数据载体本身也需要有一个请求头, 所以在发送Packet之前, 需要一个Sender发送写请求, 得到了所有DN的确认后, 才会开始发数据
这里把每个Packet看做是一节载货车厢, 最后一节是空车厢(空尾包), 它们整体结构就像一辆载货的火车, 画了个简图方便大家理解:

注: “火车头“Sender并非是跟着货物(Packets)一起发送, 而是先单独发送, 车厢(packet)再进入铁轨(pipeline), 开始一个个传输(火车头的结构在DN篇说)
B. 发包线程DataStreamer (核心)
核心的代码在DataStreamer
这个后台线程类的run()
方法中, 线程启动是在create()
调用最后, 也就是说还没开始write()
的时候, 它就已经在后台运行了, 核心做两件事:
这里我们主要先看第一个功能, 打开传输通道, 让火车能在铁轨上跑起来, 为了方便阅读, 简化大量无关代码 (包括错误机制):
- 线程启动后就一直循环检测是否有需要发送的packet, 没有则等待一段时间
- 当有packet需要发送了, 取出来 (或者等待时间超时, 构造心跳packet)
- 给NN发请求要block的信息, 并初始化pipeline, 使之变为就绪态 (可以传数据了)
- 正式发送则把packet对象从dataQueue出队, 然后进入ackQueue (每个block的最后一个packet会单独延时等待)
- 再把packet实际写入pipeline中, 并刷新 (进入DN的操作)
- 更新发送信息, 然后跳至1发送下一个packet (如果是最后的packet, 则准备收尾操作)
- stream完全关闭或客户端中断, 则进入最后的大收尾操作
如图所示:
C. 源码分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
|
@Override public void run() { long lastPacket = Time.monotonicNow(); while (!streamerClosed && dfsClient.clientRunning) { DFSPacket one; try { synchronized (dataQueue) { long now = Time.monotonicNow(); while ((dataQueue.isEmpty() && (stage != DATA_STREAMING || now - lastPacket < 30))) { long timeout = 30 - (now-lastPacket); timeout = timeout <= 0 ? 1000 : timeout; timeout = (stage == DATA_STREAMING)? timeout : 1000; dataQueue.wait(timeout); now = Time.monotonicNow(); } if (dataQueue.isEmpty()) { one = createHeartbeatPacket(); } else { backOffIfNecessary(); one = dataQueue.getFirst(); } }
if (stage == PIPELINE_SETUP_CREATE) { setPipeline(nextBlockOutputStream()); initDataStreaming(); } else if (stage == PIPELINE_SETUP_APPEND) { setupPipelineForAppendOrRecovery(); initDataStreaming(); } long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (one.isLastPacketInBlock()) { synchronized (dataQueue) { while (ackQueue.size() != 0) dataQueue.wait(1000); } if (shouldStop()) continue; stage = PIPELINE_CLOSE; }
synchronized (dataQueue) { if (!one.isHeartbeatPacket()) { dataQueue.removeFirst(); ackQueue.addLast(one); dataQueue.notifyAll(); } }
try{ one.writeTo(blockStream); blockStream.flush(); } catch (IOException e) {
errorState.markFirstNodeIfNotMarked(); throw e; } lastPacket = Time.monotonicNow();
long tmpBytesSent = one.getLastByteOffsetBlock(); if (bytesSent < tmpBytesSent) bytesSent = tmpBytesSent;
if (one.isLastPacketInBlock()) { synchronized (dataQueue) { while (ackQueue.size() != 0) dataQueue.wait(1000); } endBlock(); } } catch (Throwable e) {...}} closeInternal(); }
|
上面7个步骤中中关键点在于第三步的初始化pipeline , 和第五步的写数据, 而初始化pipeline就像给载物火车旅程进行初始化, 我们得知道火车运输的货物信息(BlockInfo), 要开往的目的地信息(DataNodeList), 以及准备好运输的铁轨 (pipeline), 下面看看初始化的详细过程:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116
| protected LocatedBlock nextBlockOutputStream() { LocatedBlock lb; DatanodeInfo[] nodes; StorageType[] nextStorageTypes; String[] nextStorageIDs; int count = 3; boolean success; final ExtendedBlock oldBlock = block.getCurrentBlock(); do { DatanodeInfo[] excluded = getExcludedNodes(); lb = locateFollowingBlock(excluded.length > 0 ? excluded : null, oldBlock); block.setCurrentBlock(lb.getBlock()); block.setNumBytes(0); bytesSent = 0; accessToken = lb.getBlockToken(); nodes = lb.getLocations(); nextStorageTypes = lb.getStorageTypes(); nextStorageIDs = lb.getStorageIDs();
success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs, 0L,false);
if (!success) { dfsClient.namenode.abandonBlock(...); block.setCurrentBlock(null); final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()]; excludedNodes.put(badNode, badNode); } } while (!success && --count >= 0);
if (!success) throw new IOException("Unable to create new block."); return lb; }
static LocatedBlock addBlock(DatanodeInfo[] excludedNodes, DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId, String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags) { long sleeptime = 400; while (true) { try { return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock, excludedNodes, fileId, favoredNodes, allocFlags); } catch (RemoteException e) { Thread.sleep(sleeptime); sleeptime *= 2; }} else {throw e;}}}}
boolean createBlockOutputStream(DatanodeInfo[] nodes, StorageType[] nodeStorageTypes, String[] nodeStorageIDs,long newGS, boolean recoveryFlag){ String firstBadLink = ""; boolean checkRestart = false; while (true) { boolean result = false; DataOutputStream out = null; try { s = createSocketForPipeline(nodes[0], nodes.length, dfsClient); long writeTimeout = 480 + 5 * nodes.length; long readTimeout = 60 + 5 * nodes.length;
... out = new DataOutputStream(new BufferedOutputStream(unbufOut, param))); blockReplyStream = new DataInputStream(unbufIn);
new Sender(out).writeBlock(params....);
BlockOpResponseProto resp = parseFrom(...blockReplyStream); Status pipelineStatus = resp.getStatus(); firstBadLink = resp.getFirstBadLink();
if (PipelineAck.isRestartOOBStatus(pipelineStatus)) { checkRestart = true; throw new IOException("A datanode is restarting."); }
blockStream = out; result = true; } catch (IOException ie) {...} if (firstBadLink.length() != 0) { for (int i = 0; i < nodes.length; i++) { if (firstBadLink.equals(nodes[i].getXferAddr())) { errorState.setBadNodeIndex(i); break; } } } else { errorState.setBadNodeIndex(0); } result = false; } finally { closeSocketAndStreamWhenFail();} return result; }}
|
至此, 客户端核心的写入数据的过程差不多说了, 因为客户端是用户使用最多的组件, 所以它的代码细节需要仔细阅读, 有笔误可及时联系更正.
篇幅原因, Client剩下的接收响应ACK的response线程, 以及写入收尾关闭流的过程就拆分到第二篇写笔记中.
参考资料:
- 官方博客/Wiki (由于很多人引用图没有注明出处, 暂不清楚原始出处)
- HDFS3.1.2官方源码
- Hadoop 2.X HDFS源码剖析