因为图服务的整个流程最近才得以完善, 市面上开源的图计算或图存储一般实现都很有些复杂(比如JanusGraph,GraphX) ,就像自己动手写OS/虚拟机/Parser一样, 我觉得很多时候如果能自己动手 去写一个图系统. 那么对整个图和Tinkerpop的了解 我相信会好很多.
今天的主人公就是Tinkerpop的作者自己写的In-memory 图系统—-TinkerGraph ,虽然是基于内存实现,但是提供了持久化的选择, 以及图计算+图存储 的基本实现. 觉得很适合作为学习项目.
0x00.代码结构 2.21更新: TinkerGraph后面又小更新了一下, 大家自行拉取官方最新版, 细节变动不影响整体阅读~
首先要知道, Tinkerpop(TP)自带的**七个图数据结构(接口)**需要全部实现, 所以结构这基本就是实现图结构后的对应类. TinkerGraph(TG)如下图所示:
0x01.structure 先看看核心要实现Tinkerpop的7个必须数据接口 ,以及后续的其他类. 我已经把全部代码抽象出来了.注意所有的类基本都是final的(也就是说默认所有方法也是final) ,修饰参数和对象也大量使用final.详见
1. TinkerElement (基类) 实现Element接口, 需要注意的是, 这个元素 接口也是Vertex, Edge 接口的基础, 一个元素可以包含多个属性对象 .结构如下:
1 2 3 4 5 6 7 8 9 10 11 Object id () ; String label () ; boolean removed = false ; Object id () String label () Graph graph () void remove () <V> Property<V> property (final String key, final V value)
2. TinkerEdge 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 Map<String, Property> properties; Vertex inVertex; Vertex outVertex; void remove () ;Property<V> property (final String key, final V value) Iterator<Vertex> vertices (final Direction dir)
3. TinkerProperty<V> 1 2 3 4 5 6 7 8 Element ele; String key; V value; void remove ()
4. TinkerVertex 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 Map<String,List<VertexProperty>> properties; Map<String,Set<Edge>> outEdges; Map<String,Set<Edge>> inEdges; TinkerGraph graph; <V> VertexProperty<V> property (Cardinality cd, String key,V val,Object... objs) Edge addEdge (String label,Vertex vertex, Object... kvs) void remove () Iterator<X> xs (X... x)
5. TinkerVertexProperty<V> 注意, TinkerGraph中Cardinality包含在TVP内了(默认也是SINGEL). 在JanusGraph和其他实现一般都会单独抽取出来.
1 2 3 4 5 6 7 8 9 10 11 12 13 Map<String,Property> properties; TinkerVertex vertex; String key; V value; void remove ()
6. TinkerIndex<T extends Element> 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 Map<String,Map<Object,Set<T>> index = new ConcurrentHashMap <>(); Set<String> indexedKeys = new HashSet <>(); Class<T> indexClass; TinkerGraph graph; void put (String key,Object v,T element) void createKeyIndex (String key) List<T> get (String key,Object v) void remove (String key,Object v, T element) void removeElement (T element)
7. TinkerGraph (核心) TinkerGraph定义了最多的接口,, Janus中类似JanusGraph的实现StandardJanusGraph. 只是区别在于Janus是一个复杂完整的实现, 所以它在Graph的基础上继续做了两层抽象, 而不是直接使用JanusGraph类去实现Graph接口. 本质上和TG是一样的. (理解这个很重要, 不然你不知道为什么Janus中会莫名其妙抽的那么复杂, 到底谁是幕后boss )
8. TinkerFactory 这里有几个工具和辅助类, 包括序列化与反序列化的类. 和一张图本身元素的存储类(类似Janus中的SystemSchemaXxx . 只是叫法不同). 之后补充
9. TinkerRegistry 12. TinkerRegistryV2d0 10. TinkerGraphVariables 11. TinkerHelper 这个类其实是个中介, 常见的顶点/边/属性类其实都经常依赖它的方法, 但是其实底层并非自己实现.(它的模板来自Tinkerpop原本的ElementHelper 类). TkHelper有几个核心方法:
static Edge addEdge()
void addInEdge() //这两个方法其实基本一样,可以整合为addInOrOutEdge()的
void addOutEdge()
Iterator<TinkerEdge> getEdges()
Iterator<TinkerVertex> getVertices()
0x02.源码精选 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 public final class TinkerProperty <V> implements Property <V> { protected final Element element; protected final String key; protected V value; @Override public void remove () { if (this .element instanceof Edge) { ((TinkerEdge) this .element).properties.remove(this .key); TinkerHelper.removeIndex((TinkerEdge) this .element, this .key, this .value); } else { ((TinkerVertexProperty) this .element).properties.remove(this .key); } } } public class TinkerVertexProperty <V> extends TinkerElement implements VertexProperty <V> { protected Map<String, Property> properties; private final TinkerVertex vertex; private final String key; private final V value; @Override public void remove () { if (null != this .vertex.properties && this .vertex.properties.containsKey(this .key)) { this .vertex.properties.get(this .key).remove(this ); if (this .vertex.properties.get(this .key).size() == 0 ) { this .vertex.properties.remove(this .key); TinkerHelper.removeIndex(this .vertex, this .key, this .value); } final AtomicBoolean delete = new AtomicBoolean (true ); this .vertex.properties(this .key).forEachRemaining(property -> { if (property.value().equals(this .value)) delete.set(false ); }); if (delete.get()) TinkerHelper.removeIndex(this .vertex, this .key, this .value); this .properties = null ; this .removed = true ; } } @Override public <U> Iterator<Property<U>> properties (final String... propertyKeys) { if (null == this .properties) return Collections.emptyIterator(); if (propertyKeys.length == 1 ) { final Property<U> property = this .properties.get(propertyKeys[0 ]); return null == property ? Collections.emptyIterator() : IteratorUtils.of(property); } else return (Iterator) this .properties.entrySet().stream().filter(entry -> ElementHelper.keyExists(entry.getKey(), propertyKeys)).map(entry -> entry.getValue()).collect(Collectors.toList()).iterator(); } } final class TinkerIndex <T extends Element > { protected Map<String, Map<Object, Set<T>>> index = new ConcurrentHashMap <>(); protected final Class<T> indexClass; private final Set<String> indexedKeys = new HashSet <>(); private final TinkerGraph graph; protected void put (final String key, final Object value, final T element) { Map<Object, Set<T>> keyMap = this .index.get(key); if (keyMap == null ) { keyMap = new ConcurrentHashMap <>(); this .index.put(key, keyMap); } Set<T> objects = keyMap.get(value); if (null == objects) { objects = new HashSet <>(); keyMap.put(value, objects); } objects.add(element); } public List<T> get (final String key, final Object value) { final Map<Object, Set<T>> keyMap = this .index.get(key); if (null == keyMap) { return Collections.emptyList(); } else { Set<T> set = keyMap.get(value); if (null == set) return Collections.emptyList(); else return new ArrayList <>(set); } } public void remove (final String key, final Object value, final T element) { final Map<Object, Set<T>> keyMap = this .index.get(key); if (null != keyMap) { Set<T> objects = keyMap.get(value); if (null != objects) { objects.remove(element); if (objects.size() == 0 ) { keyMap.remove(value); } } } } public void removeElement (final T element) { if (this .indexClass.isAssignableFrom(element.getClass())) { for (Map<Object, Set<T>> map : index.values()) { for (Set<T> set : map.values()) { set.remove(element); } } } } public void createKeyIndex (final String key) { validate(key); this .indexedKeys.add(key); (Vertex.class.isAssignableFrom(this .indexClass) ? this .graph.vertices.values().<T>parallelStream() : this .graph.edges.values().<T>parallelStream()) .map(e -> new Object []{((T) e).property(key), e}) .filter(a -> ((Property) a[0 ]).isPresent()) .forEach(a -> this .put(key, ((Property) a[0 ]).value(), (T) a[1 ])); } } public final class TinkerGraph implements Graph { static { TraversalStrategies.GlobalCache.registerStrategies(TinkerGraph.class, TraversalStrategies.GlobalCache.getStrategies(Graph.class).clone(). addStrategies(TinkerGraphStepStrategy.instance())); } protected AtomicLong currentId = new AtomicLong (-1l ); protected Map<Object, Vertex> vertices = new ConcurrentHashMap <>(); protected Map<Object, Edge> edges = new ConcurrentHashMap <>(); protected TinkerGraphVariables variables = null ; protected TinkerGraphComputerView graphComputerView = null ; protected TinkerIndex<TinkerVertex> vertexIndex = null ; protected TinkerIndex<TinkerEdge> edgeIndex = null ; protected final IdManager<?> vertexIdManager; protected final IdManager<?> edgeIdManager; protected final IdManager<?> vertexPropertyIdManager; protected final VertexProperty.Cardinality defaultVertexPropertyCardinality; private final Configuration configuration; private final String graphLocation; private final String graphFormat; private TinkerGraph (final Configuration configuration) { if (graphLocation != null ) loadGraph(); } @Override public Vertex addVertex (final Object... keyValues) { ElementHelper.legalPropertyKeyValueArray(keyValues); Object idValue = vertexIdManager.convert (ElementHelper.getIdValue(keyValues).orElse(null )); final String label = ElementHelper.getLabelValue(keyValues).orElse(Vertex.DEFAULT_LABEL); if (null != idValue) { if (this .vertices.containsKey(idValue)) throw Exceptions.vertexWithIdAlreadyExists(idValue); } else { idValue = vertexIdManager.getNextId(this ); } final Vertex vertex = new TinkerVertex (idValue, label, this ); this .vertices.put(vertex.id(), vertex); ElementHelper.attachProperties(vertex, VertexProperty.Cardinality.list, keyValues); return vertex; } @Override public <C extends GraphComputer > C compute (final Class<C> graphComputerClass) { if (!graphComputerClass.equals(TinkerGraphComputer.class)) throw Graph.Exceptions.graphDoesNotSupportProvidedGraphComputer(graphComputerClass); return (C) new TinkerGraphComputer (this ); } @Override public <I extends Io > I io (final Io.Builder<I> builder) { return (I) builder.graph(this ).onMapper(mapper -> mapper.addRegistry(TinkerIoRegistry.getInstance())).create(); } public void clear () { this .vertices.clear(); this .edges.clear(); this .variables = null ; this .currentId.set(-1l ); this .vertexIndex = null ; this .edgeIndex = null ; this .graphComputerView = null ; } @Override public void close () { if (graphLocation != null ) saveGraph(); } private void loadGraph () { final File f = new File (graphLocation); if (f.exists() && f.isFile()) { try { if (graphFormat.equals("graphml" )) { io(IoCore.graphml()).readGraph(graphLocation); } else if (graphFormat.equals("graphson" )) { io(IoCore.graphson()).readGraph(graphLocation); } else if (graphFormat.equals("gryo" )) { io(IoCore.gryo()).readGraph(graphLocation); } else { io(IoCore.createIoBuilder(graphFormat)).readGraph(graphLocation); } } catch (Exception ex) { throw new RuntimeException (String.format("Could not load graph at %s with %s" , graphLocation, graphFormat), ex); } } } private void saveGraph () { final File f = new File (graphLocation); if (f.exists()) { f.delete(); } else { final File parent = f.getParentFile(); if (parent != null && !parent.exists()) { parent.mkdirs(); } } try { if (graphFormat.equals("graphml" )) { io(IoCore.graphml()).writeGraph(graphLocation); } else if (graphFormat.equals("graphson" )) { io(IoCore.graphson()).writeGraph(graphLocation); } else if (graphFormat.equals("gryo" )) { io(IoCore.gryo()).writeGraph(graphLocation); } else { io(IoCore.createIoBuilder(graphFormat)).writeGraph(graphLocation); } } catch (Exception ex) { throw new RuntimeException (String.format("Could not save graph at %s with %s" , graphLocation, graphFormat), ex); } } private <T extends Element > Iterator<T> createElementIterator (final Class<T> clazz, final Map<Object, T> elements,final IdManager idManager, final Object... ids) { final Iterator<T> iterator; if (0 == ids.length) { iterator = elements.values().iterator(); } else { final List<Object> idList = Arrays.asList(ids); validateHomogenousIds(idList); return clazz.isAssignableFrom(ids[0 ].getClass()) ? IteratorUtils.filter(IteratorUtils.map(idList, id -> elements.get(clazz.cast(id).id())).iterator(), Objects::nonNull) : IteratorUtils.filter(IteratorUtils.map(idList, id -> elements.get(idManager.convert(id))).iterator(), Objects::nonNull); } return TinkerHelper.inComputerMode(this ) ? (Iterator<T>) (clazz.equals(Vertex.class) ? IteratorUtils.filter((Iterator<Vertex>) iterator, t -> this .graphComputerView.legalVertex(t)) : IteratorUtils.filter((Iterator<Edge>) iterator, t -> this .graphComputerView.legalEdge(t.outVertex(), t))) :iterator; } public <E extends Element > void createIndex (final String key, final Class<E> elementClass) { if (Vertex.class.isAssignableFrom(elementClass)) { if (null == this .vertexIndex) this .vertexIndex = new TinkerIndex <>(this , TinkerVertex.class); this .vertexIndex.createKeyIndex(key); } else if (Edge.class.isAssignableFrom(elementClass)) { if (null == this .edgeIndex) this .edgeIndex = new TinkerIndex <>(this , TinkerEdge.class); this .edgeIndex.createKeyIndex(key); } else { throw new IllegalArgumentException ("Class is not indexable: " + elementClass); } } public <E extends Element > void dropIndex (final String key, final Class<E> elementClass) { if (Vertex.class.isAssignableFrom(elementClass)) { if (null != this .vertexIndex) this .vertexIndex.dropKeyIndex(key); } else if (Edge.class.isAssignableFrom(elementClass)) { if (null != this .edgeIndex) this .edgeIndex.dropKeyIndex(key); } else { throw new IllegalArgumentException ("Class is not indexable: " + elementClass); } } public <E extends Element > Set<String> getIndexedKeys (final Class<E> elementClass) { if (Vertex.class.isAssignableFrom(elementClass)) { return null == this .vertexIndex ? Collections.emptySet() : this .vertexIndex.getIndexedKeys(); } else if (Edge.class.isAssignableFrom(elementClass)) { return null == this .edgeIndex ? Collections.emptySet() : this .edgeIndex.getIndexedKeys(); } else { throw new IllegalArgumentException ("Class is not indexable: " + elementClass); } } public interface IdManager <T> { T getNextId (final TinkerGraph graph) ; T convert (final Object id) ; boolean allow (final Object id) ; } } }
0x03.几个问题 1.VertexProperty和Property的关系
VertexPropert类似于Property,只不过它专门表示Vertex关联的K-V对.但在某种意义上是不同的,因为它还可以表示一个可以拥有属性的Element的对象。
Property很像Java8的Optional类,因为属性不存在(即为空)。 Property的key始终是String,value是object。但每个图db实现的时候, 都会限制特定对象用作value
看了这个可能不太明白, 看看源码设计. Element 设计的是 Vertex,和Edege的基类.它们都继承了Element ,但是Property并没有继承Element,是定义一个Element可以拥有多个Property对象. 但是VertexProperty就比较特别,它继承了Property和Element. 所以如引用所说,VertexProperty可以获得Property… (当然,我并没有搞明白这个区别有什么意义?)
并且public enum Cardinality {SINGEL, LIST, SET} 也是默认定义在VertexProperty接口中的,只不过Janus实现的时候抽取为单独的类了.
2.SystemSchema和SystemVariables的关系
首先我们可以认同,由vertexLabel,EdgeLabel,PropertyKey 这种名字组成的一个系统图(表),类似mysql中的meta信息表是肯定存在的. 只不过实现上可能名字略有差别. (TinkerGraph里是SystemVariables类)
具体来说,只是单纯这个区别么? 这个有待进一步验证, 在JanusGraph中 ,大量存在SystemSchemaXX 的类, 代表系统的内置顶点和表, 以及系统信息, 但是由于继承关系比较复杂, 所以很难快速判断.