Gremlin并行计算系统GAIA论文(二)
这篇是单独的 GAIA 论文解析, 是对原 Paper 的简单翻译 + 个人理解, 也保留了一些阅读 paper 中遗留的问题, 以及后续需要看代码确认的地方. 其中保留容易翻译出错的英文部分, 简化非必要描述以及书面用语.
时间所限, 理解不当之处请随时指正和反馈, 欢迎随时大家私信交流, 下面直接进入论文
GAIA: A System for Interactive Analysis on Distributed Graphs Using a High-Level Language
0. 概要
GAIA (Graph Interactive Analysis)是一个为分布式大图设计的交互分析系统, 兼具可在分布式环境使用和低时延的特点
它基于 Gremlin 图查询语言做了适配改造, 使得原本单机执行的 Gremlin 算子可以自动并行化的计算 (核心)
面对复杂/动态的图查询时, 在尽可能使用简单清晰的前提下, GAIA 提出了一个新的抽象概念 —-
Scope
(作用域?)GAIA 已在阿里生产环境中使用, 比起传统的 Gremlin 实现(例如 JanusGraph)有巨大(数量级)的性能提升
概要部分主要就是以上 4 点, 那么第二三点应该是论文和我们关注的重点, 第三点应该是为实现第二点提出的.
1. 简介
这里以阿里(支付宝)的支付场景为例, 简要说了一个类似环路检测(Cycle detection)的场景, 想表达的就是在千亿及以上的大图上进行带条件的环路检测分析是非常复杂和耗时的, 而现有的图计算/分析系统一般是离线的进行全图遍历, 耗时也长, GAIA 的目标就是解决这两个问题, 以 Gremlin 为基础提供交互式查询语言接口, 然后使其并行化降低单点计算的时延
为了实现上面的目标, 需要解决 2 个传统批处理式图计算系统(Pregel, Giraph, PowerGraph, GraphX等)的瓶颈:
- 编程模型 (Programming language): 现有系统开发图算法麻烦, 需要分布式和图的基础, 用户没法独立设计实现 (DSL)
- 内存管理 (Memory management): 现有系统基本都基于经典的 BSP 模型实现, 所有被激活的点都重复执行多轮迭代, 通过出入边发送消息以实现更新, 论文认为 BSP 模式并不适合交互式的查询, 有两个主要原因:
- 实时的交互查询过程伴随着底层图数据的实时更新变动, 并且随着深度/迭代次数增加会消耗大量内存
- 交互式查询经常会同时触发, 也就意味着并发数增大时, 共享的分布式内存会更加吃紧, 并且缓存可能会频繁淘汰, 导致内存更吃紧
然后论文以 Gremlin 实现环路检测为例, 它的写法如下: (具体的解释详见我在第三章代码段里的注释)
g.V(2).as(’s’).repeat(out(’transfer’).simplePath()).times(k-1).where(out(’transfer’).eq(’s’)).path().limit(1)
从上面的例子可以看出用 Gremlin 实现常见图算法可能会非常简易(相较传统图计算框架要写上百行原生代码), 但是不难想到, 它的灵活和随意组合算子的设计是有代价/弊端的, 一般做法如下:
- 要么类似 JanusGraph 这类, 从底层 KV 拉取点边数据, 单点顺序的计算, 效率和并发数都较低
- 要么提供有限的外置语法子集进行扩充 (例如 19 年 CHZ 博士开源的 Grasper, 现于字节 ByteGraph 团队)
除上述的问题外, GAIA 实现时还需要在变化的上下文里控制内存的使用, 那它基于 Gremlin 具体做了哪些改动呢, 有以下几个核心:
- Scope Abstraction: 它允许 GAIA 动态跟踪 Gremlin 查询语句中的数据依赖, 然后把 Gremlin 查询转换为流式图的模式, 来确保正确 & 高效的并行 (中间名词具体的含义在后文会详细图文解释)
- *Bounded-Memory Execution(内存限制): *得益于 Scope 的抽象, GAIA可在(有限)的内存环境下进行并行的图查询(内存可控), 设计了”提前结束”策略减少不必要的查询, 以提升性能 & 节省内存.
- GAIA System: 这里应该指的是提供了框架代码实现, 目前完整版本在阿里内部使用, 开源了一个社区简化版, 性能也比 JanusGraph 执行高效许多
2. 整体结构
GAIA 系统由多个组件交互实现, 包括: 集群管理, 分布式执行器, 语法和应用层等模块, 对应论文下图中的三层:
上图中的 Application 层的绿色框代表的是 Tinkerpop 层 + Client, GAIA 在 FE Service 层实现了 Tinkerpop 的必要接口来满足对它(语法)的兼容, 并且在这里将一个普通的 gremlin 查询语句转换为分布式执行计划, 划分为多个分区发给多台计算节点. 每个分区运行在一个独立的节点上, 并由计算节点本地执行器上进行计算和调度管理 (也就是说 FE 应该也同时兼具消息分发汇总的功能? 另外每个计算节点本地调度管理?)
最后存储层 (Storage) 论文中是用了一个 hash 分布(完全离散)的图, 每个点和它的出入边放一起存储(包括属性), 在论文里为了简化则是假设存储和计算一体, 也就是每个计算节点都有一个单独的图存储分区, 对应到开源代码里应该是 PetGraph (对Rocksdb的封装). 而内部生产环境里是一个独立的分布式图存储, 就没多提了.
3. 如何使用 GAIA
GAIA 设计理念是尽可能完全兼容原有 Gremlin 体系, 不用做额外的修改, 这一节讲述数据模型 + 查询语言的概念
在 Gremlin 查询设计里, T(Traverser) = (l, P) 是最基本的查询语句抽象 (其中 l 代表 location 也就是当前点边, P 代表 path - 可选), 然后如下图简单所示, 从 vid = 1 的点出发(只看出边), 可以有 1 -> 2 -> 3
和 1 -> 2 -> 4
两个二度查询路径.
Gremlin 中另一个关键设计是允许嵌套(nested)查询, 可如之前环路检测的写法那样, 把一个查询嵌入到其他的 step/operator 里, 比如 where(xxx)
语法可嵌套普通查询语句作为过滤器, select()
或 order()
函数则将嵌套的查询映射排序后输出. (此处翻译不佳)
嵌套查询也是 gremlin 支持循环的基础/关键, 一般在 gremlin 里使用repeat(xx) + until()/time()
的组合来表示将 xx 查询进行多轮迭代, 直到满足某个条件中止. 下面以之前讲环路检测的 gremlin 写法为例, 来简单介绍一下: (详细可参考)
1 | // 在 K 层/跳内, 找出从 vid=2 的点出发的一条转账环路 |
PS: 这里比较困惑的是, 分布式执行环境似乎和传统意义上的 GraphServer 层是耦合在一起的, 那如何实现 GAIA on Janus / Nebula 这样的效果呢? (如何对原有单点执行 Gremlin 的图数据库替换为 GAIA?)
4. Gremlin 编译
这里首先对应之前 GAIA 架构图里的 FE Service 部分, 如何把一条 gremlin 查询语句, 转换为数据流图 (dataflow graph), 然后举了一个例子, gremlin 查询某点的2度邻居数统计 g.V(2).out().out().count()
是如下方式转换的:
上图里图(a) 代表整个图结构, b 代表查询语句中的每个算子转为对应的四个操作符 o1 ~ o4 (Operator) , 这里原文说的稍有些绕, 我简化后理解是把原本 gremlin 里的 step一一转化为 GAIA 里的一个算子(o), 然后这里可能把 count()
这类的方便做下推汇聚的称之为 sink operator (下沉算子), 然后论文说能按这种方式转换映射, 是由于 Gremlin 本身不限制算子执行顺序(存疑?)
然后图(c) 中每个阶段对应一个 stream, 集合里的 EOS(End-Of-Stream)
是结束标记, 论文表示这个设计是为了保证转化后的 opertaor 拥有屏障语义 (可能类似是说保证每个 operator 是否完成的标记?), 然后 EOS 标记会在 stream / dataflow 中逐层的往下传播, 以此保证确认执行计划的完整/正确性.
然后再对着图重看整个转换过程, o1 对应生成了{(v2, 0), EOS}
的输出, 由于接下来的操作里不需要 path, 所以实际是略去了路径记录, 然后到 o2 的时候, 它把 {v2, EOS}
当输入, 处理后生成 v3, v4, 依次传播到最后终止.
4.1 嵌套遍历遇到的挑战
上文提到过的嵌套遍历(Nested Traversal) 是 Gremlin 原本设计的一个核心, 它带来灵活/动态性的同时, 也带来了更大的复杂性, 把上一个 gremlin 查询例子稍加修改, 例如: g.V(2).out().select(’neighbor_count’).by(out().count())
, 把之前的 out.count
嵌入到 select-by
的组合里, 看看会有什么变化:
首先在上面的查询里, 如果把 vid=2 的点的出边(1度邻居)作为一个集合 N(v2)
, 那么每个出点对应的遍历都可以看做是独立的, 也就是说现在的查询是每个1度邻居的出边数并存下来, 而之前的查询是只返回vid = 2 的点2度总邻居数.
在 Gremlin 遍历(数据流)的执行过程中, 假设定义一个唯一(可空)的计算状态, 在没有嵌套 gremlin 语法的情况下 (例如之前的二度邻居总数统计里) , 只需要维护一个 count
的状态, 然后在计算过程中传递它, 而如上的select-by
嵌套写法后, 就变成了需要维护 N 个单独的 count 的状态 (N 是 V2 的出边数).
看到这, 大家(包括我)都可能觉得, 这不是很场景的遍历情况么, 只是说结果是否做了汇聚, 还是多输出一些中间状态的区别. 然而论文指出, 这个 gremlin 查询只是一个很简单的例子, 而这种上下文在动态控制流(dynamic control flow)中会遇到各种各样的算子组合, 导致并行化后的查询系统很难正确 & 统一的设计, 另外如果遇到了超级顶点, 例如 V2 有万~上亿的出边, 那么对应并行化生成的上下文状态也就会变得非常巨大(难以把控).
业内虽有像微软 DryadLINQ (paper)动态的去创建实际上下文的做法, 但是在 Gremlin 的这种细粒度(语义)中是几乎做不到的.
4.2 Scope的定义
为了解决 4.1说的 gremlin 带有大量上下文时处理困难问题, GAIA 提出了在论文开头就强调过的一个概念设计 —– scope(作用域), 以下是它的严谨定义:
Scope 是一个(子)数据流([sub]dataflow) 中满足以下条件的一个子图: 对在子数据流中的任意两个算子 o1, o2, 以及在数据流中的任一算子 o, 满足如果 o 处于
o1 —> o2
的路径上时, o 必须也在子数据流中.A Scope is a subgraph in a dataflow (sub-dataflow) that satisfies the following condition: for any operators o1 and o2 in the sub-dataflow and any operator o in the dataflow, o must also be in the sub-dataflow if o is on a directed path from o1 to o2
PS: 这个说法挺拗口, 通俗怎么理解/表述? (我目前就是当带声明周期/作用域的子图在理解)
首先, scope 在逻辑结构上的设定与之前提到数据流中的算子(operator) 是一样的, 所以可以简单把它视为流图中一个虚拟的算子(v-operator). 然后如下所示, 我们把 scope 包含的上下文称为 scope context, 整个执行流图称为 root scope, 某个子流程称为 child scope. (类似把每个流程/算子当成一个点边, 组成的一个流程图, 所以 scope 和子图看起来概念挺像)
估计这样之前 gremlin 里复杂的上下文, 就对应到这边的多个child scope了, 上图 o2 ~ o5 的算子就可被简化视为一个整体的 Sc算子, I/O 代表它的输入/输出. 这种做法比之前微软论文中提过的 dLINQ 要轻得多. 每个 Scope 有 3 个基础的算子/语法: (后续还有其他算子)
- Enter: 把父 scope 的数据发给子 scope (也算是输入?)
- Exit: 计算结束后, 把结果发回 parent scope
- Goto: 主要用来控制循环跳转, 在下一节(4.3)里说
GAIA 编译器会在每个 Enter / Goto 标记(安装) scope 抽象, 来实现不同上下文切换机制. 在逻辑上会使用序列号 (seq number)的方式来标识 scope 中的上下文, scope 机制包括如下接口: (在4.4节中讲述对应实现)
CreateOrOpen(Data e, CtxID s)
: 传入图数据 e, 打开或创建一个独立上下文, 或是由(唯一)标识 s 打开上下文GetContext(Data e)
: 传入图数据 e, 获取它对应的上下文标识(id?)Complete(Data e, CtxID s)
: 表示如果收到数据 e, 则 s 对应的上下文将被标记为完成(不再收其他数据)
举例来说, GAIA 内置了一个 CONTEXT_PER_ENTRY
(CPE-每个元素的上下文)策略, 它表示为每个图数据 e 都创建一个新的上下文:
- 假设 seq 是从 0 初始化的序列号, CPE 策略会先调用 CreateOrOpen() 给图数据 e 创建一个新的 context
- 然后马上调用 Complete(), 让对应被 seq 标识的 context 不再新接收数据
- 最后, CPE 策略把 seq 的值 + 1, 后续有新的数据则会进入不同的 context
下一节会通过多个例子详细说明如何通过 scope 抽象来优化 gremlin 的嵌套查询.
4.3 使用 Scope 来编译 Gremlin 语句
类似 Hive/LINQ 类不支持动态控制流或嵌套遍历的 DSL这里就不再细说了, 而类似 Gremlin 则会引入子查询的概念来支持它们. 在 GAIA 中, 会把 gremlin 里的每个子查询转换为对应的由一对 “Enter + Exit” 组成的 scope(且支持嵌套). 这里以 3 个常见的 Gremlin 算子 (select-by, where, repeat) 为例, 介绍 scope 的具体应用方式:
a. select-by 查询
还是之前提到的分别计数的查询例子: g.V(2).out().select(’neighbor_count’).by(out().count())
, 以它为例, GAIA 会把原本的 gremlin 语句转换为下图所示的数据流图, 然后其中 by(out().count())
则会被转为 N 个单独的 scope(对应 V2的每个出点), 因为每个 scope 都是独立的查询, 所以这里在 Enter 时启用/安装内置的 CPE 策略用以隔离不同的查询上下文, 避免受到 4.1节提到的干扰问题.
b. where 和 repeat
这里把 where 和 repeat 列为典型的动态控制流(dynamic control flow), 认为它们引入了更多的复杂性, 给了之前提过的环路检测 gremlin 例子:
1 | // 找出从 V2 出发的一条环路 (最大 K 跳/层) |
接下来对上面的 gremlin 查询进行转换, 这里借鉴了 TF(TensorFlow)中的思想, 并将其优化使得可应用在更细粒度的 gremlin 算子中, 并提出了另外几个 GAIA 的原生算子:
- Copy: 传入数据 e, 输出两个相同的结果 (是同时输出么?)
- Switch: 传入布尔值(数据) p, 然后根据p 为 true/false 跳到对应分支 (这不是 if 么?)
- Merge: 接收两个输入流(input stream) 合并输出为一个流
先看 where 对应的下图(a)代表的 gremlin 语句编译转换过程. 从 where()
的语义来说, 它是一个判断条件, 判断当前遍历是否还需继续走下去, 那这里当遇到了 where(xx)
时, GAIA 首先会把它整体视为一个 scope, 并在进入时装载 CPE 的策略 (为每个点边使用独立的上下文), 然后紧接着这里使用复制把数据拷贝一份:
- 一份数据用来走原本 where() 中的具体条件, 例如之前的
out().eq(s)
, - 另一份数据直接跳到 Switch 这, 为何?
最后根据第一份数据的判断结果(bool 值), 在 swich 的地方决定是继续走下去还是舍弃数据/中止等. (这里 COPY 的意义是为了保证安全的并行操作么, 不太理解)

然后再来看看循环(repeat/loop)的转换, 这里需再介绍两个内置的策略: [注:EOS 也视为一个特殊的数据]
- SINGLE_CONTEXT:
- 先为每个数据 e 调用
CreateOrOpen(e, 0)
, 让它们都进入seqId = 0对应的上下文 (何用?) - 仅当 e = EOS 时, 调用
Complete(e, 0)
- 先为每个数据 e 调用
- GET_AND_INC: (类似原子的 i + 1么)
- 首先调用
GetContext(e)
方法获取当前数据对应的 context 的 seqId - 然后定义 seq’ = seq + 1, 调用
CreateOrOpen(e, seq’)
方法进入一个新的上下文 - 最后当且仅当
e = EOS
时, 调用Complete(e, seq’)
- 首先调用
结合这两个 scope 策略, 再结合查询语句看下面的转换图, 首先遇到 repeat(xx)
时, 构建一个 scope, 然后在进入时装在 SC 的策略, 也就是让每个数据进入一个从第 0 次迭代开始的上下文中, 然后走类似 do-while
的逻辑:
- 首先使用 Merge, 合并下一层的数据和之前遍历过的已有结果, 类似用一个大的 Set 来做层序遍历 (每次把下一层加入总体)
- 然后执行循环语句, 例如当前点的出边, 然后判断是否满足终止条件, 满足则退出
- 不满足条件, 则在 GoTo 上装载 GET_AND_INCE, 之前提过 GoTo 具有上下文切换的效果, 然后使用原子增的策略后, 可以作为当前上下文已经走了多少次迭代的标识,

这里让我有些费解的是原文并没有介绍 Merge 在上图中的使用, 只是我个人推测, 但似乎也有些对不上, 我大概推测意思是假设输入是一个点, 遇到 repeat(xx).time(k)
时, 先创建一个 seq=0 的上下文, 然后自己走第一轮出边查询, 得到了 1 度邻居假设 N 个, 把它作为输入合并到下一次的迭代中, 切换并更新自己的上下文 seq+1 (直到 k 停止), 那么:
- 是每轮迭代后, 都会创建一个全新的 context, 和之前的 SINGLE_CONTEXT 完全独立么?
- 这里的 Merge 啥意思? 如果直接把下一层的点和之前的点进行合并, 那就算去重, 不也会重复扫很多之前扫过层的数据么?
- 另外, 这里到底会产生多少个 scope context 呢? 假设就是查某个点的 3 层邻居, 图中每个点固定 5 个出边, 是会产生 5 的 3 次方呢(5^3^), 还是 3 个(按层数), 还怎么算的呢… 按原子自增的策略, 每个点就会产生 k 次隔离的上下文呢.
总之大概意思是能理解的, 但是关键细节的地方还没太确定, 例如上面这个多轮迭代到底如何并行的? 分了多少 scope 出来
4.4 Scope 的实现
上文介绍了 scope 的定义以及使用 scope 后是怎么对 gremlin 做编译优化的, 尽管可以给每个上下文创建物理的数据流算子(如何理解? 是说 GAIA 只是逻辑上创建不占据具体资源么), 但是这样数目太多并不实际, 如何正确高效的实现它是一个大挑战, GAIA 这里采用动态跟踪数据流的每个算子的 I/O 以及内部状态的依赖关系来实现.
GAIA 会给每个遍历(traverser)都打一个标签来标识上下文, 记作 T = [s1,s2,…,sk] (T 是一个 K 维向量-维度对应嵌套深度). Root Scope 默认标记为 []
(空?), 对于 T(Tag) 提出如下操作定义:
- T[∧]: 获取 T 的最后一个上下文标记(id?)
- T[∧→s]: 把 T 的最后一个上下文标记(id?) 替换为 s
- T[+1]: 把 T 的当前维度 + 1, 新的 T 从 0 纬开始算 (对应迭代+1?)
- T[−1]: 把 T 的当前维度 - 1
现在开始, 任何数据 e 都可以被表示为 (T;e), 那么在之前讲过的 scope 结构中, 就允许各种语义(包括 Enter/Exit/内置方法等)对 T(tag) 进行如下显示的修改:
Enter
会将 T 的维度+1, 表示进入一个 scope (等同于 T[+1]; e)Exit
会将 T 的维度 -1, 表示离开一个 scope (等同于 T[-1]; e)CreateOrOpen((T;e), s)
表示用 s 代替 T 的最后一个上下文 (等同于T[∧→s]; e)GetContext((T;e), s)
表示获取 T 的最后一个上下文 (等同于T[∧])Complete((T;e), s)
表示用 EOS 代替当前上下文标记结束 (等同于 T[∧→s]; EOS)
上述这些标记都是 GAIA 系统自动处理的(对用户透明), 所以不影响之前4.3提过的语义算子(copy/merge等)的使用, 被标记后的数据依然可视为同样的数据, 对 Gremlin 的算子例如 out()
或 count()
来说, GAIA做如下处理:
- 先得到时间待计算的图数据 e, 然后应用到 gremlin 原有的计算逻辑中
- 计算过程会生成一些 traversers (记作Ω), 以及这些算子可能的状态变动 (记作τ)
- 对于所有的 e′ ∈ Ω, GAIA 会对 e’ 重新标记 T 并将其发送给输出流
- 对有状态的计算, GAIA 会维护一个T –> τ (标记 –> 状态)的映射关系, 这样来保证透明的从不同的计算上下文对状态进行操作, 像是相互隔离一样 (类似全局共享状态?)
下面的具体举例看看动态依赖追踪的实现过程, 对应的是 4.3 中的图7, 也就是下图的结构和查询. V2 一度出边是 V3和V4, 然后等于是求这两个邻居的出边之和:
那么结合 T(tag) 抽象之后, 对应的细节实现是 (o1~o4 在之前的图7中已经标识)
- 首先接收 V2 的出边 V3 和 V4, 未进入 scope 之前认为源自 parent scope, 则为
{([];v3), ([];v4)}
- 然后它们对应两个独立的 CPE 策略的 select-scope, 触发 Enter 操作时, o
3的 T 维度+1- v3 对应变成
{([0];v3), ([0];EOS)}
(也就是说遇到 Enter 额外创建了 EOS 标记) - v4对应编程
{([1];v4), ([1];EOS)}
(为何这里是[1], 而不是[0]呢)
- v3 对应变成
- o
3输出{([0];v1), ([0];v2), ([1];v3)}
(EOS 标记被省略了) - o
4也就是 count() 会维护一个 Tag –> count 的键值映射, 统计[0]
或[1]
的标记 - 最后, 当 o
4接受到 EOS 的标记, 它就可以输出{([0];2), [[1];1]}
, 最后遇到 Exit , 恢复 Tag 的维度和输入一致为[]
, 则从 o4的输出中恢复生成{([];2), ([];1)}
对应的图示如下: (序号是我标的 v3 的全部流程, 注意 v4第一个指向的虚线被隔断了, 容易误看..)
EOS 标记的生命周期
这里补充说一下如何处理 EOS 标记, 它可以由 Complete()
函数或基本算子在例如 Enter 时在 scope 中引入 (例如上一个图就在进入 scope 时产生对应额外的 EOS 标记), EOS 标记可以在任何计算算子(computing operator)中传递且不会进行实际运算, 但在基本算子 (primitive operator)中就要小心处理 (尤其是遇到 Enter 和 Exit 算子时)
区分 EOS 的生命周期是至关重要的, 我们把在当前 scope 之外产生的 EOS 称为外部 EOS (external EOS), 内部产生的称为内部 EOS (internal EOS). 外部 EOS 标志的 parent scope 的结束, 且必须退回至parent scope(?); 相对的, 内部 EOS 只能在内部传递, Exit
操作符也只允许外部 EOS 传出. 另外, 为了在外部 EOS 标记遇到 Enter 操作符时不需调用 CreateOrOpen()
函数, 我们实现了允许在 Enter 上装载特定策略的做法, 这样就可以将它识别为 0 context (啥意思?)
另外, Switch 也是另一个用于条件判断和循环中的基本算子/操作符, 在判断场景下两类EOS 标记始终会传递到True/False 两个分支上; 而若是在嵌套循环中, 则分内外部两个情况:
- 外部 EOS 标记会保留在循环结束的 Exit 处, 并且只在验证所有循环的上下文都结束后才释放
- 内部 EOS 会在遇到 GoTo 时被标记为其他数据(?), 只要带 T 标记的任何数据传递到下一次迭代, 带有 T 的 EOS 也必须传播到 GoTo 中 (代表与之关联的循环还尚未结束), 否则它将会被当前 scope 丢弃. (会丢去外面么?还是直接销毁) [没理解]
之前说的都是编译转换原有 Gremlin 的过程, 下一章介绍分布式运行时的环境.
5. 分布式运行
对于任一查询语句(例gremlin), GAIA 先通过编译器如上一章所述转换为一个数据流图(dataflow graph), 然后会根据加载图时候的图分区情况对流图进行切分, 但论文这里没具体说是如何划分的 (存储数据和计算的关系具体如何对应呢) (代码确认)
每个 worker 的任务调度都在本地进行, 也就是每个进程自己有一个任务调度管理器, 从 source operator 开始调度, 然后启动其它就绪的 operator (就绪是指当前算子的所有输入都准备好了) . 但是 论文没有说具体是如何调度 operator 的 (代码确认)
GAIA 目前需要用户在执行查询时指定[a * b] (a 代表节点数, b 代表分区数, 个人推测)的并行度(DOP - Degree Of Parallelism) [未来 DOP 的可能会自动推算出来], 然后如下图所示, 这个并行度(b?)大小的设置, 本地的 worker 会将 operators 并行化, 以便多 CPU 核心充分利用: (图中存疑点已标出, 1/2个 Scope 应该改为 1/2 个线程, 开始理解有误)
GAIA 的运行时环境是在多个无状态的计算节点上, 每个计算节点都执行一部分计算, 但论文没细说节点间如何通信, 网络时延, 某个 job/operator 失败/节点宕机等细节问题. (可能都需要代码或与官方确认了..包括 DOP 的具体含义等)
注: 为了简写也方便大家理解, 文中提到的 worker 都代指运行时的 local executor / runtime.
5.1 使内存可控 (Bounded-Memory Execution)
在图查询中有个避不开的问题是随着遍历深度的增加, 内存的使用可能对应指数的增长, 尽管 Gremlin 的查询通常会限定查询深度或是有其他的限制截断, 但仍不可避免中间出现内存的急剧膨胀导致内存爆炸
业内常用的手段有 “backpressure”(推测是类似TCP 流量控制, 反向控制发送速率?) 或是使用交换内存(用磁盘缓冲), 但是因为可能产生死锁, 或是很高的I/O延时, GAIA 都无法直接使用它们 (或理解为不是一个好选择). 而为了在不降低性能(并行度)的前提下仍能控制内存使用, GAIA 让本地的worker (local executor)使用了一种“动态调度”的新数据流执行机制.
a. 动态调度机制 (多点)
1. 批处理机制
对每个算子(operator) 来说, GAIA 会把数据流中的一段连续的 traversers 打包整合为一个批处理(single batch), 而这种批处理可理解为计算和通信过程中的最合适单位/粒度. 一个任务(task) 从逻辑上看, 可视为 “一个算子 + 一批待计算的数据“ 的组合. 当一或多个批处理的输入已经就绪时, GAIA 会动态的创建与之对应的任务 (task), 本地 worker 的调度器会维护一个队列来存放这些任务以便资源共享 (代码需看: batch / task / 调度队列的具体结构)
2. 内置内存控制器
此外, worker 实现了单独的内存分配器(memory allocator), 它会记录/上报每个查询的内存使用总量, 以便 worker 的调度器检查当前内存使用情况: 若内存使用达到一个阈值时, worker 就会停止继续调度更多的任务(直到内存消耗降低至一个低使用门槛时再恢复), 也就是说当机器内存不足时, GAIA 的设定是先不继续派新任务, 等手头的活做差不多再开始启动. 另外, 超级顶点的查询带来的数据输出都可能会耗光内存, 因此我们会在输出数据超过一定容量限制时暂停任务直到输出完后再继续.
PS: 此时使用swap 空间有何坏处呢? 如果数据在内存则磁盘应该空闲才对. 由此引出另一个问题, 图数据存在磁盘上, 那内存中是缓存/预取部分数据么? 有设计特殊的缓存策略么, 论文似乎没有提及, 但这对性能影响应该是挺大的. 测试时可监控磁盘 IOPS 观测
3.背压通信 (backpressure signals)
由于数据不可能都放在一台节点, 多台机器的同时查询很容易涉及跨节点访问, 并且会给任务调度带来依赖关系 (比如第 3 层邻居依赖上一层邻居的结果才能开始执行查询). 如果 A 节点给 B 节点突然发送了大量数据/请求, 就会导致 B 运行内存急剧消耗, 为避免出现这种情况, GAIA 通过背压信号的机制实现了网络发送的速率控制 (推测估计类似 TCP流量控制), 在上面的情况下, 它会暂停 A 的发送任务, 直到 B 节点内存使用恢复正常 (细节需参考代码)
在内存受限的情况, 运行循环边(cyclic edges) 的数据流图可能导致死锁 (不太理解). 在实际场景里, 可能是以下两个原因导致:
- 无止境的重复遍历环路. 对于这个问题, 默认提供一个最大迭代深度 N 来避免, 当达到 N 时, 让 GoTo 算子声明死锁, 一旦发现死锁, 会立刻终止对应的查询并给报错提示.
- 使用 BFS 优先的遍历策略, 但调度异常导致耗光了缓存 buffer, 那后续的 operators 就一直获取不到输入(算饥饿?). 对于这种情况出现, GAIA 采用了 BFS 和 DFS 混合遍历的策略优化. (下详述)
b. BFS/DFS 混合遍历策略
BFS 和 DFS 各自有其明显的优势, 前者实现简单且省内存, 但基本无法并行化; 后者天生易于并行化但是耗费大量内存且复杂性更高. 如果我们能在遍历过程中, 自动/综合的使用二者, 则能将图的查询性能显著提升. 下面说一下具体的思路:
首先我们会让 worker 根据当前数据流的拓扑结构 (例如遍历深度) 来给任务不同优先级. 比如如果多个任务处于同一层序(order), 我们可以应用 BFS 优先的策略, 对往深处探索的遍历应用 DFS 优先策略.
上述的策略可适用于所有的查询任务, 但在循环遍历的场景下(K 度邻居), 多次迭代可能会在同一个 task 中执行. 为了应对这个问题, 在把多个 traversers 打包为一个批处理之前, 我们会给算子一个 buffer 记录当前迭代所对应的上下文 id (还是没理解怎么避免的?) , 确保循环嵌套时仍可以对任务进行优先级排序. 综上, GAIA 默认会自动使用 BFS 优先方式查询(更易于并行化), 在内存超过使用限制时自动切换为 DFS 优先查询. (也就是说这里是无需手动指定, GAIA 会自动取识别判断的吧)
5.2 提前中止优化 (Early-Stop Optimization)
这个很好理解, 如论文举的 DFS 和 BFS 的实现为例, 假设我们使用 kout 求某个点的第 K 度少量邻居 [假设查询语句是: g.V(1).kout(10).limit(5)
], 最终只需要 5 个点, 也不需要前面的 K-1 度的任何数据, 那么我们无需用 BFS 的方式把 K-1度全部点边过一遍再做筛选, 这样浪费了大量的计算 + 内存资源:
理想的做法应该是某个执行器 DFS 达到预期后立刻停止其他计算, 在单点计算系统里, 比较容易能实现类似优化, 但是在分布式环境如何实现就会麻烦许多, 下面具体说一下 GAIA 的实现方式:
一般情况, 一个带有 EOS 标记的 context 遇到了(Exit 或 GoTo) 的操作符时, 标志着当前的上下文马上结束了. 在上图这种查询的例子里, 在没有接收到新的输入 EOS 标记前, limit()
输出了 k 个元素后的确可以结束当前遍历, GAIA 还允许 Complete(e, s)
函数被 scope 中的任何算子调用来生成 EOS 标记来避免向下继续传输不必要的计算. 但是这样做只对后续的计算生效, 并不会影响之前已经处于计算过程中的 scope, 于是为了进一步优化不必要的计算开销, 还做了如下操作:
当 Complete()
方法被任何算子调用时, 它会创建一个与当前上下文 Tag 相同的终止令牌 (cancellation token) , 然后把这个终止指令向上游发送, 接收到这个指令的算子, 会马上生成一个 EOS 标记用于终止当前的计算, 并依次递归的向上继续传递, 直到遇到相同 Enter 的 scope 结束, 这样的设计可概称为“终止指令传播机制”, 然后因为篇幅所限同样没有讲述这里的实现细节, 在下一章里会单独提到这个优化带来的性能测试对比.
至此, 论文核心讲解就完结了, 后续是具体的性能测试, 和业内图数据库/计算系统的对比, 以及一些总结.
6. 实际测试
6.1 测试环境
a. 测试数据集
这里采用 LDBC 的数据生成器, 生成 5 种大小(4G. 40G, 156G, 597G, 1960G)的图数据测试集(表略), 后文默认选择 G300 , 也就是约8亿点, 52亿边, 总约600G的数据集 (最大的测试数据集是26亿点, 170亿边, 2TB)
b. 查询方式
这里查询条件采用类社交网络模型中 LDBC 提供的 14个复杂查询中的10个 (记作 CR1…14), 此外还包括之前论文反复提过的环路检测查询 (参数是: 默认给 10 个点, 最大4跳, limit(10)), 然后这里根据 LDBC 的数据集, 对查询语句做了一些修改, 并且应用了 4.3 节提到的预处理(prepared statement)机制(好像没注意), 把多个点整合到一个查询里
LDBC 的测试客户端设定为随机参数的把每个查询请求 20次, 最后取平均时延 (注: 之后补充一下 CR1..14到底是什么查询)
c. 配置
在后续的测试里, 会先把数据加载到内存(或尽量缓存), 以减少加载数据读磁盘带来的差异 (也就是说图数据可认为就在内存中了), 并会对系统预热.
除 Neo4j 使用 Cypher 外, 其他的系统都使用 Gremlin 进行查询 (已验证正确性), 由于语法编译的时间<1ms, 后续都会忽略不计, 每个查询最大执行时间是1h, 超时标记为 OT(Over Time)
, 最后, GAIA 查询需要手动设置并行度 DOP= x * y
(代表x 台节点 y 个线程)
GAIA 使用 Rust 开发, 论文测试的存储引擎应该是基于开源 Rust 封装过的 Rocksdb (PetGraph?). 对比系统有这些: TinkerGraph (Tinkerpop 内置的内存图), Neo4j (社区版), OrientDB. JanusGraph, Timerly, Plato, 也就是说既包含图数据库也包含图计算系统(plato).
测试环境设定最多 16 台节点, 每个节点 24 核心Inter 2.5GHz, 512GB 内存, 使用万兆网卡(25Gbps, 最大提供2.5G/s 网络带宽?)
6.2 可扩展性 (Scalability)
论文表示目前已知 GAIA 是唯一支持并行 Gremlin 查询的图系统, 在后续实验中为了更好的验证它的性能, 把 LDBC 的 10 个查询分成两类:
- 大数据量查询: CR3/5/6/9, 四个
- 小数据量查询: CR1/2/7/8/11/12, 六个
a. 水平扩展性 (Scale-out)
增加机器, 观察系统性能变化, 在之前的并行度 DOP 设置中, 把单节点线程数 y 固定为 4, 然后节点数 x 测试2, 4, 8, 16 四种.
- 大数据量查询: 从 2 增加到 16 台机器, 性能最高提升 6倍, 最差的 CR3 查询只提升了 3 倍 (分析是因为嵌套子遍历的复杂性, 导致同步过程的开销增加等原因, 如等待 EOS 结束)
- 小数据量查询: 在这种数据量下, 计算并不会很密集, 因此也无法因为并行得到明显的性能改善, 甚至 CR1 的查询性能还会随机器增多逐步下降(也说明有些查询是不需要/不适合并行化的)
b. 垂直扩展性 (Scale-up)
固定机器数为 4 台, 修改 y(线程数) 的值为2, 4, 8, 16, 然后在两类查询上的结果和水平扩展类似, 但是有一个有趣的现象是, 选择 16 节点 * 4 线程的组合, 比 4 节点 * 16线程组合更好 (尽管前者的网络 IO 更多), 说明:
- 网络通信成本不是 GAIA 的主要影响因素, 得益于动态调度的组合拳可以减少通信成本
- 对实时的 TP 类图查询来说, 如果节点较少可能带来了查询更多的数据竞争 (为啥纯读会有竞争?)
在 8 亿点, 52亿边的数据集下, 4节点16线程可以让复杂/大数据量查询时延到个位数/秒级别, 应该还算是挺报错的, 之后结合具体的查询语句和遍历数据量来看看.
c. 数据量增长
固定16节点 * 4线程, 然后改变数据集大小, 如下图所示, 可以看到在 200 点边时, 复杂查询时延还在 20s 以内.
总体来看 GAIA 的查询性能还是具有良好的可扩展性的, 简单看时延和数据规模都还是可圈可点, 然后论文说查询的并行度 DOP 自动设置最佳值是可在未来实现.
6.3 优化点 (Our Design Choices)
这一节重点测试之前提到的几大亮点设计, 同样在 60 亿点边的 G300 数据集使用 16 节点 * 4 线程配置进行测试, 这里的查询使用 Q6 环路检测 (包含了复杂的嵌套查询语法), 也是一个生产中常见的查询, 每个查询默认只允许使用 10GB 的内存空间, 然后并行起点数 m, 最大的深度 k, 和最终结果的数 n 都会组合调整, 然后下面会给四种 GAIA 的典型配置:
- GAIA(默认配置)
- GAIA-DFS (DFS 优先模式, 因为 BFS 做环路检测很容易 OOM, 这里被排除)
- GAIA-NoMB (不限制内存模式)
- GAIA-NoES (关闭提前终止)
a. 动态调度
为了确定动态调度的具体情况, 这里调整查询内存上限为 256/512/1024/2048/4096 MB, 以及无穷大(infinity) , 默认10个起始点
在限定内存使用情况下, 基本都能把内存跑满, 而有趣的是, 随着内存增大计算时间反而不断增加, 论文表示这是内存足够的情况下, 额外的分配和管理开销导致的, 所以反而会带来负面影响.
b. 遍历策略 (BFS/DFS)
这里先把环路检测的结果 limit 数从 10 调整至最大 10^5^(10万个), 观察默认策略, DFS/BFS 优先策略的区别, 如下图所示:
由此可看出 limit(1000)
以内, DFS 都比 BFS 有更好的性能, 当结果超过1w 后, 则开始被 BFS 反超, 也很符合直观上的理解, (注: 当然这里没提到的是 DFS 这样做会导致数据可能会严重倾斜, 类似树的 DFS 结果都在最左侧), 图中蓝色时延应该是代表论文默认的混合策略 (BFS 优先, 内存超限后再 DFS 优先), 实验表明默认的混合策略是更通用的选择 (即使在 DFS 占优的环路检测场景也能有综合还不错的表现)
c. 提前终止
同上, 把 limit 数从10 修改至最多 10万, 可看到开/关提前终止优化, 可带来最多 12 倍的性能提升 (limit = 10时)
d. 对比其他图计算框架
这里选择 plato 和 timely 作为对比 (在它们中分别实现了环路检测, 代码接近百行), 下图显示了时延和内存开销对比, 并发起点数为1/5/10/15/20, 其中 GAIA 的性能比 Plato 高 14 倍, 内存开销少 10 倍, 同时为了验证 GAIA 的优化点对性能的影响因素, 特意还选择了另几种关闭优化项模式下的性能对比, 可看到关闭内存限制速度下降 5.5 倍, 关闭提前终止优化会下降 12 倍以上.
6.4 与图数据库对比
尽管 GAIA 是为分布式计算设计的, 但这里和单机的图数据库也进行性能对比测试, 以表明并非仅限于多机环境, 对比了 TinkerGraph, Neo4j, OrientDB 和 JanusGraph 4种常见单 Server 图数据库, 并从中选取性能最好的结果, 与 GAIA 来对比的情况, 首先看看 GAIA 自己在单点不同线程下的表现:
可以看到 8 线程左右是综合最好的表现, 这里论文表示 Neo4j 实测时候比完压其他系统, 论文认为进一步研究发现它并没有公平的在做图遍历, 认为 Neo4j 使用 join
方式生成了输出结果 (这不太理解), 才得到了比其他系统都好的数据 (似乎是认为 Neo4j 作弊?) 然后这里就没有提及其他系统测试的结果了. (上面测试用了一个小图 G1 仅2000W 数据)
然后再看看大一点的测试图(G100 - 20亿点边)下和 JanusGraph 的对比图, 论文说之所以没有选择其他的图是因为它们都无法支撑对应的数据量, GAIA 使用 1 台计算节点, 调整线程从1~16 来进行每个查询. (OT 代表超时1h?)
7. 相关工作
图数据库
主要是说大部分图数据库(Neo4j, JanusGrpah)的 Server 是单点/单线程计算, 几乎完全没有发挥多核 CPU 的并行计算特性(后略).
图计算系统
与业内典型的图计算系统 Pregel/PowerGraph/GraphX/Gemini/相比, GAIA 侧重实时性 / TP 支持, 并且支持 Gremlin 这种用户友好的 DSL 来开发/执行图算法而不需要写特定语言的代码, 全图的遍历通常使用 BSP 模型, 这种模型更适合所有点进行遍历而不适合局部的遍历.
数据流 & 依赖追踪
与业内常见的动态控制流系统 CIEL/Naiad/TensorFlow 相比, GAIA 侧重提供细粒度/轻量级的依赖追踪 (后略..)
声明式图查询语言
GAIA 这里选择了基于 Gremlin 而不是 Cypher 是因为觉得生产使用上 Cypher 更麻烦一些. 另外理论上说也可以基于其他DSL 做实现. (个人注: 这里没有提及业内较出名 TigerGraph 的 GSQL, 或者 Nebula 的 nGSQL, 应该是可比较一下的)
8. 小结
GAIA 在阿里的一个业务使用了一年多, 总体来看结合了 Gremlin 的灵活性和并行化的计算方式, 还是很适合图计算和图查询交互分析的. 论文最后对 腾讯 Plato 团队的主要研发/解答问题的同学表示了感谢 (其他略)
补充说明:
以上就是全部的论文内容, 最后两章省略了一些非核心的相关工作介绍, 因为我个人也不算熟悉, 也为了大家更快阅读论文的核心点, 就不全部列出了, 大家可以对照原 paper 进行阅读理解, 然后后续这里会即使更新错误的翻译或理解, 以及把看论文中遇到的问题做一个汇总, 还有不少后续的事要做, 先简单记一个 todo-list
TODO:
- 阅读代码后, 重新梳理全文描述
- 中间记录过的问题的 Q & A
- 测试章节的 10 条查询语句, 具体的内容
- 查询语句对应的点边遍历数统计
- GAIA 到底如何替代默认的 Tinkerpop, 做到 JanusGraph + GAIA 的组合, 如果需要适配, 代价多大
- 于 GraphScope 的关联以及其他问题放入 GAIA 的概览篇, 这里只关注论文相关.
参考资料: