前两篇 介绍了HDFS普通写(不包括错误恢复)全局的概览, 以及在Client端 实现的过程, 接着来看看在Namenode 端的实现和细节.
0x00. 前置 从这里开始区分block (块)和replication (块副本)的概念, 一般认为block代表的是Namenode端的概念, 而在Datanode端, 存储的是Namenode中block对应的副本, 需要区分开.
1. 块的状态 首先, 需要了解Block
在NN这有4个 基本的状态:
Under_Construction (正在写入状态): 写入数据中的状态, 此时写入的数据是可读 的
Under_Recovery (恢复状态): 客户端租约过期后, 就需要进行租约恢复和块恢复时block的状态 (例如文件的lastBlock处于写入状态, Client异常断开超时)
Committed (已提交状态):
由UC(写入状态)转变而来, 说明此时客户端发送了addBlock
或complete
的RPC
意味着Client将此block的数据全发给了DN且全部ACK成功
但是 此时没有收到足够 的DN汇报有Finalized
状态的副本 (默认情况下是指没有任一DN汇报)
Complete (完成状态):
正常情况它应该是由Committed 态转变而来 (存在特殊情况可由其他态直接转化.)
意味着block不再处于修改状态 (那它的长度 和时间戳(GS) 都不再会发生变化)
此时NN收到了足够数量DN汇报处于Finalized 状态的副本 (最小副本数默认1, 也就是至少1个DN汇报成功)
仅当一个文件的所有 block都处于Complete状态, 文件才能被关闭
注: 若block处于UR状态下, block也可能处于完成态但副本数没满足最小要求.
所以写文件时block状态有两条常见 转变:
graph LR
subgraph 异常情况
UC --> Committed --> UR --> Complete
UC --> UR --> Complete
end
subgraph 正常提交block
1(UC)--> 2(Committed) --> 3(Complete)
end
2. 副本状态 然后对应到Replica
也有5大 状态, 当然这主要是在Datanode端用到的:
RBW (Replica Being Written): 副本处于正在写入的状态, 此时已写入的数据是可读 的
RWR (Replica Waiting Recovered): 副本处于等待恢复的状态, DN重启后所有RBW 状态的副本都会变成此状态, 然后等待租约恢复变为RUR 状态
RUR (Replica Under Recovery): 副本处于正在恢复的状态, 比如租约过期后发生租约恢复 , 以及数据块恢复 时都处于此状态
Temporary : DN之间传输副本的状态(例balance时), 此状态数据不可读 , 且如果DN重启, 会删除此状态副本
Finalized : 副本在DN上已经写完成, 不再变动
0x01. 创建文件 在客户端create()
调用后, NN给它返回了一个空的文件, 在对应的路径目录树中添加了文件信息, 那具体是怎么做的呢? 一起来看看位于FSNamesystem中的startFileInt()
方法, 它有许多的检查操作, 以及可选的配置. 不是核心的会尽量简略, 大体流程是
graph TD
a(一系列基本检查) --> b{是否覆盖写} --是--> c(调用delete删除原有文件和块) -->d(在目录树对应地址创建新的INode-UC状态) --> e(添加租约) --> f(在editLog记录操作)
b -.不是.-> d
subgraph Part A
b
c
end
subgraph Part B
d
e
end
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private HdfsFileStatus startFileInt (params...) { if (validateFileName(src)) throw new InvalidPathException(src); INodesInPath iip; boolean skipSync = true ; HdfsFileStatus stat; BlocksMapUpdateInfo toRemoveBlocks = null ; checkOperation(OperationCategory.WRITE); final FSPermissionChecker pc = getPermissionChecker(); writeLock(); try { iip = FSDirWriteFileOp.resolvePathForStartFile(dir, pc, src, flag, createParent); skipSync = false ; toRemoveBlocks = new BlocksMapUpdateInfo(); dir.writeLock(); try { stat = FSDirWriteFileOp.startFile(params); } catch (IOException e) { skipSync = e instanceof StandbyException; throw e; } finally { dir.writeUnlock(); } } finally { writeUnlock("create" ); if (!skipSync) { getEditLog().logSync(); if (toRemoveBlocks != null ) { removeBlocks(toRemoveBlocks); toRemoveBlocks.clear(); }}} return stat; }
创建文件看看核心的startFile()
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 static HdfsFileStatus startFile (params...) { boolean overwrite = flag.contains(CreateFlag.OVERWRITE); boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST); final String src = iip.getPath(); FSDirectory fsd = fsn.getFSDirectory(); if (iip.getLastINode() != null ) { if (overwrite) { List<INode> toRemoveINodes = new ChunkedArrayList<>(); List<Long> toRemoveUCFiles = new ChunkedArrayList<>(); long ret = FSDirDeleteOp.delete(params...); if (ret >= 0 ) { iip = INodesInPath.replace(iip, iip.length() - 1 , null ); FSDirDeleteOp.incrDeletedFileCount(ret); fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true ); } } else { fsn.recoverLeaseInternal(params..); throw new FileAlreadyExistsException(src + " for client XX already exists" ); } } fsn.checkFsObjectLimit(); INodeFile newNode = null ; INodesInPath parent = FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions); if (parent != null ) { iip = addFile(fsd, parent, iip.getLastLocalName(), permissions, replication, blockSize, holder, clientMachine, shouldReplicate, ecPolicyName); newNode = iip != null ? iip.getLastINode().asFile() : null ; } if (newNode == null ) throw new IOException("Unable to add file to namespace" ); fsn.leaseManager.addLease(newNode_params..); if (feInfo != null ) FSDirEncryptionZoneOp.setFileEncryptionInfo(params...); setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist); fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry); return FSDirStatAndListingOp.getFileInfo(fsd, iip, false , false ); }
0x02. 创建block 同样, 创建新的block 在Namenode也是在FSNamesystem
做控制, 然后再调用具体的函数处理, 整体流程是:
graph TD
a(如果已有两个块, 则确认第一个块complete) --> b(选取3个DN) --> c(提交或完成上一个block) --> d(构造新的block) --> e(并更新INode/BlockMap信息) --> f(在editLog记录操作)
subgraph Part A
b
end
subgraph Part B
c
d
e
end
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 LocatedBlock getAdditionalBlock (params...) { LocatedBlock[] onRetryBlock = new LocatedBlock[1 ]; FSDirWriteFileOp.ValidateAddBlockResult r; readLock(); try { r = FSDirWriteFileOp.validateAddBlock(params.., previous, onRetryBlock); } finally { readUnlock(operationName); } if (r == null ) { assert onRetryBlock[0 ] != null : "Retry block is null" ; return onRetryBlock[0 ]; } DatanodeStorageInfo[] targets; target = FSDirWriteFileOp.chooseTargetForNewBlock(blockManager, params.., r); writeLock(); LocatedBlock lb; try { lb = FSDirWriteFileOp.storeAllocatedBlock(params, previous, targets); } finally { writeUnlock(operationName); } getEditLog().logSync(); return lb; }
上面比较清晰的划分三步, 那么下面就依次来看看各自做了什么
1. 检查 暂略, 无关主线的细节. 以后再补
2. 选择DN(主要) 这里主要就是如何选择合适DN, 细节很多, 但是大体来说是如下的规则:
第一个DN以和Client距离最近 优先
第二个DN再考虑比如跨机架 等因素选择
然后满足条件的DN也需要满足一系列的基本检查, 比如可以写入, 空间充足等, 这里也暂不细说了.
DN属于哪些机架需要手动配置, 这里机架相关的选择细节暂略. 后续用到再看, 相关的流程图可以参考书中对应章节.
3. 更新元信息 首先, 在最早客户端申请了一个空的文件, 就有了一个INode信息, 现在新增了一个block, 自然需要更新两个关系 :
文件和Block的关系
Block和对应Datanode的关系
0x04. 完成文件 Client发送了complete()
的RPC请求后, NN通过RPCServer
接受后做一些基本检查后转交FSNamesystem
处理, 先看看整体流程:
graph TD
a(检查租约) --> b(确认第n-1个块complete) --> c(提交或完成当前块) --> d(检查文件的所有块处于complete) --> e(如果文件缺副本, 则放入待复制队列) --> f(将INode转为正常态)
subgraph Part A
b
end
subgraph Part B
c
d
e
end
complete的过程比较重要, 而且这里有些棘手的异常需要注意, 下面是核心的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 private static boolean completeFileInternal (FSNamesystem fsn, INodesInPath iip, String holder, Block last, long fileId) { final String src = iip.getPath(); final INodeFile pendingFile; INode inode = null ; try { inode = iip.getLastINode(); pendingFile = fsn.checkLease(iip, holder, fileId); } catch (LeaseExpiredException lee) { if (inode.isFile() && !inode.asFile().isUnderConstruction()) { final Block realLastBlock = inode.asFile().getLastBlock(); if (Block.matchingIdAndGenStamp(last, realLastBlock)) return true ; } throw lee; } if (!fsn.checkFileProgress(src, pendingFile, false )) return false ; fsn.commitOrCompleteLastBlock(pendingFile, iip, last); if (!fsn.checkFileProgress(src, pendingFile, true )) return false ; fsn.addCommittedBlocksToPending(pendingFile); fsn.finalizeINodeFileUnderConstruction(src, pendingFile,Snapshot.CURRENT_STATE_ID, true ); return true ; } public boolean commitOrCompleteLastBlock (BlockCollection bc, Block commitBlock, INodesInPath iip) throws IOException { if (commitBlock == null ) return false ; BlockInfo lastBlock = bc.getLastBlock(); if (lastBlock == null ) return false ; if (lastBlock.isComplete()) return false ; if (lastBlock.isUnderRecovery()) throw new IOException("block is under recovery." ); final boolean committed = commitBlock(lastBlock, commitBlock); NumberReplicas numReplicas = countNodes(lastBlock); int numUsableReplicas = numReplicas.liveReplicas() + numReplicas.decommissioning() + numReplicas.liveEnteringMaintenanceReplicas(); if (hasMinStorage(lastBlock, numUsableReplicas)) { if (committed) addExpectedReplicasToPending(lastBlock); completeBlock(lastBlock, iip, false ); } else if (pendingRecoveryBlocks.isUnderRecovery(lastBlock)) { completeBlock(lastBlock, iip, true ); updateNeededReconstructions(lastBlock, 1 , 0 ); } return committed; } private boolean commitBlock (final BlockInfo block, final Block commitBlock) { if (block.getBlockUCState() == BlockUCState.COMMITTED) return false ; assert block.getNumBytes() <= commitBlock.getNumBytes(); if (block.getGenerationStamp() != commitBlock.getGenerationStamp()) throw new IOException("mismatching GS" ); List<ReplicaUnderConstruction> staleReplicas = block.commitBlock(commitBlock); removeStaleReplicas(staleReplicas, block); return true ; }
complete方法虽然本身的逻辑其实从图上看是很简单的, 但是要注意这里如果完成失败, 可能是哪些异常, 然后如果一个文件提交失败的话, 它会处于什么状态呢?它的租约会被释放么? 如果没有释放会进入租约恢复么? 这些问题都是需要注意的…
0x0n. 问题记录
在客户端发起complete()的时候, 如果文件有两个block, 第一个block已经complete, 第二个副本数只有1 , 但不足最小副本数(比如NN端设置为2), 客户端重试完后会抛出副本不足的异常结束, 此时NN端文件租约 没有被正常释放, 有以下几个问题:
此时文件(INode)的状态, 以及第二个块的状态是什么? (文件应该处于UC, 但第二个block应该处于Committed 状态)
此时NN端文件租约未被释放, 是只能等待硬超时触发租约恢复么? (也就是哪些情况能变成UR态.)
其他客户端尝试追加写此文件, 是否会提示租约被占用
此时文件的状况是什么情况, 文件显示有几个块? 如果是两个, 那第二个块长度是多少? 文件长度又是多少?
按理说文件的第一个块是可读的, 那第二个块呢? NN收到了第二个块的一个Finalized副本汇报, 它会允许读么? 如果不允许报什么错?
在NN的complete()
逻辑里, 有一个强行关闭(complete) 副本数不足, 但处于恢复状态块的逻辑, 但从上下文来看, 不知何时才能触发此种情况. 以下是官方的注释
We’ve just finished recovery for this block, complete the block forcibly disregarding number of replicas. This is to ignore minReplication, the block will be closed and then replicated out .
疑问在于:
代码开头判断了block只能是UC或Committed状态, 然后通过commitBlock()
将UC态也转为Committed态. 如果转换失败抛异常了, 如果block能走到这个判断逻辑, 说明此时块已处于Committed
状态, 如何会出现lastBlock处于恢复态的情况呢?
处于Committed状态的块, 还能变成恢复状态(UR)么? (比如下面的情况, 它会变为UR态么)
一个块处于Committed状态后, 一直没有达到Complete态的要求, 客户端断开后, 那它会如何被处理?
abandonBlock()
以及其它错误恢复相关的RPC, 在追加写和错误恢复篇 再说, 下一篇是普通写流程的完结-Datanode篇
参考资料:
HDFS3.1.2官方源码
Hadoop 2.X HDFS源码剖析-Chapt3.4