A graph is a network-shaped data structure composed of a set of vertices and a set of relationships (edges) between those vertices
Application scenarios
Graph terminology
Graph=(V,E)
Vertex set V={v1,v2,v3}
Edge set E={(v1,v2),(v1,v3),(v2,v3)}
G=(V,E)  (directed graph: edges are ordered pairs <u,v>)
V={A,B,C,D,E}
E={<A,B>,<B,C>,<B,D>,<C,E>,<D,A>,<E,D>}
G=(V,E)  (undirected graph: edges are unordered pairs (u,v))
V={A,B,C,D,E}
E={(A,B),(A,D),(B,C),(B,D),(C,E),(D,E)}
Cyclic graph: contains a cycle, i.e. a closed path formed by a sequence of connected vertices
Acyclic graph: a DAG is a directed acyclic graph
Degree: the number of edges incident to a vertex
Out-degree: the number of edges going from the current vertex to other vertices
In-degree: the number of edges coming from other vertices to the current vertex
The classic graph representation: the adjacency matrix
1. For each edge, the corresponding cell in the matrix is set to 1
2. For each self-loop, the corresponding cell is set to 2, so that a vertex's degree can be read off by summing its row or column (a short sketch follows below)
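As an illustrative sketch in plain Scala (no Spark required), the adjacency matrix of the undirected graph G above can be built directly, and each vertex's degree read off as its row sum:
// adjacency matrix for V={A,B,C,D,E}, E={(A,B),(A,D),(B,C),(B,D),(C,E),(D,E)}
val vs = Vector("A", "B", "C", "D", "E")
val es = Seq(("A","B"), ("A","D"), ("B","C"), ("B","D"), ("C","E"), ("D","E"))
val idx = vs.zipWithIndex.toMap
val m = Array.ofDim[Int](vs.size, vs.size)
for ((u, v) <- es) {           // undirected: mark both directions
  m(idx(u))(idx(v)) = 1
  m(idx(v))(idx(u)) = 1
}
// a self-loop would set the diagonal cell to 2, so every row sum equals the vertex degree
val degrees = vs.map(v => v -> m(idx(v)).sum)   // e.g. B -> 3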
GraphX is the distributed graph-computation API provided by Spark
GraphX features
GraphX core abstraction
Resilient Distributed Property Graph
A directed multigraph in which both vertices and edges carry attributes
One physical storage, two views
Every operation on the Graph view is ultimately carried out as RDD operations on its associated Table view
class Graph[VD, ED] {
val vertices: VertexRDD[VD]
val edges: EdgeRDD[ED]
val triplets: RDD[EdgeTriplet[VD, ED]]
}
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val vertices: RDD[(VertexId, Int)] = sc.makeRDD(Seq((1L, 1), (2L, 2), (3L, 3)))
val edges = sc.makeRDD(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 2)))
val graph = Graph(vertices, edges)   // graph: Graph[Int, Int]
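To make "one physical storage, two views" concrete, the graph just built can be inspected through both views; a minimal sketch:
// table view: vertex and edge RDDs
graph.vertices.collect.foreach(println)   // e.g. (1,1), (2,2), (3,3)
graph.edges.collect.foreach(println)      // e.g. Edge(1,2,1), Edge(2,3,2)
// graph view: each triplet pairs an edge with its endpoint attributes
graph.triplets.collect.foreach(t =>
  println(s"${t.srcId}(${t.srcAttr}) -> ${t.dstId}(${t.dstAttr}) [${t.attr}]"))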
import org.apache.spark.graphx.GraphLoader
//Load an edge-list file and build the graph; each line of the file describes one edge in the format: srcId dstId. Vertex and edge attributes are all set to 1
val graph = GraphLoader.edgeListFile(sc,
"file:///opt/spark/data/graphx/followers.txt")
Build a user-collaboration property graph
val userGraph: Graph[(String, String), String]
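Since only the type is shown here, a minimal construction sketch follows; the vertex attributes (name, occupation) and the edge labels are illustrative assumptions, not taken from the original data:
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
// hypothetical attributes: vertex = (name, occupation), edge = collaboration type
val userVerts: RDD[(VertexId, (String, String))] =
  sc.makeRDD(Seq((1L, ("Alice", "student")), (2L, ("Bob", "professor")), (3L, ("Carol", "postdoc"))))
val userEdges: RDD[Edge[String]] =
  sc.makeRDD(Seq(Edge(2L, 1L, "advisor"), Edge(2L, 3L, "colleague"), Edge(3L, 1L, "collab")))
val userGraph: Graph[(String, String), String] = Graph(userVerts, userEdges)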
Build a user social-network graph
Find users older than 30
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val v1 = sc.makeRDD(Array((1L,("Alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50))))
val e1 = sc.makeRDD(Array(Edge(4L,1L,1L),Edge(2L,1L,7L),Edge(2L,4L,2L),Edge(5L,2L,2L),Edge(5L,3L,8L),Edge(5L,6L,3L),Edge(3L,6L,3L),Edge(3L,2L,4L)))
val graph1 =Graph(v1,e1)
//users older than 30
graph1.vertices.filter{case(id,(name,age))=>age>30}.collect
//relationships with more than 5 calls (edge attribute > 5)
graph1.edges.filter{case(Edge(from,to,call))=>call>5}.collect
class Graph[VD, ED] {
val numEdges: Long
val numVertices: Long
val inDegrees: VertexRDD[Int]
val outDegrees: VertexRDD[Int]
val degrees: VertexRDD[Int]
}
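A quick usage sketch of these fields, run against graph1 built above (the counts follow from the six vertices and eight edges defined there):
graph1.numVertices          // 6
graph1.numEdges             // 8
graph1.inDegrees.collect    // Array((VertexId, in-degree), ...)
graph1.outDegrees.collect
graph1.degrees.collect      // total degree = in-degree + out-degree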
Property operators: analogous to the RDD map operation
class Graph[VD, ED] {
def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
// tweeter_graph: the user social-network graph built above (graph1)
val t1_graph = tweeter_graph.mapVertices { case (vertexId, (name, age)) => (vertexId, name) }
val t2_graph = tweeter_graph.mapVertices { (vertexId, attr) => (vertexId, attr._1) }
val t3_graph = tweeter_graph.mapEdges(e => e.attr * 7.0)   // the map function returns the new edge attribute, not an Edge
Structural operators
class Graph[VD, ED] {
def reverse: Graph[VD, ED] // reverse the direction of every edge
// return the subgraph containing only the vertices and edges that satisfy the given predicates
def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,
vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
}
val t1_graph = tweeter_graph.reverse
val t2_graph = tweeter_graph.subgraph(vpred=(id,attr)=>attr._2<65) //attr:(name,age)
Join operators: load data from external RDDs to modify vertex attributes
class Graph[VD, ED] {
def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]
//for vertices with no matching entry in the RDD, the joined value is None
def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
: Graph[VD2, ED]
}
val tweeters_comps:RDD[(VertexId,String)]= sc.parallelize(Array((1L, "kgc.cn"), (2L, "berkeley.edu"), (3L, "apache.org")))
val t_graph = tweeter_graph.joinVertices(tweeters_comps)((id, v, cmpy) => (v._1 + " @ " + cmpy, v._2))
t_graph.vertices.collect
val s_graph = tweeter_graph.outerJoinVertices(tweeters_comps)((id, v, cmpy) => (v._1 + " @ " + cmpy, v._2)) // cmpy is an Option[String]; vertices with no match get None
s_graph.vertices.collect
import org.apache.spark.graphx._
val v1 = sc.makeRDD(Array((1L,("Alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50))))
val e1 = sc.makeRDD(Array(Edge(4L,1L,1),Edge(2L,1L,7),Edge(2L,4L,2),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3),Edge(3L,6L,3),Edge(3L,2L,4)))
val graph1 =Graph(v1,e1)
case class User(name: String, age: Int, inDeg: Int, outDeg: Int)
//initialize each vertex's attribute as a User with both degrees set to 0
val initialUserGraph: Graph[User, Int] = graph1.mapVertices{
case (id, (name, age)) => User(name, age, 0, 0)
}
//store each vertex's in-degree and out-degree in its attributes
val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)
}.outerJoinVertices(initialUserGraph.outDegrees) {
case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))
}
//a vertex's in-degree equals its number of followers
for ((id, property) <- userGraph.vertices.collect)
println(s"User $id is called ${property.name} and is liked by ${property.inDeg} people.")
User 1 is called Alice and is liked by 2 people.
User 2 is called Bob and is liked by 2 people.
User 3 is called Charlie and is liked by 1 people.
User 4 is called David and is liked by 1 people.
User 5 is called Ed and is liked by 0 people.
User 6 is called Fran and is liked by 2 people.
Requirements
Data: twitter-graph-data.txt
Data download: https://pan.baidu.com/s/1OM5DFo3qO_HDDmCS2PIaWQ (extraction code: v8r3)
Format: ((User*, *),(User*,*))
(User*, *) = (username, user ID)
The first user is the one being followed (followee)
The second user is the follower
Build the graph and compute each user's follower count
Find the network influencers (the most-followed users)
import org.apache.spark.graphx._
//regular expression for extracting the data
val pattern = """\(\((User[0-9]{1,},[0-9]{1,})\),\((User[0-9]{1,},[0-9]{1,})\)\)""".r
//load the data
val t1 = sc.textFile("file:///data/twitter_graph_data.txt")
val t2 = t1.map(line=>line match{case pattern(followee,follower)=>(Some(followee),Some(follower));case _ => (None,None)})
val t3 = t2.filter(x=>x._1!=None && x._2!=None).map(x=>(x._1.get.split(","),x._2.get.split(","))).map(x=>(x._1(0),x._1(1).toLong,x._2(0),x._2(1).toLong))
//build vertices, edges, and the graph
val verts = t3.flatMap(x => Array((x._2, x._1), (x._4, x._3))).distinct
val edges = t3.map(x => Edge(x._4, x._2, "follow")) //edge from follower to followee, so a vertex's in-degree is its follower count
val graph = Graph(verts,edges)
//rebuild the graph, supplying a default attribute for vertices that appear only in edges
val defaultUser = ""
val graph = Graph(verts, edges, defaultUser)
//find the influencer, i.e. the user with the largest follower count
graph.inDegrees.sortBy(_._2,false).take(1)
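To also produce the per-user follower count asked for in the requirement, a minimal sketch joining the in-degrees with the vertex names (reusing graph from above; users absent from inDegrees have zero followers):
// (username, follower count) for every user
val followerCounts = graph.vertices.leftOuterJoin(graph.inDegrees)
  .map { case (id, (name, inDegOpt)) => (name, inDegOpt.getOrElse(0)) }
followerCounts.sortBy(_._2, ascending = false).collect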
The PageRank (PR) algorithm
class Graph[VD, ED] {
//tol: the error tolerance for convergence (smaller = more precise); it determines when iteration stops
//resetProb: the random reset (teleport) probability
def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
}
PageRank in practice
//find the most important user in the social network
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
val tweeters = Array((1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)))
val vertexRDD: RDD[(Long, (String, Int))] = spark.sparkContext.parallelize(tweeters)
val followRelations = Array(Edge[Int](2L, 1L, 7), Edge[Int](2L, 4L, 2), Edge[Int](3L, 2L, 4), Edge[Int](3L, 6L, 3), Edge[Int](4L, 1L, 1), Edge[Int](5L, 2L, 2), Edge[Int](5L, 3L, 8), Edge[Int](5L, 6L, 3))
val edgeRDD = spark.sparkContext.parallelize(followRelations)
val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
val ranks = graph.pageRank(0.0001)
ranks.vertices.sortBy(_._2, false).collect
/*Requirements
Given followers.txt and users.txt, build a graph from followers.txt, use the PageRank algorithm to find the most important users in the graph, and output each username with its importance score.
*/
//imports
import org.apache.spark.graphx.GraphLoader
//load the edge data and build the graph
val graph = GraphLoader.edgeListFile(sc, "file:///data/followers.txt")
//run PageRank with the given tolerance to compute each vertex's importance score
val ranks = graph.pageRank(0.0001).vertices
//load the vertex data (user id, username)
val users = sc.textFile("file:///data/users.txt").map { line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
//join to form an RDD of (username, rank)
val ranksByUsername = users.join(ranks).map {
case (id, (username, rank)) => (username, rank)
}
//find the user with the highest importance score
ranksByUsername.sortBy(_._2,false).take(1)
Pregel is a framework proposed by Google for large-scale distributed graph computation
A Pregel computation consists of a series of iterations called supersteps
The Pregel iteration process
//method signature
class Graph[VD, ED] {
def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
vprog: (VertexID, VD, A) => VD,
sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
mergeMsg: (A, A) => A)
: Graph[VD, ED]
}
//parameter description
initialMsg: the initial message delivered to every vertex before superstep 0
maxIterations: the maximum number of iterations to run
activeDirection: the direction along which messages are sent (default: EdgeDirection.Out, i.e. along out-edges)
vprog: user-defined function a vertex runs to process its incoming message
sendMsg: user-defined function that decides which messages to send in the next iteration and to which vertices
mergeMsg: user-defined function that merges multiple messages arriving at the same vertex before vprog runs
GraphX Pregel in practice
//Requirement: find the minimum vertex value in the graph
// build the vertex RDD
val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((1L, (7,-1)), (2L, (3,-1)), (3L, (2,-1)), (4L, (6,-1))))
// build the edge RDD
val relationships: RDD[Edge[Boolean]] = sc.parallelize(Array(Edge(1L, 2L, true), Edge(1L, 4L, true), Edge(2L, 4L, true), Edge(3L, 1L, true), Edge(3L, 4L, true)))
// build the graph
val graph = Graph(vertices, relationships)
//Pregel
val initialMsg = 9999 // initial message: a sentinel value larger than any vertex value
def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
if (message == initialMsg) value else (message min value._1, value._1)
}
def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
val sourceVertex = triplet.srcAttr
if (sourceVertex._1 == sourceVertex._2) Iterator.empty else Iterator((triplet.dstId, sourceVertex._1))
}
def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
val minGraph = graph.pregel(initialMsg, Int.MaxValue, EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
minGraph.vertices.collect.foreach{
case (vertexId, (value, original_value)) => println(value)
}
/*Requirements
Find the shortest path from vertex 0 to every other vertex (single-source shortest path, SSSP)
Implementation idea
Initialize every vertex's value to the maximum (infinity)
Set the source vertex (0) to 0
In each superstep, every vertex adds the edge weight to its current value and sends the result to its neighbours; each vertex keeps the minimum of all messages it receives
When a vertex's value does not change in a superstep, that vertex stops iterating
*/
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>2.11.8</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-graphx_2.11</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

object pregelDemo03 extends App{
//compute the shortest distance from vertex 0 to every other vertex
val sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local[*]"))
val vertexRDD = sc.makeRDD(Array((0L,""),(1L,""),(2L,""),(3L,""),(4L,"")))
val edgeRDD = sc.makeRDD(Array(Edge(0L,4L,10),Edge(4L,3L,50),Edge(0L,1L,100),Edge(0L,2L,30),Edge(3L,1L,10),Edge(2L,1L,60),Edge(2L,3L,60)))
val graph =Graph(vertexRDD,edgeRDD)
//source vertex
val srcVertexId = 0L
val initialGraph = graph.mapVertices{case (id,property)=>if (id==srcVertexId) 0.0 else Double.PositiveInfinity}
//run Pregel
val pregelGraph = initialGraph.pregel(Double.PositiveInfinity,Int.MaxValue,EdgeDirection.Out)(
(vid:VertexId,vd:Double,distMsg:Double)=>{
val minDist = math.min(vd,distMsg)
println(s"頂點${vid},屬性${vd},收到訊息${distMsg},合併後的屬性${minDist}")
minDist
},
(edgeTriplet: EdgeTriplet[Double,PartitionID]) => {
if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
println(s"頂點${edgeTriplet.srcId} 給 頂點${edgeTriplet.dstId} 發送訊息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")
Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
} else {
Iterator.empty
}
},
(msg1: Double, msg2: Double) => math.min(msg1, msg2)
)
// pregelGraph.triplets.collect().foreach(println)
println(pregelGraph.vertices.collect.mkString("\n"))
sc.stop()
}