HDFS3-DeadNodes状态机设计
HDFS-Client在读文件的时候, 有个关键的异常集合—-DeadNodes
, 它存储client认为已经挂掉的DN节点, 避免重复尝试
但是它之前的状态存储做的很简单, 且在stream的生命周期内是不会清除的, 它对Hbase这种打开一个文件就不关闭流的使用方式来说是影响很大的, 今天来看看社区在HDFS3.3引入的DeadNodes状态检测机制, 如何优雅的解决这个问题
0x00. 前言
在说Deadnode detector方案设计之前, 先具体一点说说为什么需要这个功能, 开头提到了, 它主要影响的是Hbase为代表的, 打开一个文件后长时间不关闭的使用方式, 主要体现在这两个问题上:
- 同一个Client发起的不同打开文件的流, 没有共享DeadNodes, 期望某个DN挂掉后, 应该能在整个
DFSClient
的生命周期内共享, 避免重复的尝试降低读性能 - 有些DN可能是临时网络不通, 或者临时下线, 很快就恢复了, 但是DeadNodes缺乏合理的更新机制, 就可能导致严重的问题:
- 功能: 以EC模式为例, 因为EC文件仅有1副本, 如果DeadNodes一直不清理, 随着时间推移Client会发现缺失块超过阈值, 直接读失败. (只能重启RS解决)
- 性能: 以异构模式为例, 三副本分别是1SSD+2HDD的组合, SSD所在的DN被加入DeadNodes后长期不更新, 导致读请求全部降级到HDD. 性能锐减10倍以上
关于第二个问题, 顺便补充一下自己写的两个简版思路, 方便后续对比思考: (我当时想尽量不引入新的变动)
- (方案一) 当时遇到了DeadNodes不清理的问题, 我最简单的想法是, 就给DeadNodes集合设置一个阈值, 比如EC(6+3)最多容忍3个DN挂掉, 我就直接在选DN前判断一下, 如果
deadNodes.size() >= 3
, 我直接移除第一个(最早)被加入的DeadNode, 这本质是通过DeadNodes数目来做操作, 改动最少 - (方案二) 然后比较容易的想到的另一个方案是, 加一个定时器, 达到某个时间阈值做移除/清理操作, 本质是通过时间来操作, 改动也少
可以看到我的两个方案, 基本是简单加了个有条件的自动清理, 并没有很完善的设计.
那再来说一下社区的这个DeadNodes状态感知方案, 整个功能基本是小米自己内部实现后, 回馈给社区的, 分为主要几个patch:
从而实现了上述说的两个主要问题, 并且比较系统和完善. 下面大致也沿着这个顺序来阅读
0x01. 框架
首先, 有个大前提变动是, 原本DeadNodes的生命周期是在inputStream级别维护, 在新的设计中, 把这个生命周期放到了DFSClient(FS)
级别, 被每个输入流共享, 核心是引入了两个状态机设计:
- Datanode的状态 (3种)
- 探测器的状态 (4种)
基于这个大前提, 然后参考作者文档图, 先给个简洁一点的Datanode状态转变流程: (省略关闭stream时的移除)
stateDiagram [*] --> Live: open file Live --> Suspect: Read(×) Dead --> Live: RPC(√) %%Dead --> Clear: stream close Suspect: 疑似故障 Suspect --> Dead: RPC(×) Suspect --> Live: RPC(√) %%Suspect --> Clear: stream close %%note right of Dead: 关闭流时清除
上图简单说相比原始版本的DeadNodes, 增加了两个功能:
- 多了一级缓冲(suspectNodes), 正常DN与Client发生任何异常都不会被直接放到DeadNodes中, 而是先认为疑似故障, 等待发送RPC后仍失败才转为Dead.
- 通过定时给DeadNodes发送RPC, 从而及时把恢复正常的DN从DeadNodes中释放.
然后再看看DeadNods-Detector(探测器)的状态(目前只用前三种): INIT, CHECK_DEAD, IDLE, ERROR
, 看下它转换基本流程:
stateDiagram [*] --> CHECK_DEAD: init CHECK_DEAD --> IDLE: update(dead) IDLE --> CHECK_DEAD: sleep(10s)
- 当DFSClient打开了一个输入流(InputStream)时, 在全局维护一个状态机, 加载为
INIT
状态, 然后马上转为CHECK_DEAD
- 更新存储DeadNodes的集合, 然后等待单独的线程检查
CHECK_DEAD
完毕后, 状态机置为IDLE
, 休息10S, 然后切回CHECK_DEAD
状态, 往复循环
所以简单来看, 探测器的状态机很简单, 正常情况就在CHECK_DEAD
和 IDLE
10s一次往复切换, 接下来我们看看其他的实现细节, 主要是具体的数据结构和检测方式
0x02. 具体结构
1. 核心功能
沿着作者的Basic model来看, 有以下几个主要环节:
- 因为常见异常Client和DN之间建立连接或读失败(比如常见的Socket超时), 则先将此DN放入
SuspectNodes
集合, 等待二次判断 (旧版是直接加入dead) - 新增DeadNodes是由各个inputStream汇报给
DeadNodeDetector
的, 也就是说修改自然是对全局可见 Suspect/DeadNode
有自己单独的线程, 去定时检查, 如果发现之前放入的DN恢复通信了, 则会及时移除掉- 除此之外, 当某个inputStream关闭的时候, 也会移除它放入的所有DeadNodes/SuspectNode
补充: 为了避免引入此机制有其他影响, 这个检测机制默认是配置开关的, 默认关闭.
2. 基本设计
A. 线程
DeadNodeDetector
类中有三类工作的线程, 它们也是后续源码阅读的脉络, 核心是两个”生产-消费”模型/队列:
- 主线程, 定时更新检查状态(机), 生产DeadNodes给调度线程消费
- 调度线程, 定时去调度两种工作线程, 分别执行不同状态DN的探测任务.
- 工作线程, 基于调度线程分配,
1v1
绑定异步发送RPC, 分两种:- DeadNodes检查线程, 定时移出RPC恢复正常的
deadNodes
(不负责添加) - SuspectNode检查线程, 定时移出RPC正常的
suspectNode
, 并负责将RPC失败的DN放入DeadNodes中
- DeadNodes检查线程, 定时移出RPC恢复正常的
B. 数据结构
再来看看DeadNodesDetector中的数据结构变化 (原本只有一个map<string, dn>
)
deadNodes<string, dn>
: 原有的deadNodes映射, key是DN的UUID, value是DN信息对象- 仅当suspectNode发送RPC失败后, 才会被加入DeadNodes
- 在RPC恢复正常, 以及对应的inputStream关闭这两个情况下, 会被移出
suspectAndDeadNodes<DFSInputStream, HashSet<dn>
: 这里是以inputStream为单位, 包含的可疑和确认Dead的DN集合- 它在DFSClient连接DN读数据异常时, 添加进来
- 在RPC恢复通信, 以及inputStream关闭时, 移除对应DN
deadNodesProbeQueue<dn>
: deadNodes检测专用队列, 只在两个地方调用:- 仅在主线程更新deadNodes后, 入队
- 默认每60s遍历(poll)一次整个队列, 出队
suspectNodesProbeQueue<dn>
: suspectNodes检测专用队列, 同上:- 仅在各种读异常发生时, 入队 (之前这里是直接加入
DeadNodes
) - 默认300毫秒遍历一次疑似故障队列, 出队
- 仅在各种读异常发生时, 入队 (之前这里是直接加入
probeInProg<string, dn>
: 等待检测的DN列表, 相当于缓存dead和suspect的DNs, 避免检查的时候又被重新加入了- 仅在调度线程定期消费两个坏DN队列时, 加入
- 仅在工作线程发送RPC, 处理的回调函数里移除 (不管RPC是否成功响应)
补充: 检查DN状态的线程分两种
- CHECK_DEAD: 已确定为连接/读失败, 且RPC失败过的DN集合, 看是否恢复RPC
- CHECK_SUSPECT: 检查连接/读失败, 认定为疑似挂掉的DN集合, 看是否可以正常发RPC, 不能则转为DeadNodes
C. 默认值
下面是一些关键参数的默认值:
- deadNodes集合检查周期: 60s
- suspectNode检查周期: 300ms
- 检查线程等待DN返回最长时间: 20s
- deadNodes队列最大容量: 100
- suspectNode队列最大容量: 1000
- deadNodes/suspectNode检查线程池最大值: 10
- 给DN发RPC通信的检查线程池最大值: 20
0x03. 源码分析
接着来对着上述几点, 看看代码. 核心类: DeadNodeDetector
. 相关类: DFSClient
, DFSInputStream
, 核心Detector类随着FileSystem
初始化DFSClient一起启动
1. 初始化 & 检测状态转换
1 | // ClientContext的初始化方法尾部初始化Detector, 并立刻启动线程 |
然后, detector线程启动后, 就开始不断的进行状态变换, 基本流程在前面有说, 代码如下:
1 |
|
接着上面看看CHECK_DEAD
的核心方法: (queue.offer()
可简单理解为有界队列add()
的改良, 队满时返回false而不是抛出异常)
1 | private void checkDeadNodes() { |
然后来看看关键更新deadNodes集合的同步方法:
1 | public synchronized Set<DatanodeInfo> clearAndGetDetectedDeadNodes() { |
2. 调度线程
ProbeScheduler
类是探测DN状态的调度线程, 它定时遍历两个”badDN”队列, 安排对应类型的worker线程去与DN进行通信.
1 | /** |
3. 工作线程
这里是两个小兵(suspect和dead对应线程)实际干活的地方, 也是最里层的线程, 核心就是异步发一次RPC, 在回调方法中处理返回:
1 |
|
这里的回调就是根据RPC是否执行正常, 然后根据当前工作线程类型, 对三个DN集合进行不同的操作, 就是最初的状态机轮转里的具体实现.
0x03. 效果
有个指的注意的点是, 作者说引入DeadNodes检测机制后, Hbase的慢读时延(stuck read)会大幅降低, 这里有两个疑问:
- 小米是怎么发现, Hbase读卡主是因为DeadNodes没有共享导致的重试呢? (通过什么指标/监控项)
- 其次就是, 发现某些Hbase读慢, 比如一次读
1000ms
, 是怎么分析出具体的耗时占比, 从而推测/发现DeadNodes占主要部分
搞清楚这两点, 才能比较好的去得出, 不同场景下是否需要开启这个功能, 或者是说只针对Hbase集群开启? 需要有个量化的方式, 也才能看出, 开启之后, 到底改善了多少慢读
0x04. 思考
除了0x03的问题, 还有一些小疑问汇总:
- 疑问1: 状态机里的
ERROR
状态似乎并没有地方调用, 是预先定义给后续使用的么? - 疑问2: 这里在关闭inputStream时不去清除对应异常DN, 会有什么利弊? 既然全局共享, 不清似乎也有好处
- 疑问3: deadNodes的数据结构, 为何不直接设置为
Set<dn>
, 而要再存一个对应的uuid
作为key - 疑问4: 在Detector的417行
checkDeadNodes()
方法中, 原本的日志输出顺序似乎可以调整一下 (详见文档0x03-初始化代码部分) - 问题5: DeadNodesDetector的297行, 这里注释是笔误了么?
Prode datanode by probe byte. // <-- type?
未完待续…
参考资料: