由于HDFS的写入操作比较复杂, 正常的写流程就需要4篇来描述, 所以追加写和错误恢复的部分, 得单独拆出来看, 这样阅读起也清晰一些.同样先看Client的部分, 然后看Namenode和Datanode部分, 并在中间会正式引入另外一个重要概念—-租约
0x00. 前言 普通的写入操作图参考之前 , 它是基于不出错误/异常的前提下的流程. 但要知道文件系统出现异常 的情况是挺多的, 触发的原因也是多开花, 那么写一个大文件, 中间如果发生了异常, 写入操作会停止么? 如果不是直接停止, 那就有一系列的判断和恢复 逻辑, 也就引出了今天的主题—–追加写与错误恢复
至于为什么错误恢复会搭着追加写一起说呢? 后面会逐渐解答这个问题.
整个写入操作中有大量的代码是错误恢复 和异常处理 相关的, 基于之前的正常写流程, 我画了一张新的错误恢复流程图 , 可以对比 看看区别:
然后谈错误恢复, 也有几个分类方式, 常见的也是按组件来划分, 比如Client出现异常, Namenode出现异常, 以及Datanode出现异常.
此外有异常, 就有恢复, 有两个核心恢复:
Pipeline恢复 (主要由Client发起, 和上图对应)
租约恢复 (主要由Namenode发起, 和上图无关 )
注意: 这两类恢复基本是完全不同 的情况, 切不可混为一谈. 你可能还会看到有些文章多写了个block恢复 , 这个说法是很容易误导人的, 因为它本身就是租约恢复的一部分, 之后再细说.
0x01. Pipelineの错误恢复 那么这篇的重点自然是说对应头图的Pipeline恢复 . Client
–> NameNode
–> DataNode
, 这三个参与写入过程的组件都可能出现异常, 然后抛出错误. 先来看看核心Client端的部分. 先尝试按DataStreamer
线程的run()
方法时间线 来分一下几种情况:
Pipeline 初始化: 最早是在给NN发请求/初始化pipeline之前, 也就是pipeline还未到就绪态时 (Pipeline_Init)
发数据过程中: 然后是在达到就绪态, 发送packet到pipeline的过程中, 出现错误 (Pipeline_Streaming)
Pipeline收尾时: 此时除最后一个空尾包外, 其他数据包都发送完毕且acked, 但空尾包acked带有错误 (Pipeline_Close)
所以, 在pipeline的几个状态都有可能出现异常, 虽然它们处理方式略有区别 , 但核心 的错误恢复流程在Client端可大体分为三步:
关闭当前DataStreamer
和Response
线程
把现有ackQueue
队的packet搬回 到dataQueue
中
重新初始化pipeline并判断是否需要添加新Datanode
1. 整体流程 先看下Client的两个工作线程 是如何发现错误, 并处理错误的:
graph TD
subgraph response线程发现异常
循环等待--接受ack--> 读取错误信息 --> error
error((错误标志位)) --> ErrorState
error((错误标志位)) --> 异常DN下标
end
subgraph DataStreamer线程处理异常
a(发现错误标志位) --> b(关闭当前pipeline) --> c(把ackQueue元素全搬回dataQueue) --> d{当前packet恢复是否超过5次} --No--> e(重新初始化pipeline--核心) --> f{pipeline是否处于ClOSE阶段} --No--> g(重新启动response线程) --> streaming状态 --> h(完成此次错误恢复)
f --Yes--> 尾包acked异常 --> 调用endBlock置空pipeline --> create状态 --> h
subgraph 循环重试
d
e
end
end
然后看看DataStreamer线程处理错误的入口processDatanodeOrExternalError()
, 每次循环处理packet前, 都会先检查判断当前是否有异常:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 private boolean processDatanodeOrExternalError () { if (!errorState.hasDatanodeError() && !shouldHandleExternalError()) return false ; if (response != null ) return true ; closeStream(); synchronized (dataQueue) { dataQueue.addAll(0 , ackQueue); ackQueue.clear(); } if (!errorState.isRestartingNode() && ++pipelineRecoveryCount > 5 ) { streamerClosed = true ; return false ; } setupPipelineForAppendOrRecovery(); if (!streamerClosed && dfsClient.clientRunning) { if (stage == BlockConstructionStage.PIPELINE_CLOSE) { synchronized (dataQueue) { DFSPacket endOfBlockPacket = dataQueue.remove(); assert lastAckedSeqno == endOfBlockPacket.getSeqno() - 1 ; lastAckedSeqno = endOfBlockPacket.getSeqno(); pipelineRecoveryCount = 0 ; dataQueue.notifyAll(); } endBlock(); } else { initDataStreaming(); } } return false ; }
2. 重新初始化Pipeline核心 看懂了整体流程, 再来看看核心的pipeline恢复流程: (处理重启DN 的逻辑暂略 ,)
graph TD
a(先剔除错误DN) --> c{是否添加新的DN} --Yes--> d{新DN是否需要同步数据} --Yes--> d1(给DN发Transfer请求)--> e(申请新的block时间戳) -->f{重新初始化pipeline}--SUCCESS--> g(发RPC给NN更新时间戳) --> h(完成这次错误恢复)
c --No--> e
f --Failed--> h
subgraph 加入新Datanode流程
c
d
d1
end
subgraph re-init
e
f
g
end
下面是setupPipelineForAppendOrRecovery()
的实际调用, 之后追加写 也会用到它:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 protected void setupPipelineInternal (DatanodeInfo[] datanodes, StorageType[] nodeStorageTypes, String[] nodeStorageIDs) { boolean success = false ; long newGS = 0L ; while (!success && !streamerClosed && dfsClient.clientRunning) { if (!handleRestartingDatanode()) return ; final boolean isRecovery = errorState.hasInternalError(); if (!handleBadDatanode()) return ; handleDatanodeReplacement(); final LocatedBlock lb = updateBlockForPipeline(); newGS = lb.getBlock().getGenerationStamp(); accessToken = lb.getBlockToken(); success = createBlockOutputStream(nodes, storageTypes, storageIDs, newGS, isRecovery); errorState.checkRestartingNodeDeadline(nodes); } if (success) { final ExtendedBlock oldBlock = block.getCurrentBlock(); updateBlockGS(newGS); dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock, block.getCurrentBlock(), nodes, storageIDs); } } boolean handleBadDatanode () { final int badNodeIndex = errorState.getBadNodeIndex(); if (badNodeIndex >= 0 ) { if (nodes.length <= 1 ) { lastException.set(new IOException("All datanodes xx are bad. Aborting..." )); streamerClosed = true ; return false ; } if (errorState.getRestartingNodeIndex() == badNodeIndex) restartingNodes.add(nodes[badNodeIndex]); failed.add(nodes[badNodeIndex]); DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1 ]; arraycopy(nodes, newnodes, badNodeIndex); final StorageType[] newStorageTypes = new StorageType[newnodes.length]; arraycopy(storageTypes, newStorageTypes, badNodeIndex); final String[] newStorageIDs = new String[newnodes.length]; arraycopy(storageIDs, newStorageIDs, badNodeIndex); setPipeline(newnodes, newStorageTypes, newStorageIDs); errorState.adjustState4RestartingNode(); lastException.clear(); } return true ; }
3. 新增Datanode逻辑 当接着看看满足当前block的DN存活数 ≤ 1时, 替换新的DN的时候的细节:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 private void addDatanode2ExistingPipeline () { if (!isAppend && lastAckedSeqno < 0 && stage == PIPELINE_SETUP_CREATE) return ; if (stage == PIPELINE_CLOSE || stage == PIPELINE_CLOSE_RECOVERY) return ; int tried = 0 ; final DatanodeInfo[] original = nodes; final StorageType[] originalTypes = storageTypes; final String[] originalIDs = storageIDs; IOException caughtException = null ; ArrayList<DatanodeInfo> exclude = new ArrayList<>(failed); while (tried < 3 ) { LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(params); setPipeline(lb); final int d; try { d = findNewDatanode(original); } catch (IOException ioe) { short minReplicaReplaceNum = dfsClient.dtpReplaceDatanodeOnFailureReplication; if (minReplicaReplaceNum > 0 && nodes.length >= minReplicaReplaceNum) return ; throw ioe; } final DatanodeInfo src = original[tried % original.length]; final DatanodeInfo[] targets = {nodes[d]}; final StorageType[] targetStorageTypes = {storageTypes[d]}; final String[] targetStorageIDs = {storageIDs[d]}; try { transfer(src, targetsParams, lb.getBlockToken()); } catch (IOException ioe) { caughtException = ioe; exclude.add(nodes[d]); setPipeline(original, originalTypes, originalIDs); tried++; continue ; } return ; } throw (caughtException != null ) ? caughtException : new IOException("Failed to add a node" ); }
4. 小结 假设在常见3副本的场景下, 传输一个大小超过64K 的文件, 出现了以下几种可能的错误:
在创建pipeline时发生了错误, 给NN发送abandonBlock-RPC, 放弃此Block, 加入黑名单, 重试
在pipeline创建好后, 发送packet的途中出现了错误, 分这两种常见情况:
写入pipeline后 , response线程收到DN返回的ack里标记异常
写入pipeline时 发生了错误, 如果当前没有其他DN被标记异常, 那么直接认为第一个DN异常
在普通packet已经都发送完, 发送空尾包时, DN返回的ack
状态有异常
如果重新建立无异常, 则更新block时间戳, 然后正常完成block
如果重新建立pipeline仍有异常, 则无视这个异常, 视为正常完成block
0x02. 追加写 首先, “追加写 “顾名思义就是之前已经存在一个文件oldFile
, 现在想再接着原有内容继续写数据, 代码使用如下:
1 2 3 4 5 6 7 Path oldFile = writeFile(fs, new Path("/test/oldFile1" )); FSDataOutputStream appendStream = fs.append(oldFile); appendStream.writeBytes("new message" ); appendStream.close();
(流程图待补 ) 关键在于处理之前Block的最后一个Chunk 的细节地方, 所以只看代码是肯定记不住的.
然后初始化appendStream
的过程稍微多一点, 它需要给NN发append()
RPC, 获得旧文件的最后一个block信息, 然后根据block是否已满创建DataStreamer
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 public HdfsDataOutputStream append (...params...) { final DFSOutputStream out = append(src, buffersize, flag, null , progress); return createWrappedOutputStream(out, statistics, out.getInitialLen()); } private DFSOutputStream callAppend (String src, EnumSet<CreateFlag> flag, Progressable progress, String[] favoredNodes) { try { final LastBlockWithStatus blkWithStatus; blkWithStatus = callAppend(src, new EnumSetWritable<>(flag, CreateFlag.class)); HdfsFileStatus status = blkWithStatus.getFileStatus(); if (status == null ) status = getFileInfo(src); return DFSOutputStream.newStreamForAppend(this , src, flag, progress, blkWithStatus.getLastBlock(), status, dfsClientConf.createChecksum(null ), favoredNodes); } catch (RemoteException re) {throw XxxExceptions.class;} } private DFSOutputStream (...params...) { this (dfsClient, src, flags, progress, stat, checksum); if (!toNewBlock && lastBlock != null ) { streamer = new DataStreamer(lastBlock, stat, dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager); getStreamer().setBytesCurBlock(lastBlock.getBlockSize()); adjustPacketChunkSize(stat); getStreamer().setPipelineInConstruction(lastBlock); } else { computePacketChunkSize(dfsClient.getConf().getWritePacketSize(), bytesPerChecksum); streamer = new DataStreamer(...params...); } } DataStreamer(LocatedBlock lastBlock, parmas...) { this (stat, lastBlock.getBlock(), dfsClient, src, progress, checksum, cachingStrategy, byteArrayManage, true , null , null ); stage = BlockConstructionStage.PIPELINE_SETUP_APPEND; bytesSent = block.getNumBytes(); accessToken = lastBlock.getBlockToken(); } private void adjustPacketChunkSize (HdfsFileStatus stat) { long usedInLastBlock = stat.getLen() % blockSize; int freeInLastBlock = (int )(blockSize - usedInLastBlock); int usedInCksum = (int )(stat.getLen() % bytesPerChecksum); int freeInCksum = bytesPerChecksum - usedInCksum; if (freeInLastBlock == blockSize) throw new IOException(xx); if (usedInCksum > 0 && freeInCksum > 0 ) { computePacketChunkSize(0 , freeInCksum); setChecksumBufSize(freeInCksum); getStreamer().setAppendChunk(true ); } else { computePacketChunkSize(Math.min(getWritePacketSize(), freeInLastBlock), bytesPerChecksum); } }
0x03. 一些问题 1. 重建pipline时, 之前创建的socket会复用么, 还是重新初始化? 不会复用 . 在发现有异常时进入错误恢复流程开始, 就会调用closeStream()
. 先关闭blockStream和replyStream, 然后关闭socket , 在初始化pipeline的方法里再重建
2. abandonBlock啥时候调用? 传什么参数? 是在成功 addBlock()
申请了一个新的空块之后, 带着这个块对应的DN列表去创建Pipeline 时, 有任何异常导致创建失败, 则直接调用abandonBlock
告诉NN放弃刚生成的空block, 并把造成异常的DN 加入本地黑名单(仅1个), 然后之后找NN申请新block时则不会选择黑名单中DN.
此外, abandonBlock-RPC
仅传入刚申请的空块的信息, 让NN丢弃它, 并不会传入错误DN的信息.
3. 初始化Pipeline的方法中有个firstbadlink标记, 它是什么作用? 它标记着创建Pipeline的时候DN是否有错误, 如果有错误则记录错误DN下标 , 且注意区分两个情况:
在初始化申请block后创建pipeline时, 如果有DN/Client异常, 则返回创建失败并在abandonBlock
的流程中把当前DN加入黑名单 (见Q2)
如果是在错误恢复的流程中, 创建pipeline仍有异常, 则只剔除错误DN, 然后重新开始新的错误恢复 (不会把此DN加入黑名单)
4. 错误DN的恢复参数 有一个dfs.client.block.write.replace-datanode-on-failure.enable
参数, 标志是否开启错误DN的替换, 它和替换策略的里的NERVER
似乎等效, 可确认一下.
下一篇 接着讲追加写在Namenode端的相关流程, 并介绍一个新的朋友租约, 它也是异常恢复之租约恢复的主角, 并且客户端相关的租约操作, 也移到这篇一起说.
参考资料:
Hadoop 2.X HDFS源码剖析-Chap.5.2.x
Understanding HDFS Recovery Processes
Append, Hflush, and Read for implementation details(官方paper-推荐)