HDFS写入第二篇接着上篇 , 说完写入在Client端 的剩余部分:
响应ACK的Response
线程
关闭文件
0x00. 响应ACK线程 先回顾下第一篇中Client写操作发Packet的流程图:
ackQueue
是一个典型”生产-消费者”模型, Client生产, DN去消费然后返回结果, 通知Client接收, 确认这次消费成功/失败.
然后ResponseProcessor
线程类, 简单说就是不断接受DN1 返回的packet包, 然后判断是否有异常 情况, 比较简单, 它主要关注错误信息:
无异常 : 则从ackQueue中取出第一个 packet对比序列号, 相同则视为此packet写成功, 从队中移除
有异常 : 标记错误下标. 然后抛出(发送线程dataStreamer去处理)
因为它的实现比较简单, 就不单独画流程图了, 直接看看具体的代码实现: (注意细节 )
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 @Override public void run () { PipelineAck ack = new PipelineAck(); while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) { try { ack.readFields(blockReplyStream); if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) Long begin = packetSendTime.get(ack.getSeqno()); long seqno = ack.getSeqno(); ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>(); for (int i = ack.getNumOfReplies()-1 ; i >=0 && dfsClient.clientRunning; i--) { final Status reply = PipelineAck.getStatusFromHeader(ack.getHeaderFlag(i)); if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) == CONGESTED) congestedNodesFromAck.add(targets[i]); if (PipelineAck.isRestartOOBStatus(reply)) { errorState.initRestartingNode(i, message, shouldWaitForRestart(i)); throw new IOException(target[i]+"is restarting" ); } if (reply != SUCCESS) { errorState.setBadNodeIndex(i); throw new IOException("Bad response from datanode " + targets[i]); } } if (!congestedNodesFromAck.isEmpty()) { synchronized (congestedNodes) { congestedNodes.clear(); congestedNodes.addAll(congestedNodesFromAck); } } else { synchronized (congestedNodes) { congestedNodes.clear(); lastCongestionBackoffTime = 0 ; } } assert seqno != PipelineAck.UNKOWN_SEQNO : "Ack for unknown seqno should be a failed ack: " + ack; if (seqno == DFSPacket.HEART_BEAT_SEQNO) continue ; DFSPacket one; synchronized (dataQueue) { one = ackQueue.getFirst(); } if (one.getSeqno() != seqno) throwIOException("except + " +one.getSeqno() + " but received " + seqno); isLastPacketInBlock = one.isLastPacketInBlock(); block.setNumBytes(one.getLastByteOffsetBlock()); synchronized (dataQueue) { lastAckedSeqno = seqno; pipelineRecoveryCount = 0 ; ackQueue.removeFirst(); packetSendTime.remove(seqno); dataQueue.notifyAll(); one.releaseBuffer(byteArrayManager); } } catch (Exception e) { if (!responderClosed) { lastException.set(e); errorState.setInternalError(); errorState.markFirstNodeIfNotMarked(); synchronized (dataQueue) { dataQueue.notifyAll();} responderClosed = true ; } } finally {.....} }}
0x01. 关闭流 上一篇在客户端的写操作代码示例里, 主要就是三个步骤, 其中最后一步是关闭流的操作.
如果只是单纯的关闭流, 那么是不需要说的, 这里主要是因为写入的close()
方法里有一些重要 的点, 主要是:
处理文件尾不足一个 Packet大小的数据.
发送complete()
RPC 告知文件完成
停止续约 (在租约篇 详述)
先来整体看看close()
方法的调用:
sequenceDiagram
participant A as Client
participant B as DFSOutputStream
participant C as DataStreamer
participant D as NameNode
%%participant E as DataNode
A->>+B: close()
B->>B: flushBuffer()
alt packet非空
B->>+B: 剩余数据包入队()
B->>-C: waitAndQueuePacket()
activate C
end
B->>+B: 生成空尾包()
B->>-C: 发尾包, 等待ack收尾
deactivate C
B->>D: RPC请求: complete()
Note right of D: 此时NN释放租约
B->>+B: closeThreads()
B->>C: 关闭线程与socket
B->>-B: 客户端停止更新租约
B-->>-A: 写完成
前几个方法调用细节上一篇已经说过, 不再重复. 这里举个具体的例子, 说一下方法会如何执行
从客户端写入一个不满 1个packet大小(63.5k)的文件, 比如10K的文件
10K数据被写入packet中, 但是不会被放入dataQueue (相当于火车货物不足, 不发车)
在调用close()的时候, 再放入dataQueue发送 (末班车 , 货不足也要发走了)
然后生成一个空尾包 并发送, (标识)告诉DN 当前文件已经传输完了 (DN收班信号 )
最后等待DN返回成功, 然后再向NN发RPC通知完成文件 , 释放租约等收尾操作.
“空尾包 “也就是后面很多地方会用到的lastPacketInBlock
参数为true
的包, 然后看看客户端complete()
处理的逻辑, 何时NN认为可以完成了?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 protected void completeFile (ExtendedBlock last) throws IOException { boolean fileComplete = false ; int retries = getNumBlockWriteLocateFollowingRetry(); while (!fileComplete) { fileComplete = dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId); if (!fileComplete) { if (!dfsClient.clientRunning || rpcTimeout(..)) throw new IOException("unable to contact Server or RPCtimeout" ); try { if (retries == 0 ) throw new IOException("last block does't have enough replicas" ); retries--; Thread.sleep(sleeptime); sleeptime *= 2 ; } catch (InterruptedException ie) {LOG.warn("Caught exception " , ie);} }}} new DFSPacket(emptyBytes, 0 , getStreamer().getBytesCurBlock(), getStreamer().getAndIncCurrentSeqno(), true ) new DFSPacket(emptyBytes, 0 , 0 , DFSPacket.HEART_BEAT_SEQNO, 0 , false );
0x02. 带心跳包的整个流程 然后我们通过打开DEBUG
日志, 来实际看看, 写一个大小超过 1个packet大小的文件的具体过程, 代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 Path writeFile (FileSystem fs, Path f) throws Exception { int bufferSize = 65017 ; byte [] bytes = new byte [bufferSize]; Arrays.fill(bytes, (byte ) 88 ); FSDataOutputStream outputStream = fs.create(f); outputStream.write(bytes); Thread.sleep(50000 ); outputStream.close(); return f; }
为了稍微简化日志和流程, 限定为两个DN , 带发送心跳包的完整过程图:
graph LR
subgraph 第三部分
d2(在close阶段处理少量剩余数据) --> e2(packet_n-1入队) --> f2(空尾包入队)--> g2(依次发送, 等待ack) --> h(完成文件)
end
subgraph 第二部分
d(写chunk直到满packet) --> e(进入dataQueue) --> f(转到ackQueue)--> g(写packet到pipeline)
end
subgraph 第一部分
a(Client) --发送--> b(create-RPC) --> c(后台租约开始定时更新)
end
对应Client端主要的日志 如下: (无关信息简略)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 DFSClient: /test /readFileA: masked={ masked: rw-r--r--, unmasked: rw-rw-rw- } RPC-Call: create took 99ms DFSClient: computePacketChunkSize: src=/test /readFileA, chunkSize=516, chunksPerPacket=126, packetSize=65016 Lease renewer daemon for [DFSClient_NONMAPREDUCE_1659795017_1] with renew id 1 started DFSClient: WriteChunk allocating new packet seqno=0, packetSize=65016, chunksPerPacket=126, bytesCurBlock=0, DFSOutputStream:block==null DFSOutputStream: enqueue full packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512, bytesCurBlock=64512, blockSize=134217728, appendChunk=false , block==null DataStreamer: Queued packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512, block==null DataStreamer: stage=PIPELINE_SETUP_CREATE, block==null DataStreamer: Allocating new block: block==null RPC-Call: addBlock took 39ms DataStreamer: pipeline = [ DatanodeInfoWithStorage[10.172.195.33:9866,DS-c129f737-341c-4207-822b-c4328bfb585d,DISK], DatanodeInfoWithStorage[10.172.195.50:9966,DS-7e74ef68-00c8-4f40-a9c7-8f11310fb62a,DISK] ], blk_1073741979_1164 DataStreamer: Connecting to datanode 10.172.195.33:9866, Send buf size 65536 RPC-Call: getServerDefaults took 15ms DataStreamer: blk_1073741979_1164 sending packet seqno: 0 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 64512 DataStreamer: DFSClient seqno: 0 reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 4163346 flag: 0 flag: 0 DataStreamer: stage=DATA_STREAMING, blk_1073741979_1164 DataStreamer: blk_1073741979_1164 sending packet seqno: -1 offsetInBlock: 0 lastPacketInBlock: false lastByteOffsetInBlock: 0 DataStreamer: DFSClient seqno: -1 reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 1167698 flag: 0 flag: 0 ... 此处省略重复的心跳包, 大概10几秒一次 DFSClient: WriteChunk allocating new packet seqno=1, src=/test /readFileA, packetSize=65016, chunksPerPacket=126, bytesCurBlock=64512, DFSOutputStream:blk_1073741979_1164 DataStreamer: Queued packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 65017, blk_1073741979_1164 DataStreamer: Queued packet seqno: 2 offsetInBlock: 65017 lastPacketInBlock: true lastByteOffsetInBlock: 65017, blk_1073741979_1164 DataStreamer: blk_1073741979_1164 waiting for ack for : 2 DataStreamer: blk_1073741979_1164 sending packet seqno: 1 offsetInBlock: 64512 lastPacketInBlock: false lastByteOffsetInBlock: 65017 DataStreamer: DFSClient seqno: 1 reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 1188867 flag: 0 flag: 0 DataStreamer: blk_1073741979_1164 sending packet seqno: 2 offsetInBlock: 65017 lastPacketInBlock: true lastByteOffsetInBlock: 65017 DFSClient seqno: 2 reply: SUCCESS reply: SUCCESS downstreamAckTimeNanos: 2767076 flag: 0 flag: 0 DataStreamer: Closing old block BP-1860588468-10.172.195.33-1572260690191:blk_1073741979_1164 RPC-Call: complete took 72ms
这里的日志比较冗长, 但是如果想彻底搞清楚写入过程在Client端的全部过程, 还是得耐心看一下, 每个数值的变化 , 为什么变了, 变了多少, 都能搞清楚, 那就说明HDFS在客户端的写流程, 基本清楚了
并且, 看日志还有个重要作用就是, 处理线上疑难问题的时候, 基本都是靠日志来快速定位, 所以耗时 , 大小, 序号这些数字都需要有个大致印象.
0x03. 问题 以下是客户端写过程存在的问题记录:
Client在哪些情况下, 会把lastPacketInBlock
设置为ture?
在写手动调用close()
方法关闭流时, 最后会发一个空尾包, 它的参数为true
在调用writeChunk()
写满一个block时, 会触发endBlock()
, 此时会发送一个空尾包.
(特别)在flushOrSync()
方法里, 有一个END_BLOCK
枚举, 如果client调用hflush/hsync
方法时传入此, 那么会马上发送一个空尾包完成块.
在发当前block的最后一个packet时, 要等待之前的包都acked, 在等到ackQueue为空时, 应该说明前面的包都正常写入了, 在什么情况下才会触发shouldStop()
呢?
繁忙DN(Congested DN)是从DN的返回ACK中携带的, 它在DN端是如何判断的, 它能有效缓解写繁忙的场景么?
如果一个DN长时间是繁忙状态, Client会考虑剔除它么?
Client端一段时间没有发packet, 则会发一个心跳包给DN. 两个问题:
DN端似乎没通过seqNo来判断心跳包的, 而是把心跳和空包当做一类(len=0
), 那它们是否会放入ackQueue让responder线程处理? (会, 只输出一条debug日志)
Client端会接受到DN返回ACK的心跳包, 说明responder线程处理了? 怎么处理的 (就当做一个没有数据的包, 然后原路返回给client)
Client在发送最后一个空Packet的时候如果出错, 会进入错误恢复, 如何处理. (详见错误恢复和追加写 篇)
如果第二个Datanode超时失去响应, 此时第三个DN处理结果未知, 返回信息是什么样的? (ack里只有两个DN返回值, 不确定的DN先不管)
实际验证中, 发送刚好超过一个packet大小(65016
字节)的文件, 为什么第一个packet只有64512
字节, 而不是65016
B呢?
HDFS也可以支持并行写(非社区), 它是否对client端压力过大? 什么场景适合使用?
Client端的普通写流程就差不多说完了, 下一篇 介绍在Namenode端的写入相关, 分组件来理解,
注: 追加写和异常处理, 这一块内容同样比较多, 加上租约和flush/sync
的操作, 一起拆分到单独的追加写篇 细说了 (和普通写分开)
参考资料:
HDFS3.1.2官方源码
Hadoop 2.X HDFS源码剖析
HDFS-3398:客户端写pipeline时抛出异常的处理