前两篇 介绍了HDFS普通写(不包括错误恢复)全局的概览, 以及在Client端 实现的过程, 接着来看看在Namenode 端的实现和细节.
0x00. 前置 从这里开始区分block (块)和replication (块副本)的概念, 一般认为block代表的是Namenode端的概念, 而在Datanode端, 存储的是Namenode中block对应的副本, 需要区分开.
1. 块的状态 首先, 需要了解Block在NN这有4个 基本的状态:
Under_Construction (正在写入状态): 写入数据中的状态, 此时写入的数据是可读 的
Under_Recovery (恢复状态): 客户端租约过期后, 就需要进行租约恢复和块恢复时block的状态 (例如文件的lastBlock处于写入状态, Client异常断开超时)
Committed (已提交状态):
由UC(写入状态)转变而来, 说明此时客户端发送了addBlock或complete的RPC
意味着Client将此block的数据全发给了DN且全部ACK成功
但是 此时没有收到足够 的DN汇报有Finalized状态的副本 (默认情况下是指没有任一DN汇报)
Complete (完成状态):
正常情况它应该是由Committed 态转变而来 (存在特殊情况可由其他态直接转化.)
意味着block不再处于修改状态 (那它的长度 和**时间戳(GS)**都不再会发生变化)
此时NN收到了足够数量DN汇报处于Finalized 状态的副本 (最小副本数默认1, 也就是至少1个DN汇报成功)
仅当一个文件的所有 block都处于Complete状态, 文件才能被关闭
注: 若block处于UR状态下, block也可能处于完成态但副本数没满足最小要求.
所以写文件时block状态有两条常见 转变:
graph LR
subgraph 异常情况
UC --> Committed --> UR --> Complete
UC --> UR --> Complete
end
subgraph 正常提交block
1(UC)--> 2(Committed) --> 3(Complete)
end
2. 副本状态 然后对应到Replica也有5大 状态, 当然这主要是在Datanode端用到的:
RBW (Replica Being Written): 副本处于正在写入的状态, 此时已写入的数据是可读 的
RWR (Replica Waiting Recovered): 副本处于等待恢复的状态, DN重启后所有RBW 状态的副本都会变成此状态, 然后等待租约恢复变为RUR 状态
RUR (Replica Under Recovery): 副本处于正在恢复的状态, 比如租约过期后发生租约恢复 , 以及数据块恢复 时都处于此状态
Temporary : DN之间传输副本的状态(例balance时), 此状态数据不可读 , 且如果DN重启, 会删除此状态副本
Finalized : 副本在DN上已经写完成, 不再变动
0x01. 创建文件 在客户端create() 调用后, NN给它返回了一个空的文件, 在对应的路径目录树中添加了文件信息, 那具体是怎么做的呢? 一起来看看位于FSNamesystem中的startFileInt()方法, 它有许多的检查操作, 以及可选的配置. 不是核心的会尽量简略, 大体流程是
graph TD
a(一系列基本检查) --> b{是否覆盖写} --是--> c(调用delete删除原有文件和块) -->d(在目录树对应地址创建新的INode-UC状态) --> e(添加租约) --> f(在editLog记录操作)
b -.不是.-> d
subgraph Part A
b
c
end
subgraph Part B
d
e
end
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private HdfsFileStatus startFileInt (params...) { if (validateFileName(src)) throw new InvalidPathException (src); INodesInPath iip; boolean skipSync = true ; HdfsFileStatus stat; BlocksMapUpdateInfo toRemoveBlocks = null ; checkOperation(OperationCategory.WRITE); final FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { iip = FSDirWriteFileOp.resolvePathForStartFile(dir, pc, src, flag, createParent); skipSync = false ; toRemoveBlocks = new BlocksMapUpdateInfo (); dir.writeLock(); try { stat = FSDirWriteFileOp.startFile(params); } catch (IOException e) { skipSync = e instanceof StandbyException; throw e; } finally { dir.writeUnlock(); } } finally { writeUnlock("create" ); if (!skipSync) { getEditLog().logSync(); if (toRemoveBlocks != null ) { removeBlocks(toRemoveBlocks); toRemoveBlocks.clear(); }}} return stat; }
创建文件看看核心的startFile() 方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 static HdfsFileStatus startFile (params...) { boolean overwrite = flag.contains(CreateFlag.OVERWRITE); boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST); final String src = iip.getPath(); FSDirectory fsd = fsn.getFSDirectory(); if (iip.getLastINode() != null ) { if (overwrite) { List<INode> toRemoveINodes = new ChunkedArrayList <>(); List<Long> toRemoveUCFiles = new ChunkedArrayList <>(); long ret = FSDirDeleteOp.delete(params...); if (ret >= 0 ) { iip = INodesInPath.replace(iip, iip.length() - 1 , null ); FSDirDeleteOp.incrDeletedFileCount(ret); fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true ); } } else { fsn.recoverLeaseInternal(params..); throw new FileAlreadyExistsException (src + " for client XX already exists" ); } } fsn.checkFsObjectLimit(); INodeFile newNode = null ; INodesInPath parent = FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions); if (parent != null ) { iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, replication, blockSize, holder, clientMachine, shouldReplicate, ecPolicyName); newNode = iip != null ? iip.getLastINode().asFile() : null ; } if (newNode == null ) throw new IOException ("Unable to add file to namespace" ); fsn.leaseManager.addLease(newNode_params..); if (feInfo != null ) FSDirEncryptionZoneOp.setFileEncryptionInfo(params...); setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist); fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry); return FSDirStatAndListingOp.getFileInfo(fsd, iip, false , false ); }
0x02. 创建block 同样, 创建新的block 在Namenode也是在FSNamesystem做控制, 然后再调用具体的函数处理, 整体流程是:
graph TD
a(如果已有两个块, 则确认第一个块complete) --> b(选取3个DN) --> c(提交或完成上一个block) --> d(构造新的block) --> e(并更新INode/BlockMap信息) --> f(在editLog记录操作)
subgraph Part A
b
end
subgraph Part B
c
d
e
end
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 LocatedBlock getAdditionalBlock (params...) { LocatedBlock[] onRetryBlock = new LocatedBlock [1 ]; FSDirWriteFileOp.ValidateAddBlockResult r; readLock(); try { r = FSDirWriteFileOp.validateAddBlock(params.., previous, onRetryBlock); } finally { readUnlock(operationName); } if (r == null ) { assert onRetryBlock[0 ] != null : "Retry block is null" ; return onRetryBlock[0 ]; } DatanodeStorageInfo[] targets; target = FSDirWriteFileOp.chooseTargetForNewBlock(blockManager, params.., r); writeLock(); LocatedBlock lb; try { lb = FSDirWriteFileOp.storeAllocatedBlock(params, previous, targets); } finally { writeUnlock(operationName); } getEditLog().logSync(); return lb; }
上面比较清晰的划分三步, 那么下面就依次来看看各自做了什么
1. 检查 暂略, 无关主线的细节. 以后再补
2. 选择DN(主要) 这里主要就是如何选择合适DN, 细节很多, 但是大体来说是如下的规则:
第一个DN以和Client距离最近 优先
第二个DN再考虑比如跨机架 等因素选择
然后满足条件的DN也需要满足一系列的基本检查, 比如可以写入, 空间充足等, 这里也暂不细说了.
DN属于哪些机架需要手动配置, 这里机架相关的选择细节暂略. 后续用到再看, 相关的流程图可以参考书中对应章节.
3. 更新元信息 首先, 在最早客户端申请了一个空的文件, 就有了一个INode信息, 现在新增了一个block, 自然需要更新两个关系 :
文件和Block的关系
Block和对应Datanode的关系
0x04. 完成文件 Client发送了complete() 的RPC请求后, NN通过RPCServer 接受后做一些基本检查后转交FSNamesystem处理, 先看看整体流程:
graph TD
a(检查租约) --> b(确认第n-1个块complete) --> c(提交或完成当前块) --> d(检查文件的所有块处于complete) --> e(如果文件缺副本, 则放入待复制队列) --> f(将INode转为正常态)
subgraph Part A
b
end
subgraph Part B
c
d
e
end
complete的过程比较重要, 而且这里有些棘手的异常需要注意, 下面是核心的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 private static boolean completeFileInternal (FSNamesystem fsn, INodesInPath iip, String holder, Block last, long fileId) { final String src = iip.getPath(); final INodeFile pendingFile; INode inode = null ; try { inode = iip.getLastINode(); pendingFile = fsn.checkLease(iip, holder, fileId); } catch (LeaseExpiredException lee) { if (inode.isFile() && !inode.asFile().isUnderConstruction()) { final Block realLastBlock = inode.asFile().getLastBlock(); if (Block.matchingIdAndGenStamp(last, realLastBlock)) return true ; } throw lee; } if (!fsn.checkFileProgress(src, pendingFile, false )) return false ; fsn.commitOrCompleteLastBlock(pendingFile, iip, last); if (!fsn.checkFileProgress(src, pendingFile, true )) return false ; fsn.addCommittedBlocksToPending(pendingFile); fsn.finalizeINodeFileUnderConstruction(src, pendingFile,Snapshot.CURRENT_STATE_ID, true ); return true ; } public boolean commitOrCompleteLastBlock (BlockCollection bc, Block commitBlock, INodesInPath iip) throws IOException { if (commitBlock == null ) return false ; BlockInfo lastBlock = bc.getLastBlock(); if (lastBlock == null ) return false ; if (lastBlock.isComplete()) return false ; if (lastBlock.isUnderRecovery()) throw new IOException ("block is under recovery." ); final boolean committed = commitBlock(lastBlock, commitBlock); NumberReplicas numReplicas = countNodes(lastBlock); int numUsableReplicas = numReplicas.liveReplicas() + numReplicas.decommissioning() + numReplicas.liveEnteringMaintenanceReplicas(); if (hasMinStorage(lastBlock, numUsableReplicas)) { if (committed) addExpectedReplicasToPending(lastBlock); completeBlock(lastBlock, iip, false ); } else if (pendingRecoveryBlocks.isUnderRecovery(lastBlock)) { completeBlock(lastBlock, iip, true ); updateNeededReconstructions(lastBlock, 1 , 0 ); } return committed; } private boolean commitBlock (final BlockInfo block, final Block commitBlock) { if (block.getBlockUCState() == BlockUCState.COMMITTED) return false ; assert block.getNumBytes() <= commitBlock.getNumBytes(); if (block.getGenerationStamp() != commitBlock.getGenerationStamp()) throw new IOException ("mismatching GS" ); List<ReplicaUnderConstruction> staleReplicas = block.commitBlock(commitBlock); removeStaleReplicas(staleReplicas, block); return true ; }
complete方法虽然本身的逻辑其实从图上看是很简单的, 但是要注意这里如果完成失败, 可能是哪些异常, 然后如果一个文件提交失败的话, 它会处于什么状态呢?它的租约会被释放么? 如果没有释放会进入租约恢复么? 这些问题都是需要注意的…
0x0n. 问题记录
在客户端发起complete()的时候, 如果文件有两个block, 第一个block已经complete, 第二个副本数只有1 , 但不足最小副本数(比如NN端设置为2), 客户端重试完后会抛出副本不足的异常结束, 此时NN端文件租约 没有被正常释放, 有以下几个问题:
此时文件(INode)的状态, 以及第二个块的状态是什么? (文件应该处于UC, 但第二个block应该处于Committed 状态)
此时NN端文件租约未被释放, 是只能等待硬超时触发租约恢复么? (也就是哪些情况能变成UR态.)
其他客户端尝试追加写此文件, 是否会提示租约被占用
此时文件的状况是什么情况, 文件显示有几个块? 如果是两个, 那第二个块长度是多少? 文件长度又是多少?
按理说文件的第一个块是可读的, 那第二个块呢? NN收到了第二个块的一个Finalized副本汇报, 它会允许读么? 如果不允许报什么错?
在NN的complete() 逻辑里, 有一个**强行关闭(complete)**副本数不足, 但处于恢复状态块的逻辑, 但从上下文来看, 不知何时才能触发此种情况. 以下是官方的注释
We’ve just finished recovery for this block, complete the block forcibly disregarding number of replicas. This is to ignore minReplication, the block will be closed and then replicated out .
疑问在于:
代码开头判断了block只能是UC或Committed状态, 然后通过commitBlock() 将UC态也转为Committed态. 如果转换失败抛异常了, 如果block能走到这个判断逻辑, 说明此时块已处于Committed状态, 如何会出现lastBlock处于恢复态的情况呢?
处于Committed状态的块, 还能变成恢复状态(UR)么? (比如下面的情况, 它会变为UR态么)
一个块处于Committed状态后, 一直没有达到Complete态的要求, 客户端断开后, 那它会如何被处理?
abandonBlock()以及其它错误恢复相关的RPC, 在追加写和错误恢复篇 再说, 下一篇是普通写流程的完结-Datanode篇
参考资料:
HDFS3.1.2官方源码
Hadoop 2.X HDFS源码剖析-Chapt3.4