@Override publicsynchronizedintread(){ if (oneByteBuf == null) oneByteBuf = newbyte[1]; //init 1 byte int ret = read(oneByteBuf, 0, 1); return (ret <= 0) ? -1 : (oneByteBuf[0] & 0xff); // keep binary same }
@Override publicsynchronizedintread(@Nonnull finalbyte buf[], int off, int len){ validatePositionedReadArgs(pos, buf, off, len); if (len == 0) return0; // 将不同的BlockReader进行了封装, 真正读数据的是BlockReader ReaderStrategy byteArrayReader; byteArrayReader = new ByteArrayStrategy(buf, off, len, readStatistics, dfsClient); return readWithStrategy(byteArrayReader); }
protectedfinal Object infoLock = new Object(); // why use null Object as lockObj? // 关键方法,省略部分检查 protectedsynchronizedintreadWithStrategy(ReaderStrategy strategy){ int len = strategy.getTargetLength(); CorruptedBlocks corruptedBlocks = new CorruptedBlocks(); failures = 0; // int type if (pos < getFileLength()) { int retries = 2; // 外层重试两次 while (retries > 0) { try { // currentNode can be left as null if previous read had a checksum // error on the same block. See HDFS-3067 if (pos > blockEnd || currentNode == null) { currentNode = blockSeekTo(pos); } int realLen = (int) Math.min(len, (blockEnd - pos + 1L)); // 这锁目的主要是单纯建立一个同步代码块,而不限定当前对象持有(this) synchronized(infoLock) { if (locatedBlocks.isLastBlockComplete()) realLen = (int) Math.min(realLen, locatedBlocks.getFileLength() - pos); } int result = readBuffer(strategy, realLen, corruptedBlocks);
if (result >= 0) { pos += result; } else { // got a EOS from reader though we expect more data on it. thrownew IOException("Unexpected EOS from the reader"); } updateReadStatistics(readStatistics, result, blockReader); dfsClient.updateFileSystemReadStats(blockReader.getNetworkDistance(), result); return result; } catch (ChecksumException ce) { // 校验失败(文件不完整, 直接抛出异常, 不扣次数) throw ce; } catch (IOException e) { checkInterrupted(e); if (retries == 1) DFSClient.LOG.warn("DFS Read", e); blockEnd = -1; if (currentNode != null) addToDeadNodes(currentNode); if (--retries == 0) throw e; } finally { /* 判断是否需要通过ClientProtocol汇报块损坏 (不管最终读成功/校验失败) * 1. client读了多次,至少有1个成功,汇报已尝试的坏块 * 2. 所有副本读失败,只汇报副本设置1的, 其他不会汇报 * 因为官方考虑这种情况可能是client自己有问题(导致的读全失败) * Q: 全读失败的时候,不汇报有怎么进一步确定是谁的问题呢? */ reportCheckSumFailure(corruptedBlocks, getCurrentBlockLocationsLength(), false); } } } return -1; }
privatesynchronizedintreadBuffer(ReaderStrategy reader, int len, CorruptedBlocks corruptedBlocks){ IOException ioe;
/* 当前节点只尝试一次 * Intention is to handle one common case of an error that is not a * failure on datanode or client : when DataNode closes the connection * since client is idle. If there are other cases of "non-errors" then * then a datanode might be retried by setting this to true again. */ boolean retryCurrentNode = true;
while (true) { // retry as many times as seekToNewSource allows. try { return reader.readFromBlock(blockReader, len); } catch (ChecksumException ce) { DFSClient.LOG.warn("Found Checksum error for " + getCurrentBlock() + " from " + currentNode + " at " + ce.getPos()); ioe = ce; retryCurrentNode = false; // we want to remember which block replicas we have tried corruptedBlocks.addCorruptedBlock(getCurrentBlock(), currentNode); } catch (IOException e) { if (!retryCurrentNode) { DFSClient.LOG.warn("Exception while reading from " + getCurrentBlock() + " of " + src + " from " + currentNode, e); } ioe = e; } boolean sourceFound; if (retryCurrentNode) { /* possibly retry the same node so that transient errors don't * result in application level failures (e.g. Datanode could have * closed the connection because the client is idle for too long). */ sourceFound = seekToBlockSource(pos); } else { addToDeadNodes(currentNode); sourceFound = seekToNewSource(pos); } if (!sourceFound) { throw ioe; } retryCurrentNode = false; } }
PacketHeader curHeader = packetReceiver.getHeader();
curDataSlice = packetReceiver.getDataSlice();
assert curDataSlice.capacity() == curHeader.getDataLen();

// Sanity check the lengths
if (!curHeader.sanityCheck(lastSeqNo)) {
  throw new IOException("error in packet header " + curHeader);
}

if (curHeader.getDataLen() > 0) {
  // Number of checksum chunks covering the data (ceiling division by bytesPerChecksum),
  // and the total byte size of their checksums.
  int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
  int checksumsLen = chunks * checksumSize;

  assert packetReceiver.getChecksumSlice().capacity() == checksumsLen;
  lastSeqNo = curHeader.getSeqno();
  if (verifyChecksum && curDataSlice.remaining() > 0) {
    // N.B.: the checksum error offset reported here is actually
    // relative to the start of the block, not the start of the file.
    // This is slightly misleading, but preserves the behavior from
    // the older BlockReader.
    // Verify the per-chunk checksums of this packet's data.
    checksum.verifyChunkedSums(curDataSlice, packetReceiver.getChecksumSlice(),
        filename, curHeader.getOffsetInBlock());
  }
  bytesNeededToFinish -= curHeader.getDataLen();
}

// First packet will include some data prior to the first byte
// the user requested. Skip it.
if (curHeader.getOffsetInBlock() < startOffset) {
  int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
  curDataSlice.position(newPos);
}

// If we've now satisfied the whole client read, read one last packet header,
// which should be empty.
if (bytesNeededToFinish <= 0) {
  readTrailingEmptyPacket();
  // Report the read result to the datanode: CHECKSUM_OK when checksum verification is
  // enabled, SUCCESS otherwise. Per the original author's note, without this response
  // the datanode may close the connection itself, though that does not affect data
  // integrity.
  if (verifyChecksum) {
    sendReadResult(Status.CHECKSUM_OK);
  } else {
    sendReadResult(Status.SUCCESS);
  }
}
} // closes the enclosing method, whose header is outside this excerpt
if (payloadLen < Ints.BYTES) { // The "payload length" includes its own length. Therefore it // should never be less than 4 bytes thrownew IOException("Invalid payload length " + payloadLen); } int dataPlusChecksumLen = payloadLen - Ints.BYTES; int headerLen = curPacketBuf.getShort(); if (headerLen < 0) throwExcept("Invalid header length " + headerLen);
// Sanity check the buffer size so we don't allocate too much memory and OOME. int totalLen = payloadLen + headerLen; if (totalLen < 0 || totalLen > MAX_PACKET_SIZE) throwExcept("Incorrect value for packet payload size: " +payloadLen);
// Make sure we have space for the whole packet, and read it. reallocPacketBuf(PacketHeader.PKT_LENGTHS_LEN + dataPlusChecksumLen + headerLen); curPacketBuf.clear(); curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN); curPacketBuf.limit(PacketHeader.PKT_LENGTHS_LEN + dataPlusChecksumLen + headerLen); doReadFully(ch, in, curPacketBuf); curPacketBuf.flip(); curPacketBuf.position(PacketHeader.PKT_LENGTHS_LEN);
// Extract the header from the front of the buffer (after the length prefixes) byte[] headerBuf = newbyte[headerLen]; curPacketBuf.get(headerBuf); // 初始化Packet头 if (curHeader == null) curHeader = new PacketHeader(); curHeader.setFieldsFromData(payloadLen, headerBuf);
// 计算 sub-slices of the packet int checksumLen = dataPlusChecksumLen - curHeader.getDataLen(); throwsExcepIfCheckSumLenLessThanZero(checksumLen);
// 全读 privatestaticvoiddoReadFully(ReadableByteChannel channel, InputStream in, ByteBuffer buf){ if (channel != null) { while (buf.remaining() > 0) { int n = channel.read(buf); if (n < 0) throwExcept("Premature EOF reading from " + channel); }
} else { int toRead = buf.remaining(); int off = buf.arrayOffset() + buf.position(); while (toRead > 0) { int ret = in.read(buf.array(), off, toRead); if (ret < 0) throwExcept( "Premature EOF from inputStream"); toRead = toRead - ret; off = off + ret; } buf.position(buf.position() + buf.remaining()); } }
privatevoidreslicePacket( int headerLen, int checksumsLen, int dataLen){ // Packet structure (refer to doRead() for details): // PLEN HLEN HEADER CHECKSUMS DATA // 32-bit 16-bit <protobuf> <variable length> // |--- lenThroughHeader ----| // |----------- lenThroughChecksums ----| // |------------------- lenThroughData ------| int lenThroughHeader = PacketHeader.PKT_LENGTHS_LEN + headerLen; int lenThroughChecksums = lenThroughHeader + checksumsLen; int lenThroughData = lenThroughChecksums + dataLen;