GraphX中基于Pregel的单源最短路径详解

GraphX利用Spark这样一个并行处理框架实现了类似Pregel的图计算模型,该计算模型由以下三个主要部分:

  • 节点消息处理的函数
    vprog:(VertexId, VD, A) => VD
    (节点Id, 节点属性, 消息) => 节点属性
  • 节点发送消息的函数
    sendMsg:EdgeTriplet[VD, ED] => Iterator[(VertexId, A)]
    (边元组) => Iterator[(目标节点Id, 消息)]

  • 消息合并函数(功能类似于Hadoop中的combiner)
    mergeMsg:(A, A) => A
    (消息, 消息) => 消息

Pregel算法已经被集中抽象到GraphOps这个类中,下面就通过详解单源最短路径的例子来看看如何使用。

Spark API版本: 1.1.0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.apache.spark.graphx._
import org.apache.spark.util.GraphGenerators
// 初始化一个随机图,节点的度符合对数正态分布,边属性初始化为1
val graph: Graph[Double, Int] =
GraphGenerators.logNormalGraph(sc, numVertices = 100, numEParts = 10).mapVertices((id, _) => id.toDouble)
// 设置目标节点
val sourceId: VertexId = 18
// 初始化各节点到原点的距离
val g = graph.mapVertices( (id,_) => if(id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = g.pregel(Double.PositiveInfinity)(
// Vertex Program,节点消息处理函数,dist为源节点属性(Double),newDist为消息
(id, dist, newDist) => math.min(dist, newDist),
// Send Message,发送消息函数,如果src节点属性+边属性(1) < dst节点属性,发送更新消息 (dst节点Id, 消息(即最短距离))
triplet => {
if(triplet.srcAttr + triplet.attr < triplet.dstAttr){
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
// Merge Message,对消息进行合并的操作,类似Hadoop中的combiner,对一轮迭代中要发送给目标节点的消息进行合并处理
(a,b) => math.min(a,b)
)
println(sssp.vertices.collect.mkString("\n"))