图系统Hugegraph-Loader源码学习(一)

之前已经通过入门系列了解了Hugegraph的全貌和性能测试, 从这里开始源码分析系列, 首先是Hugegraph-Loader项目的源码分析, 首先对整体结构进行梳理, 然后沿着常用的导入一条线来进行分析, 最后选择其中特别的地方单独学习.

5/8月更新: Loader在0.9/1.0版做了大量的重构, 后续单独再开一篇讲这两版的主要变动.

0x00. 整体结构

首先 ,可以看到项目整体的结构分为6个核心模块 , 入口是最下方的HugegraphLoader类 :

hgLoader02

看代码之前, 回顾一下之前使用的步骤, 先估计一下大概的流程 :

  1. 从HugeGraphLoader启动导入程序
  2. 通过executorLoadOptions 解析命令行下输入的各种参数
  3. 读取两个配置文件
    • 通过GroovyExecutor读取schema.groovy , 然后调用HugeGraphClient的API创建Schema
    • 通过JsonFileReader读取mapping.json , 但不确定在哪做的映射(source?)
  4. 通过reader的不同实现, 读取数据文件 ** ,然后转为source**中的输入?
  5. 调用task包,开始启动导入任务, 并发的执行第六步
  6. 启动任务的时候, 开始调用对应的parserexecutor 的日志/统计
  7. 如果导入完成, 跳回HugeGraphLoader, 显示汇总信息结束

这里面serializersource 没太想明白, 为什么有图的反序列化? 之后再沿着导入脉络确认是做什么的..

0x01. 导入脉络

整体围绕最开始说的HugeGraphLoader类进行, 那就从这开始看代码, 首先构造方法里初始化了三个final的对象

1
2
3
4
5
6
private HugeGraphLoader(String[] args) {
this.options = new LoadOptions();
this.parseAndCheckOptions(args);
this.taskManager = new TaskManager(this.options);
this.graphSource = GraphSource.of(this.options.file); //映射文件地址
}

然后实现命令行下读取和解析参数主要是依赖的JCommander 这个库 , 通过注解映射对应的参数, 还是比较小巧灵活的 ,使用参考github . 然后就进入核心的load()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private void load() {
//1. Create schema
this.createSchema();

//2. Load vertices
System.out.print("Vertices has been imported: 0\b"); //'\b'表示退格,所以'0'就不会显示了...那实时计数是?
LoadSummary vertexSummary = this.loadVertices();
System.out.println(vertexSummary.insertSuccess());
// Reset counters
this.resetCounters();

//3. Load edges
System.out.print("Edges has been imported: 0\b");
LoadSummary edgeSummary = this.loadEdges();
System.out.println(edgeSummary.insertSuccess());
// Reset counters
this.resetCounters();

//4. Print load summary
LoaderUtil.printSummary(vertexSummary, edgeSummary);

//5. Shutdown task manager
this.shutdown(this.options.shutdownTimeout);
}

可以看到整体是很清晰的5步, 一步步来看.

1. 创建Schema

这里核心是调用的Groovy 的类库去解析Schema

  1. 首先初始化了一个HugeClient对象

  2. 初始化一个GroovyExecutor 对象, 封装了bind() 和 execute() 两个方法

  3. 通过bind()方法绑定schema和client.schema() [K-V]

  4. 读取schema.groovy 文件转为字符串, 然后把字符串和HugeClient对象传给execute()方法. 这里引用一下代码:

    1
    2
    3
    4
    5
    6
    GroovyShell shell = new GroovyShell(getClass().getClassLoader(),this.binding, config);

    // Groovy invoke java through the delegating script.
    DelegatingScript script = (DelegatingScript) shell.parse(groovyScript);
    script.setDelegate(client);
    script.run();

然后内部应该跟开始预测的一样, run()的时候实际就是去调用Client的API了, 那部分之后再看. 第一步创建Schema就结束了

2. 加载顶点

这是之前我们定义映射的文件, 我简单截图以便后续步骤对应: (注意已经支持文件夹读取了, 目前是一个个文件去遍历)

hgLoader03

loadVertices() 方法就是导点的汇总, 简要说它有以下步骤:

  1. 记录开始时间, 并初始化VertexSource集合 (这个实际就是映射json里vertices的各种参数值)

  2. 遍历VertexSource集合, 通过InputReaderFactory 这个解析工厂具体读input (相当于依次遍历上面映射文件里的每个VertexLabel)

    • 首先是根据type 参数确定是本地文件还是其他(比如HDFS)
    • 然后初始化一个VertexParser 对象, 此时会同时初始化一个ElementParser ,获得了一个特定文件解析对象fileReader
    • 再把其他的参数传入, 并检查ID策略 (目前只允许主键模式ID/自定义ID)
  3. 读取设置的batchSize数值, 然后用它初始化一个Vertex集合 ,然后开始一行行解析顶点数据文件

    • 这里主要就是两步, 一读取&过滤K-V , 二解析返回

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
       Map<String, Object> keyValues = reader().next();
      return parse(filterFields(keyValues)); //Vertex or Edge

      //filterFields()这里处理ignoredFields, nullValues.截取处理可空字段
      //从处理方式可以看出,映射文件里的可空字段应尽量少写,否则会对全部属性遍历判断一次.
      if (!nullableKeys.isEmpty() && !nullValues.isEmpty()) {
      Iterator<Map.Entry<String, Object>> itor = keyValues.entrySet()
      .iterator();
      itor.forEachRemaining(entry -> {
      String key = entry.getKey();
      Object val = entry.getValue();
      if (nullableKeys.contains(key) && nullValues.contains(val)) {
      itor.remove();
      }
      });
      }
    • 解析实际调用的VertexParser ,主要两步 :1. 设置ID 2. 附着属性 (这个简单)

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
       /* 根据不同的ID策略进行校验和设置,主键模式这里麻烦一些(因为可能是联合主键)
      * 1.要注意ID长度以字节长判断,而不是字符长,所以中文等编码可能几十个就超了
      * 2.联合主键的时候顶点ID设置考虑
      */
      id.getBytes().length <= 128

      /*设置主键ID核心 (TODO:这里用stream可能有更好的效率和可读性),下面是一些ID转换例子
      * 第一列是原始主键值, 第二列是中间处理值, 第三列是最后vertexID值
      * 1. jin --> jin! --> 1:jin
      * 2. ji:n --> ji`:n! --> 1:ji`:n
      * 3. jin + HK --> jin!HK! --> 1:jin!HK (联合主键,比如姓名+城市)
      * 备注: 需要注意的是,在边的id解析的时候,`和> 两个字符会作为分隔符标识, 容易有冲突, 需要千万注意
      */
      protected String spliceVertexId(VertexLabel vertexLabel, Object[] primaryValues) {

      StringBuilder vertexKeysId = new StringBuilder();
      String[] searchList = new String[]{":", "!"};
      String[] replaceList = new String[]{"`:", "`!"}; //替换前缀加一个`号,这样做看起来似乎有些奇怪
      for (Object value : primaryValues) {
      String pkValue = String.valueOf(value);
      pkValue = StringUtils.replaceEach(pkValue, searchList, replaceList);
      vertexKeysId.append(pkValue);
      vertexKeysId.append("!"); //联合主键的时候分隔符
      }

      StringBuilder vertexId = new StringBuilder();
      vertexId.append(vertexLabel.id()).append(":").append(vertexKeysId);
      vertexId.deleteCharAt(vertexId.length() - 1);
      return vertexId.toString();
      }
    • 然后一条条这样读取, 直到达到设定的提交阈值(默认500) ,触发一次taskManager.submitVertexBatch() 提交, 然后重置

  4. 需要注意的是提交的时候是有两种模式的(正常&错误) , 在解析错误的时候, 会转而尝试单条写入(Semaphore类后续待研究)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
        this.ensurePoolAvailable();
    this.batchSemaphore.acquire();

    InsertionTask<Vertex> task = new VertexInsertionTask(batch, this.options);//batch是传入的List<Vertex>
    ListenableFuture<Integer> future = this.batchService.submit(task);
    Futures.addCallback(future, new FutureCallback<Integer>() {
    //回调call(): --> this.client().graph().addVertices(this.batch());
    @Override
    public void onSuccess(Integer size) {
    successNum.add(size); //一次加500 (LongAdder)
    batchSemaphore.release();
    printProgress("Edges", BATCH_PRINT_FREQUENCY, size);
    }

    @Override
    public void onFailure(Throwable t) {
    submitVerticesInSingleMode(batch);
    batchSemaphore.release();
    }
    })

    //下面是SingleMode的核心.复写call()
    this.submitInSingleMode(() -> {
    for (Vertex vertex : vertices) {
    try {
    graph.addVertex(vertex);
    successNum.add(1); //单条
    } catch (Exception e) {
    failureNum.add(1);
    LOG_VERTEX_INSERT.error(new InsertException(vertex,e.getMessage()));
    }}}}
  5. 记录结束时间, 并更新LoadSummary 类中的相关信息. 然后重置计数器.

3. 加载边

边和顶点是一套模式, 大部分相似, 主要不同的地方就在边ID处理这里, 因为边由出入顶点ID组合而成 , 这里需要分别设置, 等于把之前顶点设置ID的步骤再走一次, 但是Loader里并不负责拼接边ID, 这里给一下批量执行的时候, 重试机制:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Override
public Integer call() {
if (this.batch == null || this.batch.isEmpty()) {
return 0;
}

int retryCount = 0;
do {
try {
//这是核心,通过Client发送HTTP请求,见关键点分析
this.execute(); // -->this.client().graph().addEdges(this.batch(), this.options().checkVertex);
break;
} catch (XxException e) {
//这里分了Client和Server两种异常,但是目前不管是哪种最后都会进入单条模式,后续待优化
retryCount = this.waitThenRetry(retryCount, e); //抛出异常则进入onFailure(Thowable e)
//Thread.sleep(n);if (++retryCount > this.options.retryTimes) throw e;
}
retryCount = this.waitThenRetry(retryCount, e);
}
} while (retryCount > 0 && retryCount <= this.options.retryTimes);

return this.batch.size();
}

4. 统计信息 & 结束

统计信息的话是把之前顶点和边中更新过的LoadSummary 对象进行格式化输出, 只是一个汇总不单独说了.

结束这里还是有些文章的, 比如: 如何更优雅, 更合适的结束. 但时间原因这部分先搁浅一下, 后面有空研究补充.

0x02. 关键点

1. HTTP请求写入

因为Huge的模块拆分比较独立, 所以可以看到HugeLoader里的导入本质是调用的HugeClient模块里的点/边create() 方法. 这里来看看具体实现:

在上面批量导入的时候, 最后调用到了addVertices()addEdges() , 二者几乎相同, 先看看添加顶点:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public List<Vertex> addVertices(List<Vertex> vertices) {
List<Object> ids = this.vertexAPI.create(vertices); //create是主要

for(int i = 0; i < vertices.size(); ++i) {
Vertex vertex = (Vertex)vertices.get(i);
vertex.id(ids.get(i)); //这里id是一个什么?
this.attachManager(vertex);
}
return vertices;
}

public List<Object> create(List<Vertex> vertices) {
MultivaluedHashMap<String, Object> headers = new MultivaluedHashMap();
headers.putSingle("Content-Encoding", "gzip");
RestResult result = this.client.post(this.batchPath(), vertices, headers); //post()是关键,放在Huge-common包中
//上面的post()最后发出请求的一行: Response response = this.request(() -> builder.get().post(entity.get()));
List<Object> ids = result.readList(Object.class);
if (vertices.size() != ids.size()) {throw new NotAllCreatedException(
"Not all vertices are successfully created, expect '%s', the actual is '%s'", ids, new Object[]{vertices.size(), ids.size()});
} else {
return ids;
}}

然后相比点, 批量写边主要是create()方法有一个可选的对应点检查(是否存在) , post() 的详细实现等后续看common库再说了.

1
2
Map<String, Object> params = ImmutableMap.of("check_vertex", checkVertex);
RestResult result = this.client.post(this.batchPath(), edges, headers, params); //request发送之前, 先query查一下

2. 反序列化包(*)

上面的导入整体流程中, 我们还是没有看到最初分析包结构的时候出现的serializer包, 看完了导入整体脉络. 再单独看看它的作用和实现吧.

首先Loader里使用的是国外经典的Jackson 序列化框架, 然后至于为什么是反序列化而不是序列化… 一是因为它是在JsonUtil 里被调用的, 本身作用就只是在解析json 数据文件和我们的映射json文件, 二是序列化成Json/GraphSon的工作应该是在server库里实现的 ,那搞清楚了原因和作用, 接着来简单看看如何引入Jackson (可跳)

  1. 自带的一系列注解, 最常用的 @JsonProperty , 用于把属性名序列化为json对应的名称.
  2. 通过原生提供的树形模型(tree model) ,通过操作JsonNode 来进行读写.
  3. 通过模块注册的方式, 把自定义的类加载进去
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public abstract class InputSourceDeserializer<E> extends JsonDeserializer<E> {

private static final ObjectMapper MAPPER = new ObjectMapper();//核心对象,可对原生类型进行json序列化

protected <T> T read(JsonNode node, Class<T> clazz) {
return MAPPER.convertValue(node, clazz);
}

protected static JsonNode getNode(JsonNode node, String name,
JsonNodeType nodeType) {
JsonNode subNode = node.get(name);
if (subNode == null || subNode.getNodeType() != nodeType) {
throw DeserializerException.expectField(name, node);
}
return subNode;
}
}

//然后在需要序列化的类里注入属性,比如这是VertexSource和EdgeSource共有的属性
@JsonProperty("label")
private String label;
@JsonProperty("input")
private InputSource input;
@JsonProperty("mapping")
private Map<String, String> mappingFields;
@JsonProperty("ignored")
private Set<String> ignoredFields;
@JsonProperty("null_values")
private Set<Object> nullValues;

//最后在JsonUtil中通过模块的方式注册自定义的类
static {
SimpleModule module = new SimpleModule();
module.addDeserializer(VertexSource.class, new VertexSourceDeserializer());
module.addDeserializer(EdgeSource.class, new EdgeSourceDeserializer());
module.addDeserializer(FileSource.class, new FileSourceDeserializer());
registerModule(module);
}

其他也属于Jackson 的细节了, 在这就不深究了.

0x03. 存疑点

因为项目里比较多的使用了Guava和一些外部库, 有些一时可能没时间细看实现原理和源码, 先记录一下, 之后可以慢慢补 (重点是要补Guava)

1. 工具类 & 数据结构 & 并发 & 异步相关 (待查)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
/*Guava相关*/
//数据结构
Table<X, Y, Z> tables = HashBasedTable.create(); //是什么结构?优点是什么
ImmutableMap // of(K,V)

//异步&回调
ListenableFuture<T> future = ListeningExecutorService.submit(Callable<T> task)

//并发
Futures.addCallback(ListenableFuture<V> future, FutureCallback<? super V> callback)
MoreExecutors.listeningDecorator(ExecutorService delegate)

/*JDK原生*/
//1.并发
Runtime.getRuntime().availableProcessors(); //取的当前JVM可用逻辑核数,那官方文档里的threadNum = CPUs*2-1 是怎么来的?
LongAdder //diff with AutomicLong?
Callable<V> //call()调用 ,关键在于怎么回调的, 和Guava 并发库如何结合
Semaphore
ExecutorService
//2.数据结构
MultivaluedHashMap<K, V>
Collections.unmodifiableList //不可变List

2. huge-common 库中关于HTTP发送接收的实现, 比如上面批量写点/边的post() .

待补…

3. 关于任务调度和结束机制

待补…

4.动态刷新实现

这里主要是学习一下\b的用法, 之前没有注意这个转义符还能这么用..

1
2
3
4
5
6
7
 String backward(long word) {
StringBuilder backward = new StringBuilder();
for (int i = 0, len = String.valueOf(word).length(); i < len; i++) {
backward.append("\b"); //挺巧妙的
}
return backward.toString();
}

5. POST请求对应Server端接口

在Server的api模块的com.baidu.hugegraph.api.graph包中, 有VertexAPI, EdgeAPI, 可以看到一个@Path注解为“batch”的路径

URL前缀为 : graphs/{graph}/graph/vertices , 下面以edges为例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@POST
@Timed
@Decompress
@Path("batch")
@Status(Status.CREATED)
@Consumes(APPLICATION_JSON)
@Produces(APPLICATION_JSON_WITH_CHARSET)
public List<String> create(@Context HugeConfig config,@Context GraphManager manager,@PathParam("graph") String graph,
@QueryParam("check_vertex")@DefaultValue("true") boolean checkVertex,
List<JsonEdge> jsonEdges) {
LOG.debug("Graph [{}] create edges: {}", graph, jsonEdges);
checkCreatingBody(jsonEdges);

HugeGraph g = graph(manager, graph);
checkBatchSize(config, jsonEdges);

TriFunction<HugeGraph, Object, String, Vertex> getVertex = checkVertex ? EdgeAPI::getVertex : EdgeAPI::newVertex;

return this.commit(config, g, jsonEdges.size(), () -> {
List<String> ids = new ArrayList<>(jsonEdges.size());
for (JsonEdge jsonEdge : jsonEdges) {
//NOTE: If the query param 'checkVertex' is false, then the label is correct and not matched id,
//it will be allowed currently
Vertex srcVertex = getVertex.apply(g, jsonEdge.source,jsonEdge.sourceLabel);
Vertex tgtVertex = getVertex.apply(g, jsonEdge.target,jsonEdge.targetLabel);
Edge edge = srcVertex.addEdge(jsonEdge.label, tgtVertex,jsonEdge.properties());
ids.add(edge.id().toString());
}
return ids;
});
}

TODO:

  1. 导入开始前完整检测一下映射文件 (目前是挨个检查, 不太友好)
  2. Loader后续经历了两次大改版, 针对最新一次的改版需要重新分析一下.

后续待补


参考资料:
  1. Hugegraph-Loader源码
  2. Guava官方Wiki
  3. Jackson基本使用