privatesynchronizedintreadBuffer(ReaderStrategy reader, int len, CorruptedBlocks corruptedBlocks) { IOException ioe;
/* 当前节点只尝试一次 * Intention is to handle one common case of an error that is not a * failure on datanode or client : when DataNode closes the connection * since client is idle. If there are other cases of "non-errors" then * then a datanode might be retried by setting this to true again. */ booleanretryCurrentNode=true;
while (true) { // retry as many times as seekToNewSource allows. try { return reader.readFromBlock(blockReader, len); } catch (ChecksumException ce) { DFSClient.LOG.warn("Found Checksum error for " + getCurrentBlock() + " from " + currentNode + " at " + ce.getPos()); ioe = ce; retryCurrentNode = false; // we want to remember which block replicas we have tried corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode); } catch (IOException e) { if (!retryCurrentNode) { DFSClient.LOG.warn("Exception while reading from " + getCurrentBlock() + " of " + src + " from " + currentNode, e); } ioe = e; } boolean sourceFound; if (retryCurrentNode) { /* possibly retry the same node so that transient errors don't * result in application level failures (e.g. Datanode could have * closed the connection because the client is idle for too long). */ sourceFound = seekToBlockSource(pos); } else { addToDeadNodes(currentNode); sourceFound = seekToNewSource(pos); } if (!sourceFound) { throw ioe; } retryCurrentNode = false; } }
assert packetReceiver.getChecksumSlice().capacity() == checksumsLen; lastSeqNo = curHeader.getSeqno(); if (verifyChecksum && curDataSlice.remaining() > 0) { // N.B.: the checksum error offset reported here is actually // relative to the start of the block, not the start of the file. // This is slightly misleading, but preserves the behavior from // the older BlockReader. // checkSum校验 (CRC32) checksum.verifyChunkedSums(curDataSlice, packetReceiver.getChecksumSlice(), filename, curHeader.getOffsetInBlock()); } bytesNeededToFinish -= curHeader.getDataLen(); }
// First packet will include some data prior to the first byte // the user requested. Skip it. if (curHeader.getOffsetInBlock() < startOffset) { intnewPos= (int) (startOffset - curHeader.getOffsetInBlock()); curDataSlice.position(newPos); }
// If we've now satisfied the whole client read, read one last packet header, which should be empty if (bytesNeededToFinish <= 0) { // 读尾部的空包? readTrailingEmptyPacket(); // 读完成后, 给DN发响应码 // 否则DN可能会主动关闭连接,尽管这不影响数据完整性 if (verifyChecksum) { sendReadResult(Status.CHECKSUM_OK); } else { sendReadResult(Status.SUCCESS); } } }
if (payloadLen < Ints.BYTES) { // The "payload length" includes its own length. Therefore it // should never be less than 4 bytes thrownewIOException("Invalid payload length " + payloadLen); } intdataPlusChecksumLen= payloadLen - Ints.BYTES; intheaderLen= curPacketBuf.getShort(); if (headerLen < 0) throwExcept("Invalid header length " + headerLen);
// Sanity check the buffer size so we don't allocate too much memory and OOME. inttotalLen= payloadLen + headerLen; if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) throwExcept("Incorrect value for packet payload size: " +payloadLen);
// Make sure we have space for the whole packet, and read it. reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN + dataPlusChecksumLen + headerLen); curPacketBuf.clear(); curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN + dataPlusChecksumLen + headerLen); doReadFully(ch, in, curPacketBuf); curPacketBuf.flip(); curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
// Extract the header from the front of the buffer (after the length prefixes) byte[] headerBuf = newbyte[headerLen]; curPacketBuf.get(headerBuf); // 初始化Packet头 if (curHeader == null) curHeader = newPacketHeader(); curHeader.setFieldsFromData(payloadLen, headerBuf);
// 计算 sub-slices of the packet intchecksumLen= dataPlusChecksumLen - curHeader.getDataLen(); throwsExcepIfCheckSumLenLessThanZero(checksumLen);