The previous post covered the Namenode-side logic of a normal HDFS write; to wrap up, let's look at the implementation on the Datanode (DN). There is again a lot of material here, so we focus on the core pieces first and set some details aside for later.
0x00. Preliminaries
First, a quick recap: data from the Client to the DN is essentially transferred as an I/O stream over a socket. But before any data is sent, how is the pipeline (the transfer channel) created?
Remember the little train drawn earlier: the locomotive (Sender) and the terminus (Receiver). The overall framework looks like this:
graph LR
Client --send--> Sender
Receiver --parse--> datanode
subgraph DataTransferProtocol
Sender
Receiver
end
io[pipeline]
Sender --> io
io --> Receiver
Note that Sender and Receiver are both large carriers, each containing several key methods for reading from and writing to the DN. The core ones are:
readBlock
writeBlock
transferBlock
local-read related operations
What we call when writing data is actually Sender.writeBlock(); next, let's see what information writeBlock packs together.
Note: the structure of the sent message has been split and adjusted several times between older and newer HDFS versions. The figure below shows the overall structure of an older version (newer versions differ quite a bit; no separate figure yet).
Pay particular attention to the information in the second row:
For example, OP is the opcode; the common ones are write (80) and read (81). Newer versions also added an important stage parameter that marks which state the pipeline is currently in, so a separate recoveryFlag no longer needs to be carried.
So the figure is for reference only; the actual fields should be taken from Sender.writeBlock(), and the differences are substantial (most parameters were merged or renamed in newer versions, so do not map the figure over verbatim).
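For orientation, here is a minimal memo of the opcodes mentioned above. It is modeled on the Op enum in org.apache.hadoop.hdfs.protocol.datatransfer, but only the two common codes are listed; the real enum has quite a few more entries (TRANSFER_BLOCK, COPY_BLOCK, checksum and short-circuit ops, etc.), so treat this as a sketch rather than the full definition.

// A minimal memo of the data-transfer opcodes referenced above; the real
// Op enum in the HDFS source contains more operations than shown here.
enum Op {
  WRITE_BLOCK((byte) 80),   // the write path discussed in this post
  READ_BLOCK((byte) 81);    // the read path

  public final byte code;
  Op(byte code) { this.code = code; }
}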
0x01. Receiving the client request
As mentioned before, when the client creates the pipeline it sends a locomotive (Sender) to the first DN, calling its writeBlock() method to serialize a series of DN information. On the DN side, after startup, that information is received by threads of DataXceiver, the concrete implementation of Receiver. But a single DN may have many clients sending read/write requests at the same time, so the DN provides a control layer to handle this (as shown in the figure):
When each DN starts, it initializes a DataXceiverServer (DXServer) thread that listens on the streaming socket interface and then hands each connection off to a dedicated DataXceiver (DX) thread. The process is similar to hailing a ride with Didi: the request first goes to the platform (DXServer), which then dispatches it to a specific driver (DX). The original figure in the book is quite clear; now let's look at the code.
@Override
public void run() {
  Peer peer = null;
  while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
    try {
      // Block until a client (or upstream DN) connects
      peer = peerServer.accept();
      if (datanode.getXceiverCount() > maxXceiverCount)
        throw new IOException("Xceiver count exceeds the limit");
      // Hand the connection off to a dedicated DataXceiver worker thread
      new Daemon(datanode.threadGroup,
          DataXceiver.create(peer, datanode, this)).start();
    } catch (SocketTimeoutException ignored) {
    } catch (AsynchronousCloseException ace) {
      if (datanode.shouldRun && !datanode.shutdownForUpgrade)
        LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
    } catch (IOException ie) {
      IOUtils.cleanup(null, peer);
    } catch (OutOfMemoryError ie) {
      IOUtils.cleanup(null, peer);
      try { Thread.sleep(30 * 1000); } catch (InterruptedException e) {}
    } catch (Throwable te) {
      datanode.shouldRun = false;
    }
  }
  try {
    peerServer.close();
    closed = true;
  } catch (IOException ie) { logWarn(ie); }
  closeAllPeers();
}
What we care about more, naturally, is the core logic of the worker thread, DataXceiver. In short, it parses the OP parameter and jumps to the corresponding logic for reading / writing / copying replica blocks.
@Override
public void run() {
  int opsProcessed = 0;
  Op op = null;
  try {
    synchronized (this) { xceiver = Thread.currentThread(); }
    dataXceiverServer.addPeer(peer, Thread.currentThread(), this);
    peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
    InputStream input = socketIn;
    try {
      // Wrap the raw socket streams with SASL negotiation
      IOStreamPair saslStreams = datanode.saslServer.receive(params..);
      input = new BufferedInputStream(saslStreams.in, smallBufferSize);
      socketOut = saslStreams.out;
    } catch (InvalidMagicNumberException imne) { return; }
    super.initialize(new DataInputStream(input));
    do {
      try {
        if (opsProcessed != 0) {
          peer.setReadTimeout(dnConf.socketKeepaliveTimeout);
        } else {
          peer.setReadTimeout(dnConf.socketTimeout);
        }
        op = readOp();   // read the opcode from the stream
      } catch (InterruptedIOException ignored) { break; }
        catch (EOFException | ClosedChannelException e) { break; }
        catch (IOException err) {
          incrDatanodeNetworkErrors();
          throw err;
        }
      if (opsProcessed != 0) peer.setReadTimeout(dnConf.socketTimeout);
      opStartTime = monotonicNow();
      processOp(op);     // dispatch to the read/write/transfer handler
      ++opsProcessed;
    } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0);
  } catch (Throwable t) {
  } finally {
    collectThreadLocalStates();
    if (peer != null) {
      dataXceiverServer.closePeer(peer);
      IOUtils.closeStream(in);
    }
  }
}
So the key is processOp(), which calls the writeBlock() method actually implemented by DataXceiver to handle the data sent by the client.
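As a rough sketch of that dispatch (simplified from memory, not the verbatim Hadoop source; the real Receiver.processOp() covers more ops and first deserializes a protobuf request for each one before calling the handler):

// Simplified view of how the opcode is dispatched to the concrete handler;
// each op* method parses the corresponding protobuf request and then calls the
// DataXceiver implementation (e.g. writeBlock() for WRITE_BLOCK).
protected final void processOp(Op op) throws IOException {
  switch (op) {
    case WRITE_BLOCK:
      opWriteBlock(in);      // -> DataXceiver.writeBlock(...)
      break;
    case READ_BLOCK:
      opReadBlock();         // -> DataXceiver.readBlock(...)
      break;
    case TRANSFER_BLOCK:
      opTransferBlock(in);   // -> DataXceiver.transferBlock(...)
      break;
    default:
      throw new IOException("Unknown op " + op + " in data stream");
  }
}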
0x02. The overall write flow
First, the overall sequence diagram of the write operation on the DN side (the core):
sequenceDiagram
participant A as Client
participant B as Datanode1
participant C as Datanode2
participant D as Datanode3
%%participant E as DataNode
A->>B: Sender sends the write-block request
B->>C: establish connection & forward
C->>D: establish connection & forward
D-->>C: return ACK
C-->>B: return ACK
B-->>A: return ACK
%%rect rgb(182, 237, 227)
rect rgb(208, 222, 247)
%%rect rgb(208, 189, 230)
alt Success
A->>B: send Packet1
end
B->>C: process Packet1 & forward
A->>B: send Packet2~n
C->>D: process Packet1 & forward
Note right of D: DN3 verifies the checksums
Note left of A: packets are sent asynchronously
D-->>C: return p1-ack
C-->>B: return p1-ack
B->>C: process Packet2~n & forward
B-->>A: return p1-ack
C->>D: process Packet2~n & forward
end
alt all packets sent
A->>B: send the empty trailing packet
end
B->>C: process the empty trailing packet & forward
C->>D: process the empty trailing packet & forward
Note right of D: confirm completion
D-->>C: return ACK
C-->>B: return ACK
B-->>A: return ACK
Note left of A: file write complete
For each DN, two pairs of input/output streams have to be maintained (except the last DN, which has no downstream):
stateDiagram
state IO_data_flow {
Upstream_node --> Current_Datanode: in
Current_Datanode --> Downstream_node: mirrorOut
Downstream_node --> Current_Datanode: mirrorIn
Current_Datanode --> Upstream_node: out
}
For easier recall, replyOut is shortened to out here so that it pairs with in; that is one I/O pair, and the other pair is the mirror I/O streams (the mirror) used to forward to the next DN.
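To make the role of the two stream pairs concrete, here is a toy relay written with plain Java streams. It is purely illustrative (the class and method names are made up, this is not HDFS code): an intermediate node copies packets from in to mirrorOut and relays acks from mirrorIn back to replyOut.

import java.io.*;

// Toy illustration of an intermediate pipeline node: everything received from the
// upstream side is forwarded downstream, and acks flow back the opposite way.
// NOT the HDFS implementation, just a sketch of the four-stream layout.
public class MiniRelay {
  private final InputStream in;          // packets from upstream
  private final OutputStream replyOut;   // acks back to upstream ("out")
  private final OutputStream mirrorOut;  // packets to downstream
  private final InputStream mirrorIn;    // acks from downstream

  public MiniRelay(InputStream in, OutputStream replyOut,
                   OutputStream mirrorOut, InputStream mirrorIn) {
    this.in = in; this.replyOut = replyOut;
    this.mirrorOut = mirrorOut; this.mirrorIn = mirrorIn;
  }

  // Forward one "packet" downstream, then relay the downstream ack upstream.
  public void relayOnePacket() throws IOException {
    byte[] buf = new byte[64 * 1024];
    int n = in.read(buf);
    if (n < 0) return;                   // upstream closed
    mirrorOut.write(buf, 0, n);          // process & forward
    mirrorOut.flush();
    int ack = mirrorIn.read();           // wait for the downstream ack
    replyOut.write(ack);                 // send the ack back upstream
    replyOut.flush();
  }
}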
Finally, let's look at the concrete implementation of "process Packet & forward" in the diagram above. The method is fairly long, so again we take the overview-then-details approach (logging and currently irrelevant parts are omitted). It mainly does two things:
Initialize a pair of I/O streams plus the core BlockReceiver object
Connect to the downstream Datanode and initialize the mirror I/O streams
@Override
public void writeBlock(params) {
  previousOpClientName = clientname;
  final boolean isDatanode = clientname.length() == 0;
  final boolean isClient = !isDatanode;
  final boolean isTransfer = stage == TRANSFER_RBW || stage == TRANSFER_FINALIZED;
  long size = 0;
  final DataOutputStream replyOut = getBufferedOutputStream();

  // Build the storage type / storage id arrays for this node plus the downstream targets
  int nst = targetStorageTypes.length;
  StorageType[] storageTypes = new StorageType[nst + 1];
  storageTypes[0] = storageType;
  if (targetStorageTypes.length > 0)
    arraycopy(targetStorageTypes, 0, storageTypes, 1, nst);
  final int nsi = targetStorageIds.length;
  final String[] storageIds;
  if (nsi > 0) {
    storageIds = new String[nsi + 1];
    storageIds[0] = storageId;
    if (targetStorageTypes.length > 0)
      System.arraycopy(targetStorageIds, 0, storageIds, 1, nsi);
  } else {
    storageIds = new String[0];
  }
  final ExtendedBlock originalBlock = new ExtendedBlock(block);

  DataOutputStream mirrorOut = null;   // stream to the next DN in the pipeline
  DataInputStream mirrorIn = null;     // reply from the next DN
  Socket mirrorSock = null;            // socket to the next DN
  String mirrorNode = null;
  String firstBadLink = "";            // first DN that failed while setting up the pipeline
  Status mirrorInStatus = SUCCESS;
  final String storageUuid;
  final boolean isOnTransientStorage;
  try {
    final Replica replica;
    if (isDatanode || stage != PIPELINE_CLOSE_RECOVERY) {
      // 1. Open the local BlockReceiver responsible for writing the block to local disk
      setCurrentBlockReceiver(new BlockReceiver(params...));
      replica = blockReceiver.getReplica();
    } else {
      replica = datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
    }
    storageUuid = replica.getStorageUuid();
    isOnTransientStorage = replica.isOnTransientStorage();

    // 2. Connect to the downstream Datanode and set up the mirror I/O streams
    if (targets.length > 0) {
      InetSocketAddress mirrorTarget;
      mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
      mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
      mirrorSock = datanode.newSocket();
      try {
        DataNodeFaultInjector.get().failMirrorConnection();
        int timeoutValue = dnConf.socketTimeout + (READ_TIMEOUT_EXTENSION * targets.length);
        int writeTimeout = dnConf.socketWriteTimeout + (WRITE_TIMEOUT_EXTENSION * targets.length);
        NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
        mirrorSock.setTcpNoDelay(dnConf.getDataTransferServerTcpNoDelay());
        mirrorSock.setSoTimeout(timeoutValue);
        mirrorSock.setKeepAlive(true);
        if (dnConf.getTransferSocketSendBufferSize() > 0)
          mirrorSock.setSendBufferSize(dnConf.getTransferSocketSendBufferSize());

        OutputStream unbufMirrorOut = NetUtils.getOutputStream(mirrorSock, writeTimeout);
        InputStream unbufMirrorIn = NetUtils.getInputStream(mirrorSock);
        DataEncryptionKeyFactory keyFactory = datanode.getDataEncryptionKeyFactoryForBlock(block);
        IOStreamPair saslStreams = datanode.saslClient.socketSend(params.., targets[0]);
        unbufMirrorOut = saslStreams.out;
        unbufMirrorIn = saslStreams.in;
        mirrorOut = new DataOutputStream(new BufferedOutputStream(unbufMirrorOut, smallBufferSize));
        mirrorIn = new DataInputStream(unbufMirrorIn);

        String targetStorageId = null;
        if (targetStorageIds.length > 0) targetStorageId = targetStorageIds[0];
        // Forward the writeBlock request so the pipeline keeps propagating downstream
        if (targetPinnings != null && targetPinnings.length > 0) {
          new Sender(mirrorOut).writeBlock(targetPinnings[0], targetParams..);
        } else {
          new Sender(mirrorOut).writeBlock(false, targetParams..);
        }
        mirrorOut.flush();

        // Read the connect-ack coming back from the downstream nodes
        if (isClient) {
          BlockOpResponseProto connectAck = parseFrom(mirrorIn);
          mirrorInStatus = connectAck.getStatus();
          firstBadLink = connectAck.getFirstBadLink();
        }
      } catch (IOException e) {
        if (isClient) {
          Text.writeString(replyOut, targets[0].getXferAddr());
          replyOut.flush();
        }
        closeAndSetNull(mirrorOut, mirrorIn, mirrorSock);
        if (isClient) {
          LOG.error("Exception transfering block xx to mirror xx");
          throw e;
        } else {
          incrDatanodeNetworkErrors();
        }
      }
    }

    // 3. Send the connect-ack back upstream (to the client or previous DN)
    if (isClient && !isTransfer) {
      BlockOpResponseProto.newBuilder()
          .setStatus(mirrorInStatus)
          .setFirstBadLink(firstBadLink)
          .build()
          .writeDelimitedTo(replyOut);
      replyOut.flush();
    }

    // 4. Receive the block and mirror it to the next target
    if (blockReceiver != null) {
      String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
      blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut, targets, params..);
      if (isTransfer) writeResponse(SUCCESS, null, replyOut);
    }

    if (isClient && stage == PIPELINE_CLOSE_RECOVERY) {
      block.setGenerationStamp(latestGenerationStamp);
      block.setNumBytes(minBytesRcvd);
    }
    // For a DN-initiated copy (or close-recovery) the block is closed right here
    if (isDatanode || stage == PIPELINE_CLOSE_RECOVERY)
      datanode.closeBlock(block, null, storageUuid, isOnTransientStorage);
    if (isClient) size = block.getNumBytes();
  } catch (IOException ioe) {
    incrDatanodeNetworkErrors();
    throw ioe;
  } finally {
    closeAllStreamAndSocket(params);
    setCurrentBlockReceiver(null);
  }
}
A key point in the writeBlock flow is the initialization of the BlockReceiver. When the client writes a new block for the first time, i.e. when the pipeline is in the SETUP_CREATE stage, it creates an RBW replica: a new replica is created under the rbw directory with initial state RBW. (Replica state transitions are fairly complex; see the linked diagram, to be detailed in the lease-recovery post.)
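As a quick memo, the replica states involved in those transitions are roughly the following. This is modeled on the ReplicaState enum in the HDFS server code; treat the one-line descriptions as a summary from memory rather than the authoritative source.

// Sketch of the replica states referred to above (see the HDFS source for the
// authoritative definitions); RBW is the state produced by createRbw() below.
enum ReplicaState {
  FINALIZED,   // write finished, length and generation stamp are fixed
  RBW,         // Replica Being Written: actively receiving data from a pipeline
  RWR,         // Replica Waiting to be Recovered: DN restarted while the replica was RBW
  RUR,         // Replica Under Recovery: taking part in lease/block recovery
  TEMPORARY    // created for replication/balancing, invisible to readers
}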
@Override
public ReplicaHandler createRbw(params...) {
  try (AutoCloseableLock lock = datasetLock.acquire()) {
    ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(), b.getBlockId());
    if (replicaInfo != null) {
      throw new ReplicaAlreadyExistsException(
          "Block x already exists in state x and can't be created.");
    }
    FsVolumeReference ref = null;
    // Try a RAM-disk (lazy persist) volume first if the block qualifies
    if (allowLazyPersist && lazyWriter != null
        && b.getNumBytes() % cacheManager.getOsPageSize() == 0
        && reserveLockedMemory(b.getNumBytes())) {
      try {
        ref = volumes.getNextTransientVolume(b.getNumBytes());
        datanode.getMetrics().incrRamDiskBlocksWrite();
      } catch (DiskOutOfSpaceException de) {
        LOG.warn("transient volume is not enough, fall back to persistent storage");
      } finally {
        if (ref == null) cacheManager.release(b.getNumBytes());
      }
    }
    // Otherwise pick the next persistent volume according to the placement policy
    if (ref == null) ref = volumes.getNextVolume(params...);
    FsVolumeImpl v = (FsVolumeImpl) ref.getVolume();
    ReplicaInPipeline newReplicaInfo;
    try {
      // Create the replica file under the volume's rbw directory
      newReplicaInfo = v.createRbw(b);
      if (newReplicaInfo.getReplicaInfo().getState() != ReplicaState.RBW)
        throw new IOException("Replica was not RBW. xxx");
    } catch (IOException e) {
      IOUtils.cleanup(null, ref);
      throw e;
    }
    volumeMap.add(b.getBlockPoolId(), newReplicaInfo.getReplicaInfo());
    return new ReplicaHandler(newReplicaInfo, ref);
  }
}
0x03. Write details
With the overall steps covered, let's now look at the receiveBlock() method of the BlockReceiver class used in the normal DN write path. Again, start with its overall internal flow (see the figure in the book):
Its structure is similar to the DataStreamer and ResponseProcessor we saw on the client side; there is also a key ackQueue (packet queue) here. So once the client's producer-consumer model is understood, the DN side is easy to follow. Note that for the last DN, and when the Sync flag is carried, the point at which a packet is enqueued is adjusted. Now the code:
void receiveBlock(mirrOut, mirrIn, replyOut, DNInfo[] downstreams, params..) {
  syncOnClose = datanode.getDnConf().syncOnClose;
  dirSyncOnFinalize = syncOnClose;
  boolean responderClosed = false;
  mirrorOut = mirrOut;
  mirrorAddr = mirrAddr;
  initPerfMonitoring(downstreams);
  throttler = throttlerArg;
  this.replyOut = replyOut;
  this.isReplaceBlock = isReplaceBlock;
  try {
    // For a client write, start the PacketResponder thread that relays acks upstream
    if (isClient && !isTransfer) {
      responder = new Daemon(datanode.threadGroup,
          new PacketResponder(replyOut, mirrIn, downstreams));
      responder.start();
    }
    // Consume packets until the last (empty) packet of the block arrives
    while (receivePacket() >= 0) { }
    if (responder != null) {
      ((PacketResponder) responder.getRunnable()).close();
      responderClosed = true;
    }
    // For a DN-to-DN copy/transfer there is no responder: finalize the replica here
    if (isDatanode || isTransfer) {
      try (ReplicaHandler handler = claimReplicaHandler()) {
        close();
        block.setNumBytes(replicaInfo.getNumBytes());
        if (stage == TRANSFER_RBW)
          datanode.data.convertTemporaryToRbw(block);
        else
          datanode.data.finalizeBlock(block, dirSyncOnFinalize);
      }
    }
  } catch (IOException ioe) {
    replicaInfo.releaseAllBytesReserved();
    if (datanode.isRestarting()) {
      LOG.info("Shutting down for restart (" + block + ").");
    } else {
      LOG.info("Exception for " + block, ioe);
      throw ioe;
    }
  } finally {
    Thread.interrupted();   // clear the interrupt flag
    if (!responderClosed) { // error path: clean up
      if (responder != null) {
        if (datanode.isRestarting() && isClient && !isTransfer) {
          // Write the restart meta file so the client knows how long to wait
          try (Writer out = new OutputStreamWriter(replicaInfo.createRestartMetaStream())) {
            out.write(Long.toString(Time.now() + restartBudget));
            out.flush();
          } catch (IOException ioe) {
          } finally {
            IOUtils.closeStream(streams.getDataOut());
          }
          try { Thread.sleep(1000); } catch (InterruptedException ie) {}
        }
        responder.interrupt();
      }
      IOUtils.closeStream(this);
      cleanupBlock();
    }
    if (responder != null) {
      try {
        responder.interrupt();
        long joinTimeout = datanode.getDnConf().getXceiverStopTimeout();
        joinTimeout = joinTimeout > 1 ? joinTimeout * 8 / 10 : joinTimeout;
        responder.join(joinTimeout);
        if (responder.isAlive()) throw new IOException(..);
      } catch (InterruptedException e) {
        responder.interrupt();
        if (!datanode.isRestarting()) throw new IOException("Interrupted receiveBlock");
      }
      responder = null;
    }
  }
}
As you can see, the key processing work is handed off to receivePacket(); the other, unrelated parts can be skipped for now and revisited later.
private int receivePacket() throws IOException {
  // 1. Read the next packet from the upstream stream and parse its header
  packetReceiver.receiveNextPacket(in);
  PacketHeader header = packetReceiver.getHeader();
  checkHeader(header);
  long offsetInBlock = header.getOffsetInBlock();
  long seqno = header.getSeqno();
  boolean lastPacketInBlock = header.isLastPacketInBlock();
  final int len = header.getDataLen();
  boolean syncBlock = header.getSyncBlock();
  if (syncBlock && lastPacketInBlock) {
    this.syncOnClose = false;
    this.dirSyncOnFinalize = true;
  }
  final long firstByteInBlock = offsetInBlock;
  offsetInBlock += len;
  if (replicaInfo.getNumBytes() < offsetInBlock)
    replicaInfo.setNumBytes(offsetInBlock);

  // 2. Early enqueue: if no sync is required and this DN does not verify checksums
  //    (i.e. it is not the last DN), put the packet on the ackQueue before writing
  if (responder != null && !syncBlock && !shouldVerifyChecksum())
    ((PacketResponder) responder.getRunnable()).enqueue(new Packet(seqno, params..));

  // 3. Forward the packet to the downstream mirror as early as possible
  if (mirrorOut != null && !mirrorError) {
    try {
      long begin = Time.monotonicNow();
      packetReceiver.mirrorPacketTo(mirrorOut);
      mirrorOut.flush();
      long now = Time.monotonicNow();
      setLastSentTime(now);
      long duration = now - begin;
      trackSendPacketToLastNodeInPipeline(duration);
      if (duration > datanodeSlowLogThresholdMs) {
        LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
            + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
            + "downstream DNs=" + Arrays.toString(downstreamDNs)
            + ", blockId=" + replicaInfo.getBlockId());
      }
    } catch (IOException e) {
      handleMirrorOutError(e);
    }
  }

  ByteBuffer dataBuf = packetReceiver.getDataSlice();
  ByteBuffer checksumBuf = packetReceiver.getChecksumSlice();
  if (lastPacketInBlock || len == 0) {
    // Empty trailing packet: only flush/sync if requested
    if (syncBlock) flushOrSync(true);
  } else {
    // 4. Verify (or recalculate) the checksums of this packet
    final int checksumLen = diskChecksum.getChecksumSize(len);
    final int checksumReceivedLen = checksumBuf.capacity();
    if (checksumReceivedLen > 0 && checksumReceivedLen != checksumLen)
      throw new IOException("received length is xx, but expected length is xx");
    if (checksumReceivedLen > 0 && shouldVerifyChecksum()) {
      try {
        verifyChunks(dataBuf, checksumBuf);
      } catch (IOException ioe) {
        if (responder != null) {
          try {
            ((PacketResponder) responder.getRunnable()).enqueue(params.., ERROR_CHECKSUM);
            Thread.sleep(3000);
          } catch (InterruptedException e) {..}
        }
        throw new IOException("Terminating due to a checksum error." + ioe);
      }
      if (needsChecksumTranslation) translateChunks(dataBuf, checksumBuf);
    }
    if (checksumReceivedLen == 0 && !streams.isTransientStorage()) {
      checksumBuf = ByteBuffer.allocate(checksumLen);
      diskChecksum.calculateChunkedSums(dataBuf, checksumBuf);
    }
    final boolean shouldNotWriteChecksum =
        (checksumReceivedLen == 0 && streams.isTransientStorage());

    // 5. Write the data and checksums to local disk (handling partial-chunk overlap)
    try {
      long onDiskLen = replicaInfo.getBytesOnDisk();
      if (onDiskLen < offsetInBlock) {
        long partialChunkSizeOnDisk = onDiskLen % bytesPerChecksum;
        long lastChunkBoundary = onDiskLen - partialChunkSizeOnDisk;
        boolean alignedOnDisk = partialChunkSizeOnDisk == 0;
        boolean alignedInPacket = firstByteInBlock % bytesPerChecksum == 0;
        boolean overwriteLastCrc = !alignedOnDisk && !shouldNotWriteChecksum;
        boolean doCrcRecalc = overwriteLastCrc && (lastChunkBoundary != firstByteInBlock);
        if (!alignedInPacket && len > bytesPerChecksum)
          throw new IOException("Unexpected packet data length for block xx");
        Checksum partialCrc = null;
        if (doCrcRecalc) {
          long offsetInChecksum = BlockMetadataHeader.getHeaderSize()
              + onDiskLen / bytesPerChecksum * checksumSize;
          partialCrc = computePartialChunkCrc(onDiskLen, offsetInChecksum);
        }
        int startByteToDisk = (int) (onDiskLen - firstByteInBlock)
            + dataBuf.arrayOffset() + dataBuf.position();
        int numBytesToDisk = (int) (offsetInBlock - onDiskLen);
        final byte[] lastCrc;
        if (shouldNotWriteChecksum) {
          lastCrc = null;
        } else {
          int skip = 0;
          byte[] crcBytes = null;
          if (overwriteLastCrc) adjustCrcFilePosition();
          if (doCrcRecalc) {
            int bytesToReadForRecalc = (int) (bytesPerChecksum - partialChunkSizeOnDisk);
            if (numBytesToDisk < bytesToReadForRecalc)
              bytesToReadForRecalc = numBytesToDisk;
            partialCrc.update(dataBuf.array(), startByteToDisk, bytesToReadForRecalc);
            byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
            crcBytes = copyLastChunkChecksum(buf, checksumSize, buf.length);
            checksumOut.write(buf);
            skip++;
          }
          long skippedDataBytes = lastChunkBoundary - firstByteInBlock;
          if (skippedDataBytes > 0) {
            skip += (int) (skippedDataBytes / bytesPerChecksum)
                + ((skippedDataBytes % bytesPerChecksum == 0) ? 0 : 1);
          }
          skip *= checksumSize;
          final int offset = checksumBuf.arrayOffset() + checksumBuf.position() + skip;
          final int end = offset + checksumLen - skip;
          if (offset >= end && doCrcRecalc) {
            lastCrc = crcBytes;
          } else {
            final int remainingBytes = checksumLen - skip;
            lastCrc = copyLastChunkChecksum(checksumBuf.array(), checksumSize, end);
            checksumOut.write(checksumBuf.array(), offset, remainingBytes);
          }
        }
        long begin = Time.monotonicNow();
        streams.writeDataToDisk(dataBuf.array(), startByteToDisk, numBytesToDisk);
        long duration = Time.monotonicNow() - begin;
        if (duration > datanodeSlowLogThresholdMs)
          LOG.warn("Slow BlockReceiver write data to disk cost: ms" + duration);
        if (duration > maxWriteToDiskMs) maxWriteToDiskMs = duration;
        flushOrSync(syncBlock);
        replicaInfo.setLastChecksumAndDataLen(offsetInBlock, lastCrc);
        manageWriterOsCache(offsetInBlock);
      }
    } catch (IOException iex) {
      throw iex;
    }
  }

  // 6. Late enqueue: when syncing, or when this DN verifies checksums (the last DN),
  //    the packet is only enqueued after the data has been written
  if (responder != null && (syncBlock || shouldVerifyChecksum())) {
    ((PacketResponder) responder.getRunnable()).enqueue(params...);
  }
  if (isReplaceBlock
      && (Time.monotonicNow() - lastResponseTime > responseInterval)) {
    // During a block replacement, periodically report IN_PROGRESS upstream
    BlockOpResponseProto.Builder response =
        BlockOpResponseProto.newBuilder().setStatus(Status.IN_PROGRESS);
    response.build().writeDelimitedTo(replyOut);
    replyOut.flush();
    lastResponseTime = Time.monotonicNow();
  }
  if (throttler != null) throttler.throttle(len);
  // A negative return value ends the receive loop (last packet in the block)
  return lastPacketInBlock ? -1 : len;
}
0x04. Writing the block to disk
At the end of receivePacket() above, the data plus its checksums are written to the DN's local disk; once the write completes, the on-disk layout should look like the structure below:
The disk write involves many OS-level calls and the details are fairly involved. To be filled in later....
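Still, a rough worked example of the resulting on-disk metadata may help. Each finalized replica is a blk_<id> file plus a blk_<id>_<genstamp>.meta file; assuming the default dfs.bytes-per-checksum of 512 and 4-byte CRC32C checksums, with a 7-byte BlockMetadataHeader (2-byte version plus 5-byte DataChecksum header), the meta file size can be estimated like this:

// A small worked example (not from the Hadoop source) of the checksum file size
// for one replica, under the assumptions stated in the lead-in above.
public class MetaFileSizeExample {
  static long metaFileLength(long blockBytes, int bytesPerChecksum, int checksumSize) {
    long chunks = (blockBytes + bytesPerChecksum - 1) / bytesPerChecksum; // ceil
    return 7 + chunks * checksumSize;   // 7-byte header + one checksum per chunk
  }
  public static void main(String[] args) {
    // A full 128 MiB block: 262144 chunks * 4 bytes + 7 = 1,048,583 bytes of metadata
    System.out.println(metaFileLength(128L << 20, 512, 4));
  }
}

So for a full 128 MiB block the checksum overhead is about 1 MiB, roughly 1/128 of the data.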
0x05. The PacketResponder thread
Finally, let's look at the PacketResponder thread. It does somewhat more than the client's response thread, but the overall flow is still quite intuitive, as shown below:
stateDiagram
[*] --> Not_the_last_empty_trailing_packet
Not_the_last_empty_trailing_packet --> Not_the_last_DN
Not_the_last_DN --> Read_ack_from_downstream_DN
Read_ack_from_downstream_DN --> Take_packet_from_ackQueue
Take_packet_from_ackQueue --> Check_downstream_ack_matches
Check_downstream_ack_matches --> If_empty_trailing_packet_received_finalize_current_block
Not_the_last_empty_trailing_packet --> Last_DN
Last_DN --> Take_packet_from_ack_queue
Take_packet_from_ack_queue --> If_empty_trailing_packet_received_finalize_current_block
If_empty_trailing_packet_received_finalize_current_block --> Send_ack_to_upstream_node
Send_ack_to_upstream_node --> Remove_current_packet_from_ackQueue
Remove_current_packet_from_ackQueue --> Not_the_last_empty_trailing_packet: loop
Remove_current_packet_from_ackQueue --> [*]: handle the last empty trailing packet
It has 3 types, decided by the number of target DNs when the thread is initialized (a small selection sketch follows this list):
NON_PIPELINE (downstream nodes are null?)
LAST_IN_PIPELINE (the last DN)
HAS_DOWNSTREAM_IN_PIPELINE (not the last DN)
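A small selection sketch (the enum values follow the list above; the surrounding class and pick() method are made up for illustration, in the real code the choice happens inside the PacketResponder constructor):

// Illustration only: how the responder type can be derived from the downstream targets.
class PacketResponderTypeExample {
  enum PacketResponderType { NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE }

  static PacketResponderType pick(Object[] downstreams) {
    if (downstreams == null)     return PacketResponderType.NON_PIPELINE;
    if (downstreams.length == 0) return PacketResponderType.LAST_IN_PIPELINE;
    return PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE;
  }
}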
@Override
public void run() {
  boolean lastPacketInBlock = false;
  final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
  while (isRunning() && !lastPacketInBlock) {
    long totalAckTimeNanos = 0;
    boolean isInterrupted = false;
    try {
      Packet pkt = null;
      long expected = -2;
      PipelineAck ack = new PipelineAck();
      long seqno = PipelineAck.UNKOWN_SEQNO;
      long ackRecvNanoTime = 0;
      try {
        // A non-last DN first reads the ack coming back from its downstream node
        if (type != PacketResponderType.LAST_IN_PIPELINE && !mirrorError) {
          ack.readFields(downstreamIn);
          ackRecvNanoTime = System.nanoTime();
          seqno = ack.getSeqno();
        }
        if (seqno != PipelineAck.UNKOWN_SEQNO || type == LAST_IN_PIPELINE) {
          // Take the head packet from the ackQueue and check it matches the downstream ack
          pkt = waitForAckHead(seqno);
          if (!isRunning()) break;
          expected = pkt.seqno;
          if (type == HAS_DOWNSTREAM_IN_PIPELINE && seqno != expected)
            throw new IOException("seqno: expected=" + expected + ", received=" + seqno);
          if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
            totalAckTimeNanos = ackRecvNanoTime - pkt.ackEnqueueNanoTime;
            long ackTimeNanos = totalAckTimeNanos - ack.getDownstreamAckTimeNanos();
          }
          lastPacketInBlock = pkt.lastPacketInBlock;
        }
      } catch (InterruptedException ine) {
        isInterrupted = true;
      } catch (IOException ioe) {
        if (Thread.interrupted()) {
          isInterrupted = true;
        } else if (ioe instanceof EOFException && !packetSentInTime()) {
          LOG.warn("The downstream error might be due to congestion in "
              + "upstream including this node. Propagating the error: xx");
          throw ioe;
        } else {
          mirrorError = true;
          LOG.info(myString, ioe);
        }
      }
      if (Thread.interrupted() || isInterrupted) {
        LOG.info(myString + ": Thread is interrupted.");
        running = false;
        continue;
      }
      // On the empty trailing packet, finalize the block
      if (lastPacketInBlock) finalizeBlock(startTime);
      // Relay the ack to the upstream node, then drop the packet from the ackQueue
      Status myStatus = pkt != null ? pkt.ackStatus : Status.SUCCESS;
      sendAckUpstream(ack, expected, totalAckTimeNanos,
          (pkt != null ? pkt.offsetInBlock : 0),
          PipelineAck.combineHeader(datanode.getECN(), myStatus));
      if (pkt != null) removeAckHead();
    } catch (IOException e) {
      LOG.warn("IOException in BlockReceiver.run(): ", e);
      if (running) {
        running = false;
        if (!Thread.interrupted()) receiverThread.interrupt();
      }
    } catch (Throwable e) {
      if (running) {
        running = false;
        receiverThread.interrupt();
      }
    }
  }
}
The main work of finalizeBlock() is carried out through the incremental block report issued via BPOfferService, which is driven by one of the DN's core daemon threads; we won't go into the details here.
At this point the overall steps of the write flow on the Datanode side are done. Quite a few notes about special cases were left aside along the way; they will be examined one by one later.
To be continued.....
0x06. Open questions
Mainly two questions (the relevant spots are already marked in the code):
How the response thread in the DN write path behaves when it hits an exception, and the considerations around whether it should be interrupted (the concurrency model)
Questions about the DN writing data to disk:
Is a write failure noticed immediately?
What happens if the write is very slow? If it stalls, will the response thread still forward ACKs, and if it does, will the current DN just stay stuck writing?
The slow-disk / slow-DN problem is an important topic and will be covered separately later....
References:
HDFS 3.1.2 official source code
Hadoop 2.X HDFS源码剖析, Chapter 4.x figures
HDFS 1.X official source code (for comparison with newer versions)