本文共 17008 字,大约阅读时间需要 56 分钟。
图(Graph)是由顶点集合(vertex)及顶点间的关系集合(边,edge)组成的一种网状数据结构
在地图应用中寻找最短路径
社交网络关系
网页间超链接关系
Graph[VD,ED]
/** Simplified outline of the GraphX property graph.
  *
  * `VD` is the vertex attribute type and `ED` is the edge attribute type.
  * The three members are declared without definitions, so the class is
  * marked abstract.
  */
abstract class Graph[VD, ED] {
  /** The vertices of the graph together with their attributes. */
  val vertices: VertexRDD[VD]

  /** The edges of the graph together with their attributes. */
  val edges: EdgeRDD[ED]

  /** The edge triplets, typed as an RDD of `EdgeTriplet[VD, ED]`. */
  val triplets: RDD[EdgeTriplet[VD, ED]]
}
VertexRDD[VD]
EdgeRDD[ED] EdgeTriplet[VD,ED]
import org.apache.spark.graphx._

// Vertex collection: (vertex id, Int attribute) pairs.
val vertices: RDD[(VertexId, Int)] = sc.makeRDD(Seq((1L, 1), (2L, 2), (3L, 3)))

// Edge collection: Edge(source id, destination id, Int attribute).
val edges = sc.makeRDD(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 2)))

// Assemble the graph from both collections; with Int vertex and edge
// attributes the result is a Graph[Int, Int].
val graph = Graph(vertices, edges)
Edge:样例类
VertexId:Long的别名
import org.apache.spark.graphx.GraphLoader

// Build a graph from an edge-list file. Each line of the file describes one
// edge in the form `srcId dstId`; all vertex and edge attributes are 1.
val graph = GraphLoader.edgeListFile(sc, "file:///opt/spark/data/graphx/followers.txt")
GraphX借鉴PowerGraph,使用Vertex-Cut(点分割)方式存储图,并用三个RDD存储图数据信息:
VertexTable(id, data):id为顶点id, data为顶点属性
EdgeTable(pid, src, dst, data):pid 为分区id ,src为源顶点id ,dst为目的顶点id,data为边属性
RoutingTable(id, pid):id 为顶点id ,pid 为分区id
GraphX中vertices、edges以及triplets
在GraphX中,vertices对应着名称为VertexRDD的RDD。这个RDD有顶点id和顶点属性两个成员变量。它的源码如下所示:
// VertexRDD[VD] is an abstract RDD whose elements are (VertexId, VD) pairs,
// i.e. a vertex id together with its attribute of type VD. The SparkContext
// and dependency list are passed straight through to the RDD constructor.
abstract class VertexRDD[VD]( sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[(VertexId, VD)](sc, deps)
从源码中我们可以看到,VertexRDD继承自RDD[(VertexId, VD)],这里VertexId表示顶点id,VD表示顶点所带的属性的类别。这从另一个角度也说明VertexRDD拥有顶点id和顶点属性。
在GraphX中,edges对应着EdgeRDD。这个RDD拥有三个成员变量,分别是源顶点id、目标顶点id以及边属性。它的源码如下所示:
// EdgeRDD[ED] is an abstract RDD whose elements are Edge[ED] values, where ED
// is the edge attribute type. The SparkContext and dependency list are passed
// straight through to the RDD constructor.
abstract class EdgeRDD[ED]( sc: SparkContext, deps: Seq[Dependency[_]]) extends RDD[Edge[ED]](sc, deps)
从源码中我们可以看到,EdgeRDD继承自RDD[Edge[ED]],即类型为Edge[ED]的RDD。
在GraphX中,triplets对应着EdgeTriplet。它是一个三元组视图,这个视图逻辑上将顶点和边的属性保存为一个RDD[EdgeTriplet[VD, ED]]。可以通过下面的SQL表达式表示这个三元组视图的含义:
SELECT src.id, dst.id, src.attr, e.attr, dst.attr
FROM edges AS e LEFT JOIN vertices AS src, vertices AS dst
ON e.srcId = src.Id AND e.dstId = dst.Id
同样,也可以通过下面图解的形式来表示它的含义:
EdgeTriplet的源代码如下所示:
/** An edge extended with the attributes of its two endpoint vertices.
  *
  * Inherits `srcId`, `dstId` and `attr` from `Edge[ED]`.
  * NOTE(review): this is an excerpt; the full class may define further
  * members that are not shown here.
  */
class EdgeTriplet[VD, ED] extends Edge[ED] {
  /** Attribute of the source vertex. */
  var srcAttr: VD = _ // nullValue[VD]

  /** Attribute of the destination vertex. */
  var dstAttr: VD = _ // nullValue[VD]

  /** Copies the ids and the attribute of `other` into this triplet.
    *
    * @param other the edge whose `srcId`, `dstId` and `attr` are copied
    * @return this triplet, to allow chaining
    */
  protected[spark] def set(other: Edge[ED]): EdgeTriplet[VD, ED] = {
    srcId = other.srcId
    dstId = other.dstId
    attr = other.attr
    this
  }
}
EdgeTriplet类继承自Edge类,我们来看看这个父类:
// A directed edge: source vertex id, destination vertex id and an attribute of
// type ED. Both ids default to 0 and the attribute defaults to
// null.asInstanceOf[ED]. ED is @specialized for the listed primitive types,
// and all three fields are mutable (var).
case class Edge[@specialized(Char, Int, Boolean, Byte, Long, Float, Double) ED] ( var srcId: VertexId = 0, var dstId: VertexId = 0, var attr: ED = null.asInstanceOf[ED]) extends Serializable
Edge类中包含源顶点id,目标顶点id以及边的属性。所以从源代码中我们可以知道,triplets既包含了边属性也包含了源顶点的id和属性、目标顶点的id和属性。
打印图的顶点和顶点的值,打印图的边,打印triplets带有属性的点和边
/** Property-graph example: builds a small graph of users and their
  * relationships, then prints its vertices, edges and triplets.
  */
object SparkGraph2 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (local mode, all available cores).
    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // Vertices: (id, (name, role)).
      val users = sc.parallelize(Array(
        (3L, ("rxin", "student")),
        (7L, ("jgonzal", "postdoc")),
        (5L, ("franklin", "professor")),
        (2L, ("istoica", "professor"))
      ))

      // Edges: Edge(source id, destination id, relationship label).
      val relationships = sc.parallelize(Array(
        Edge(3L, 7L, "collaborator"),
        Edge(5L, 3L, "advisor"),
        Edge(2L, 5L, "colleague"),
        Edge(5L, 7L, "PI")
      ))

      // Build the graph from the vertex and edge collections.
      val graph = Graph(users, relationships)

      // Print every vertex id with its attribute.
      println("打印图的顶点和顶点的值")
      graph.vertices.foreach { case (id, attr) => println(s"$id-->$attr") }

      // Print every edge.
      println("打印图的边")
      graph.edges.foreach(e => println(s"src:${e.srcId},dst:${e.dstId},attr:${e.attr}"))

      // Print the triplets (each edge with both endpoint attributes).
      println("triplets带有属性的点和边")
      graph.triplets.foreach(t => println(t.toString()))
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
/** Graph query example: filters vertices and triplets of a small call graph
  * and prints basic graph information (counts and degrees).
  */
object SparkGraph3 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (local mode, all available cores).
    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // Vertices: (id, (name, age)).
      val users = sc.parallelize(Array(
        (1L, ("alice", 28)),
        (2L, ("bob", 27)),
        (3L, ("charlie", 65)),
        (4L, ("david", 42)),
        (5L, ("ed", 55)),
        (6L, ("fran", 50))
      ))

      // Edges: Edge(source id, destination id, number of calls).
      val cntCall = sc.parallelize(Array(
        Edge(2L, 1L, 7),
        Edge(2L, 4L, 2),
        Edge(3L, 2L, 4),
        Edge(3L, 6L, 3),
        Edge(4L, 1L, 1),
        Edge(5L, 2L, 2),
        Edge(5L, 3L, 8),
        Edge(5L, 6L, 3)
      ))

      val graph = Graph(users, cntCall)

      // Users older than 30, written with a pattern-matching predicate.
      println("找出大于30岁的用户1")
      graph.vertices
        .filter { case (_, (_, age)) => age > 30 }
        .foreach(v => println(v.toString()))

      // Same query, written with positional tuple accessors.
      println("找出大于30岁的用户2")
      graph.vertices.filter(_._2._2 > 30).foreach(v => println(v.toString()))

      // Triplets whose edge attribute (call count) is greater than 5.
      println("假设打call超过5次,表示真爱。请找出他(她)们")
      graph.triplets.filter(_.attr > 5).foreach(t => println(s"${t.toString()}"))

      // Graph information: vertex count, edge count, in/out/total degrees.
      // The first label used to say "edges" although it precedes numVertices.
      println("查看图形顶点的数量")
      println(graph.numVertices)
      println("查看图形边的数量")
      println(graph.numEdges)
      println("查看图形的入度")
      graph.inDegrees.foreach(println)
      println("查看图形的出度")
      graph.outDegrees.foreach(println)
      println("查看图形的度")
      graph.degrees.foreach(println)
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
/** Graph operator example: mapVertices, mapEdges, reverse, subgraph and
  * joinVertices applied to a small call graph, printing each result.
  */
object SparkGraph4 {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (local mode, all available cores).
    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // Vertices: (id, (name, age)).
      val users = sc.parallelize(Array(
        (1L, ("alice", 28)),
        (2L, ("bob", 27)),
        (3L, ("charlie", 65)),
        (4L, ("david", 42)),
        (5L, ("ed", 55)),
        (6L, ("fran", 50))
      ))

      // Edges: Edge(source id, destination id, number of calls).
      val cntCall = sc.parallelize(Array(
        Edge(2L, 1L, 7),
        Edge(2L, 4L, 2),
        Edge(3L, 2L, 4),
        Edge(3L, 6L, 3),
        Edge(4L, 1L, 1),
        Edge(5L, 2L, 2),
        Edge(5L, 3L, 8),
        Edge(5L, 6L, 3)
      ))

      val graph = Graph(users, cntCall)

      // Map over the vertices: the new attribute is (vertex id, name).
      val graph1 = graph.mapVertices((vertexId, attr) => (vertexId, attr._1))

      // Map over the edges: the new attribute is the old one times 7.0.
      val graph2 = graph.mapEdges(e => e.attr * 7.0)

      // Print the original graph and both mapped graphs.
      println("graph")
      graph.triplets.foreach(t => println(t.toString()))
      println("graph1")
      graph1.triplets.foreach(t => println(t.toString()))
      println("graph2")
      graph2.triplets.foreach(t => println(t.toString()))

      // Reverse the direction of every edge; the original graph is printed
      // first for comparison.
      println("graph")
      graph.triplets.foreach(t => println(t.toString()))
      println("对图里边的方向进行反转")
      val reverseGraph = graph.reverse
      reverseGraph.triplets.foreach(t => println(t.toString()))

      // Keep only the vertices whose age is below 65.
      println("subgraph")
      val subgraph = graph.subgraph(vpred = (id, attr) => attr._2 < 65)
      subgraph.triplets.foreach(t => println(t.toString()))
      println("打印图的顶点和顶点的值")
      subgraph.vertices.foreach { case (id, attr) => println(s"$id-->$attr") }
      println("打印图的边")
      subgraph.edges.foreach(e => println(s"src:${e.srcId},dst:${e.dstId},attr:${e.attr}"))

      // Join: append the company to the name of every vertex that has one.
      val tweeters_comps = sc.parallelize(Array((1L, "kgc.cn"), (2L, "berkeley.edu"), (3L, "apache.org")))
      val t_graph = graph.joinVertices(tweeters_comps)((id, v, cmpy) => (v._1 + " @ " + cmpy, v._2))
      t_graph.vertices.collect.foreach(println(_))
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
/** Follower-count example: stores each vertex's in-degree and out-degree in
  * its attribute and prints how many people like each user.
  */
object FansGraph {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (local mode, all available cores).
    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // Vertices: (id, (name, age)).
      val vertexArray = Array(
        (1L, ("Alice", 28)),
        (2L, ("Bob", 27)),
        (3L, ("Charlie", 65)),
        (4L, ("David", 42)),
        (5L, ("Ed", 55)),
        (6L, ("Fran", 50))
      )

      // Edges: Edge(source id, destination id, Int attribute).
      val edgeArray = Array(
        Edge(2L, 1L, 7),
        Edge(2L, 4L, 2),
        Edge(3L, 2L, 4),
        Edge(3L, 6L, 3),
        Edge(4L, 1L, 1),
        Edge(5L, 2L, 2),
        Edge(5L, 3L, 8),
        Edge(5L, 6L, 3)
      )

      val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(vertexArray)
      val edgeRDD: RDD[Edge[Int]] = sc.parallelize(edgeArray)
      val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

      // Vertex attribute that also carries the in-degree and out-degree.
      case class User(name: String, age: Int, inDeg: Int, outDeg: Int)

      // Replace the (name, age) attribute with a User whose degrees start at 0.
      val initialUserGraph: Graph[User, Int] = graph.mapVertices {
        case (id, (name, age)) => User(name, age, 0, 0)
      }

      // Store the in-degree, then the out-degree, in the vertex attribute.
      // A vertex missing from the degree RDD gets 0 via getOrElse.
      val userGraph = initialUserGraph
        .outerJoinVertices(initialUserGraph.inDegrees) {
          case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)
        }
        .outerJoinVertices(initialUserGraph.outDegrees) {
          case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))
        }

      // The in-degree of a vertex is its number of followers.
      userGraph.vertices.collect.foreach { case (id, property) =>
        println(s"User $id is ${property.name} and is liked by ${property.inDeg} people.")
      }
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
/** PageRank example on a small follower graph: runs PageRank and prints the
  * vertices ordered by descending rank.
  */
object SocialNet {
  def main(args: Array[String]): Unit = {
    // Create the SparkSession (local mode, all available cores).
    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // Vertices: (id, (name, age)).
      val tweeters = Array(
        (1L, ("Alice", 28)),
        (2L, ("Bob", 27)),
        (3L, ("Charlie", 65)),
        (4L, ("David", 42)),
        (5L, ("Ed", 55)),
        (6L, ("Fran", 50))
      )
      // The original declared `sc` but never used it; it is used here.
      val vertexRDD: RDD[(Long, (String, Int))] = sc.parallelize(tweeters)

      // Edges: Edge(source id, destination id, Int attribute).
      val followRelations = Array(
        Edge[Int](2L, 1L, 7),
        Edge[Int](2L, 4L, 2),
        Edge[Int](3L, 2L, 4),
        Edge[Int](3L, 6L, 3),
        Edge[Int](4L, 1L, 1),
        Edge[Int](5L, 2L, 2),
        Edge[Int](5L, 3L, 8),
        Edge[Int](5L, 6L, 3)
      )
      val edgeRDD = sc.parallelize(followRelations)

      val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)

      // Run PageRank with 0.0001 as its argument, then print the
      // (vertex id, rank) pairs from the highest rank to the lowest.
      val ranks = graph.pageRank(0.0001)
      ranks.vertices.sortBy(_._2, ascending = false).collect.foreach(println)
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
/** Finds the three vertices with the highest degree in a Twitter follow
  * graph loaded from a text file.
  */
object NetRedPeople {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // A line must match ((UserN,id),(UserM,id)) in full; each capture group
      // holds one "name,id" pair.
      val regex = """\(\((User[0-9]{1,},[0-9]{1,})\),\((User[0-9]{1,},[0-9]{1,})\)\)""".r

      // Parse matching lines into (name1, id1, name2, id2) and silently drop
      // the others. A partial function replaces the former
      // (Some, Some)/(None, None) tuple, the `!= None` filter and Option.get.
      val twitters = sc.textFile("files/07twitter_graph/twitter_graph_data.txt").collect {
        case regex(followee, follower) =>
          // The regex guarantees exactly one comma inside each group.
          val Array(followeeName, followeeId) = followee.split(",")
          val Array(followerName, followerId) = follower.split(",")
          (followeeName, followeeId.toLong, followerName, followerId.toLong)
      }

      // Vertices: one distinct (id, name) pair per user seen on either side.
      val verts = twitters.flatMap { case (name1, id1, name2, id2) =>
        Array((id1, name1), (id2, name2))
      }.distinct()

      // Edges: first id -> second id, labelled "follow".
      val edges = twitters.map { case (_, id1, _, id2) => Edge(id1, id2, "follow") }

      // Default vertex attribute passed to the Graph constructor.
      val defaultUser = ""
      val graph = Graph(verts, edges, defaultUser)

      // Print the three (vertex id, degree) pairs with the highest degree.
      graph.degrees.sortBy(_._2, ascending = false).take(3).foreach(println(_))
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
PageRank(PR)算法,用于评估网页链接的质量和数量,以确定该网页的重要性和权威性的相对分数,范围为0到10
从本质上讲,PageRank是找出图中顶点(网页链接)的重要性,GraphX提供了PageRank API用于计算图的PageRank

// Exercise 2: PageRank application.
/** Loads a follower graph from an edge-list file, runs PageRank and prints
  * (username, rank) pairs ordered by descending rank.
  */
object PageRanktest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // Load the edges as a graph.
      val graph = GraphLoader.edgeListFile(sc, "hdfs://192.168.221.140:9000/kb10/graphx/followers.txt")

      // Run PageRank and keep only the (vertex id, rank) pairs.
      val ranks = graph.pageRank(0.0001).vertices

      // Read the users as (id, username) pairs from comma-separated lines.
      // NOTE(review): a line without a numeric first field or without a
      // second field makes this step throw; confirm the file format.
      val users = sc.textFile("hdfs://192.168.221.140:9000/kb10/graphx/users.txt").map { line =>
        val fields = line.split(",")
        (fields(0).toLong, fields(1))
      }

      // Join the ranks with the usernames.
      val ranksByUsername = users.join(ranks).map {
        case (id, (username, rank)) => (username, rank)
      }

      // Print from the highest rank to the lowest.
      ranksByUsername.sortBy(_._2, ascending = false).collect.foreach(println(_))
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
// Demo example 10: computing connected components.
/** Builds a small graph made of two separate chains and prints its triplets
  * before and after running connectedComponents.
  */
object ConnectedComponent {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // Vertices: (id, (name, age)).
      val vertexArray = Array(
        (1L, ("Alice", 28)),
        (2L, ("Bob", 27)),
        (3L, ("Charlie", 65)),
        (4L, ("David", 42)),
        (5L, ("Ed", 55)),
        (8L, ("Fran", 50)),
        (9L, ("aaa", 50))
      )

      // Edges: 1 -> 2 -> 3 -> 4 -> 5 and, separately, 8 -> 9.
      val edgeArray = Array(
        Edge(1L, 2L, 7),
        Edge(2L, 3L, 2),
        Edge(3L, 4L, 4),
        Edge(4L, 5L, 3),
        Edge(8L, 9L, 1)
        // Disabled edges kept from the original example:
        // Edge(7L, 1L, 2),
        // Edge(5L, 7L, 8),
        // Edge(5L, 6L, 3)
      )

      val vertexRDD = sc.parallelize(vertexArray)
      val edgeRDD = sc.parallelize(edgeArray)
      val graph = Graph(vertexRDD, edgeRDD)

      graph.triplets.collect.foreach(println)

      // A connected component is a subgraph in which any two vertices are
      // linked by an edge or by a series of edges. Its vertices and edges are
      // subsets of those of the original graph. Once all components are
      // known, it is known whether a path exists between any two vertices.
      println("---连通分量")
      graph.connectedComponents.triplets.collect.foreach(println)
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
// Pregel-style PageRank vertex program (pseudocode). A, OutNbrs, send_msg,
// converged and voteToHalt are not defined in this snippet.
def PageRank(v: Id, msgs: List[Double]): Unit = {
  // Sum the incoming messages. The accumulator starts at 0.0 because the
  // messages are Doubles.
  var msgSum = 0.0
  for (m <- msgs) { msgSum = msgSum + m }

  // Update the PageRank (PR) value of this vertex.
  A(v).PR = 0.15 + 0.85 * msgSum

  // Broadcast the new PR, divided by the number of links, to every
  // out-neighbour.
  for (j <- OutNbrs(v)) {
    msg = A(v).PR / A(v).NumLinks
    send_msg(to=j, msg)
  }

  // Check for termination.
  if (converged(A(v).PR)) voteToHalt(v)
}
Pregel选择了一种纯消息传递的模式,忽略远程数据读取和其他共享内存的方式,这样做有两个原因。
第一,消息的传递有足够高效的表达能力,不需要远程读取(remote reads)。
第二,性能的考虑。在一个集群环境中,从远程机器上读取一个值是会有很高的延迟的,这种情况很难避免。而消息传递模式通过异步和批量的方式传递消息,可以缓解这种远程读取的延迟。
GraphX也是基于BSP模式。GraphX公开了一个类似Pregel的操作,它是广泛使用的Pregel和GraphLab抽象的一个融合。在GraphX中,Pregel操作者执行一系列的超步,在这些超步中,顶点从之前的超步中接收进入(inbound)消息,为顶点属性计算一个新的值,然后在以后的超步中发送消息到邻居顶点。
不像Pregel而更像GraphLab,消息通过边triplet的一个函数被并行计算,消息的计算既会访问源顶点特征也会访问目的顶点特征。在超步中,没有收到消息的顶点会被跳过。当没有消息遗留时,Pregel操作停止迭代并返回最终的图。
/** Pregel example: propagates minimum values along the out-edges of a small
  * graph and prints the resulting value of every vertex.
  *
  * Each vertex attribute is a pair (current value, previous value).
  */
object Pregeltest {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // Sentinel passed as the initial message.
      // NOTE(review): a real message equal to 9999 would be treated like the
      // initial message by vprog.
      val initialMsg = 9999

      // Vertex program: the initial message leaves the attribute unchanged.
      // Any other message yields (min(message, current), current).
      def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
        if (message == initialMsg) {
          value
        } else {
          (message min value._1, value._1)
        }
      }

      // Send-message function: a source vertex whose current and previous
      // values are equal sends nothing; otherwise it sends its current value
      // to the destination vertex.
      def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
        // srcAttr is the source vertex attribute; dstAttr would be the
        // destination vertex attribute.
        val sourceVertex = triplet.srcAttr
        if (sourceVertex._1 == sourceVertex._2) {
          Iterator.empty
        } else {
          Iterator((triplet.dstId, sourceVertex._1))
        }
      }

      // Merge function: keep the smaller of two messages.
      def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2

      // Vertex RDD: (id, (value, -1)).
      val vertices = sc.parallelize(Array(
        (1L, (7, -1)),
        (2L, (3, -1)),
        (3L, (2, -1)),
        (4L, (6, -1))
      ))

      // Edge RDD: Edge(source id, destination id, true).
      val relationships = sc.parallelize(Array(
        Edge(1L, 2L, true),
        Edge(1L, 4L, true),
        Edge(2L, 4L, true),
        Edge(3L, 1L, true),
        Edge(3L, 4L, true)
      ))

      // Build the graph.
      val graph = Graph(vertices, relationships)

      // Run Pregel with no iteration limit, sending along out-edges.
      val minGraph = graph.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)

      // Print the final current value of every vertex.
      minGraph.vertices.collect.foreach {
        case (vertexId, (value, original_value)) => println(value)
      }
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
/** Single-source shortest paths with the Pregel operator: computes the
  * distance from vertex 0 to every vertex and prints the result.
  */
object Pregeltest2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").appName(this.getClass.getSimpleName).getOrCreate()
    try {
      val sc = spark.sparkContext

      // Vertices: (id, Double attribute). Vertex 4 appears only in the edge
      // list; presumably GraphX creates it with a default attribute. Every
      // attribute is overwritten by mapVertices below.
      val vertices: RDD[(VertexId, Double)] =
        sc.makeRDD(Seq((0L, 1.0), (1L, 1.0), (2L, 1.0), (3L, 1.0)))

      // Edges: Edge(source id, destination id, edge length).
      val edges = sc.makeRDD(Seq(
        Edge(0L, 1L, 100),
        Edge(0L, 2L, 30),
        Edge(0L, 4L, 10),
        Edge(2L, 1L, 60),
        Edge(2L, 3L, 60),
        Edge(3L, 1L, 10),
        Edge(4L, 3L, 50)
      ))

      val graph = Graph(vertices, edges)
      val sourceId: VertexId = 0L

      // Distance 0.0 for the source vertex, infinity for all others.
      val initGraph = graph.mapVertices((id, _) =>
        if (id == sourceId) 0.0 else Double.PositiveInfinity)

      // The send-message body and the head of the merge function were lost
      // in the collapsed original; they follow the standard GraphX
      // shortest-path example.
      val sssp = initGraph.pregel(Double.PositiveInfinity)(
        // Vertex program: keep the smaller of the current and the received
        // distance.
        (id, dist, newDist) => math.min(dist, newDist),
        // Send message: forward a distance only when going through this edge
        // shortens the path to the destination vertex.
        triplet => {
          if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
            Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
          } else {
            Iterator.empty
          }
        },
        // Merge messages: keep the smaller distance.
        (dist1, dist2) => math.min(dist1, dist2)
      )

      println(sssp.vertices.collect().mkString("\n"))
    } finally {
      // Release the local Spark resources even if a step above fails.
      spark.stop()
    }
  }
}
转载地址:http://pwqb.baihongyu.com/