基於Spark GraphX 的圖形數據分析

2020-08-10 18:31:40

爲什麼需要圖計算

  • 許多大數據以大規模圖或網路的形式呈現
  • 許多非圖結構的大數據,常會被轉換爲圖模型進行分析
  • 圖數據結構很好地表達了數據之間的關聯性

一.圖(Graph)的基本概念

圖是由頂點集合(vertex)及頂點間的關係集合(邊edge)組成的一種網狀數據結構

  • 通常表示爲二元組:Gragh=(V,E)
  • 可以對事物之間的關係建模

應用場景

  • 在地圖應用中尋找最短路徑
  • 社羣網路關係
  • 網頁間超鏈接關係

圖的術語

  • 頂點(Vertex)
  • 邊(Edge)
Graph=(V,E)
集合V={v1,v2,v3}
集合E={(v1,v2),(v1,v3),(v2,v3)}

在这里插入图片描述

  • 有向圖
G=(V,E)
V={A,B,C,D,E}
E={<A,B>,<B,C>,<B,D>,<C,E>,<D,A>,<E,D>}

在这里插入图片描述

  • 無向圖
G=(V,E)
V={A,B,C,D,E}
E={(A,B),(A,D),(B,C),(B,D),(C,E),(D,E)}

在这里插入图片描述

  • 有環圖:包含一系列頂點連線的迴路(環路)
    在这里插入图片描述

  • 無環圖:DAG即爲有向無環圖
    在这里插入图片描述

度:一個頂點所有邊的數量

  • 出度:指從當前頂點指向其他頂點的邊的數量

  • 入度:其他頂點指向當前頂點的邊的數量
    在这里插入图片描述

  • 圖的經典表示法:鄰接矩陣

在这里插入图片描述

1、對於每條邊,矩陣中相應單元格值爲1
2、對於每個回圈,矩陣中相應單元格值爲2,方便在行或列上求得頂點度數

二.Spark GraphX 簡介

GraphX是Spark提供分佈式圖計算API
GraphX特點

  • 基於記憶體實現了數據的複用與快速讀取
  • 通過彈性分佈式屬性圖(Property Graph)統一了圖檢視與表檢視
  • 與Spark Streaming、Spark SQL和Spark MLlib等無縫銜接

GraphX核心抽象

  • 彈性分佈式屬性圖(Resilient Distributed Property Graph)

  • 頂點和邊都帶屬性的有向多重圖
    在这里插入图片描述
    在这里插入图片描述

  • 一份物理儲存,兩種檢視
    在这里插入图片描述

對Graph檢視的所有操作,最終都會轉換成其關聯的Table檢視的RDD操作來完成

三.GraphX API

  • Graph[VD,ED]
  • VertexRDD[VD]
  • EdgeRDD[ED]
  • EdgeTriplet[VD,ED]
  • Edge:樣例類
  • VertexId:Long的別名
class Graph[VD, ED] {
  val vertices: VertexRDD[VD]
  val edges: EdgeRDD[ED]
  val triplets: RDD[EdgeTriplet[VD, ED]]
}
import org.apache.spark.graphx._
val vertices:RDD[(VertexId,Int)]=sc.makeRDD(Seq((1L,1),(2L,2),(3L,3)))
val edges=sc.makeRDD(Seq(Edge(1L,2L,1),Edge(2L,3L,2)))
val graph=Graph(vertices,edges)  //Graph[Int,Int] ?
import org.apache.spark.graphx.GraphLoader
//載入邊列表檔案建立圖,檔案每行描述一條邊,格式:srcId dstId。頂點與邊的屬性均爲1
val graph = GraphLoader.edgeListFile(sc, 
                                        "file:///opt/spark/data/graphx/followers.txt")

1.屬性圖應用範例-1

在这里插入图片描述
構建使用者合作關係屬性圖

  • 頂點屬性:使用者名稱.職業
  • 邊屬性:合作關係
val userGraph: Graph[(String, String), String]

2.屬性圖應用範例-2

在这里插入图片描述
構建使用者社羣網路關係

  • 頂點:使用者名稱、年齡
  • 邊:打call次數

找出大於30歲的使用者

  • 假設打call超過5次,表示真愛。請找出他(她)們
    在这里插入图片描述
import org.apache.spark.graphx._
val v1 = sc.makeRDD(Array((1L,("Alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50))))
val e1 = sc.makeRDD(Array(Edge(4L,1L,1L),Edge(2L,1L,7L),Edge(2L,4L,2L),Edge(5L,2L,2L),Edge(5L,3L,8L),Edge(5L,6L,3L),Edge(3L,6L,3L),Edge(3L,2L,4L)))
val graph1 =Graph(v1,e1)
//大於30歲的人
graph1.vertices.filter{case(id,(name,age))=>age>30}.collect
//打call超過5次
graph1.edges.filter{case(Edge(from,to,call))=>call>5}.collect

3.檢視圖資訊

  • 頂點數量
  • 邊數量
  • 度、入度、出度
class Graph[VD, ED] {
  val numEdges: Long
  val numVertices: Long
  val inDegrees: VertexRDD[Int]
  val outDegrees: VertexRDD[Int]
  val degrees: VertexRDD[Int]
}

4.圖的運算元

屬性運算元:類似於RDD的map操作

class Graph[VD, ED] {
  def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]
  def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]
  def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}
val t1_graph = tweeter_graph.mapVertices { case(vertextId, (name, age)) => (vertextId, name) }
val t2_graph = tweeter_graph.mapVertices { (vertextId, attr) => (vertextId, attr._1) }
val t3_graph = tweeter_graph.mapEdges(e => Edge(e.srcId, e.dstId, e.attr*7.0))

結構運算元

class Graph[VD, ED] {
  def reverse: Graph[VD, ED] //改變邊的方向
  //生成滿足頂點與邊的條件的子圖
  def subgraph(epred: EdgeTriplet[VD,ED] => Boolean, 
               vpred: (VertexId, VD) => Boolean): Graph[VD, ED]
  }
val t1_graph = tweeter_graph.reverse
val t2_graph = tweeter_graph.subgraph(vpred=(id,attr)=>attr._2<65)  //attr:(name,age)

Join運算元:從外部的RDDs載入數據,修改頂點屬性

class Graph[VD, ED] {
  def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]
  //RDD中的頂點不匹配時,值爲None
  def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2)
    : Graph[VD2, ED]
}
val tweeters_comps:RDD[(VertexId,String)]= sc.parallelize(Array((1L, "kgc.cn"), (2L, "berkeley.edu"), (3L, "apache.org")))
val t_graph = tweeter_graph.joinVertices(tweeters_comps)((id, v, cmpy) => (v._1 + " @ " + cmpy, v._2))
t_graph.vertices.collect

val s_graph = tweeter_graph.outerJoinVertices(tweeters_comps)((id, v, cmpy) => (v._1 + " @ " + cmpy, v._2))
s_graph.vertices.collect

5.GraphX API 應用

import org.apache.spark.graphx._
val v1 = sc.makeRDD(Array((1L,("Alice",28)),(2L,("Bob",27)),(3L,("Charlie",65)),(4L,("David",42)),(5L,("Ed",55)),(6L,("Fran",50))))
val e1 = sc.makeRDD(Array(Edge(4L,1L,1),Edge(2L,1L,7),Edge(2L,4L,2),Edge(5L,2L,2),Edge(5L,3L,8),Edge(5L,6L,3),Edge(3L,6L,3),Edge(3L,2L,4)))
val graph1 =Graph(v1,e1)
case class User(name: String, age: Int, inDeg: Int, outDeg: Int)
//修改頂點屬性
val initialUserGraph: Graph[User, Int] = tweeter_graph.mapVertices{ 
     case (id, (name, age)) => User(name, age, 0, 0) 
}
//將頂點入度、出度存入頂點屬性中 
val userGraph = initialUserGraph.outerJoinVertices(initialUserGraph.inDegrees) {
     case (id, u, inDegOpt) => User(u.name, u.age, inDegOpt.getOrElse(0), u.outDeg)
}.outerJoinVertices(initialUserGraph.outDegrees) {
    case (id, u, outDegOpt) => User(u.name, u.age, u.inDeg, outDegOpt.getOrElse(0))
}
//頂點的入度即爲粉絲數量
for ((id, property) <- userGraph.vertices.collect) 
   println(s"User $id is ${property.name} and is liked by ${property.inDeg} people.")
  • 結果如下
User 1 is called Alice and is liked by 2 people.
User 2 is called Bob and is liked by 2 people.
User 3 is called Charlie and is liked by 1 people.
User 4 is called David and is liked by 1 people.
User 5 is called Ed and is liked by 0 people.
User 6 is called Fran and is liked by 2 people.

6.練習一:誰是網路紅人?

需求說明
數據:twitter-graph-data.txt
數據獲取地址:鏈接: https://pan.baidu.com/s/1OM5DFo3qO_HDDmCS2PIaWQ 提取碼: v8r3
格式:((User*, *),(User*,*))
(User*, *)=(使用者名稱,使用者ID)
第一個使用者表示被跟隨者(followee)
第二個使用者表示跟隨者(follower)
建立圖並計算每個使用者的粉絲數量
找出網路紅人
import org.apache.spark.graphx._
//正則表達式擷取數據
val pattern = """\(\((User[0-9]{1,},[0-9]{1,})\),\((User[0-9]{1,},[0-9]{1,})\)\)""".r
//載入數據
val t1 = sc.textFile("file:///data/twitter_graph_data.txt")
val t2 =  t1.map(line=>line match{case pattern(followee,follower)=>(Some(followee),Some(follower));case _ => (None,None)})
val t3 = t2.filter(x=>x._1!=None && x._2!=None).map(x=>(x._1.get.split(","),x._2.get.split(","))).map(x=>(x._1(0),x._1(1).toLong,x._2(0),x._2(1).toLong))
//形成頂點,邊和圖
val verts = t3.flatMap(x => Array((x._2, x._1), (x._4, x._3))).distinct
val edges = t3.map(x=>Edge(x._2,x._4,"follow"))
val graph = Graph(verts,edges)
val defaultUser = ("")
val graph = Graph(verts, edges, defaultUser)
//找出網路紅人即粉絲數最大的人
graph.inDegrees.sortBy(_._2,false).take(1)

7.PageRank in GraphX

PageRank(PR)演算法

  • 用於評估網頁鏈接的品質和數量,以確定該網頁的重要性和權威性的相對分數,範圍爲0到10
  • 從本質上講,PageRank是找出圖中頂點(網頁鏈接)的重要性
  • GraphX提供了PageRank API用於計算圖的PageRank
class Graph[VD, ED] {
//第一個參數爲收斂時允許的誤差,越小越精確, 確定迭代是否結束的參數
//第二個參數爲隨機重置概率
  def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
}

PageRank應用

//找出使用者社羣網路中最重要的使用者
val tweeters = Array((1L, ("Alice", 28)), (2L, ("Bob", 27)), (3L, ("Charlie", 65)), (4L, ("David", 42)), (5L, ("Ed", 55)), (6L, ("Fran", 50)))
val vertexRDD: RDD[(Long, (String, Int))] = spark.sparkContext.parallelize(tweeters)

val followRelations = Array(Edge[Int](2L, 1L, 7), Edge[Int](2L, 4L, 2), Edge[Int](3L, 2L, 4),  Edge[Int](3L, 6L, 3), Edge[Int](4L, 1L, 1), Edge[Int](5L, 2L, 2), Edge[Int](5L, 3L, 8), Edge[Int](5L, 6L, 3))
val edgeRDD = spark.sparkContext.parallelize(followRelations)

val graph: Graph[(String, Int), Int] = Graph(vertexRDD, edgeRDD)
val ranks = graph.pageRank(0.0001)
ranks.vertices.sortBy(_._2, false).collect

8.練習2:PageRank應用

/*需求說明
現有followers.txt、users.txt,通過followers.txt建立圖,並使用PageRank演算法找出圖中最重要的使用者,輸出使用者名稱稱與重要程度
*/
//導包
import org.apache.spark.graphx.GraphLoader
//載入邊數據形成圖
val graph = GraphLoader.edgeListFile(sc, "file:///data/followers.txt")
//給定參數形成重要係數
val ranks = graph.pageRank(0.0001).vertices
//載入頂點數據
val users = sc.textFile("file:///data/user.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
//join操作形成名字和重要係數的rdd
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
//找出重要係數最大的人
ranksByUsername.sortBy(_._2,false).take(1)

9.Pregel概述

Pregel是Google提出的用於大規模分佈式圖計算框架

  • 圖遍歷(BFS)
  • 單源最短路徑(SSSP)
  • PageRank計算

Pregel的計算由一系列迭代組成,稱爲supersteps
Pregel迭代過程

  • 每個頂點從上一個superstep接收入站訊息
  • 計算頂點新的屬性值
  • 在下一個superstep中向相鄰的頂點發送訊息
  • 當沒有剩餘訊息時,迭代結束
//原始碼
class Graph[VD, ED] {  
    def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(
      vprog: (VertexID, VD, A) => VD,
      sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],
      mergeMsg: (A, A) => A)
    : Graph[VD, ED]
}
//參數說明
initialMsg:在「superstep 0」之前發送至頂點的初始訊息
maxIterations:將要執行的最大迭代次數
activeDirection:發送訊息方向(預設是出邊方向:EdgeDirection.Out)
vprog:使用者定義函數,用於頂點接收訊息
sendMsg:使用者定義的函數,用於確定下一個迭代發送的訊息及發往何處
mergeMsg:使用者定義的函數,在vprog前,合併到達頂點的多個訊息

GraphX Pregel 應用
在这里插入图片描述

//需求說明:求出圖中最小值 
// 建立頂點集RDD
val vertices: RDD[(VertexId, (Int, Int))] = sc.parallelize(Array((1L, (7,-1)), (2L, (3,-1)),  (3L, (2,-1)), (4L, (6,-1))))
// 建立邊集RDD
val relationships: RDD[Edge[Boolean]] = sc.parallelize(Array(Edge(1L, 2L, true), Edge(1L, 4L, true),  Edge(2L, 4L, true), Edge(3L, 1L, true),  Edge(3L, 4L, true)))
// 建立圖
val graph = Graph(vertices, relationships)
//Pregel
val initialMsg = 9999//最大迭代次數
def vprog(vertexId: VertexId, value: (Int, Int), message: Int): (Int, Int) = {
  if (message == initialMsg)  value else (message min value._1, value._1)
}

def sendMsg(triplet: EdgeTriplet[(Int, Int), Boolean]): Iterator[(VertexId, Int)] = {
  val sourceVertex = triplet.srcAttr
  if (sourceVertex._1 == sourceVertex._2) Iterator.empty  else  Iterator((triplet.dstId, sourceVertex._1))
}

def mergeMsg(msg1: Int, msg2: Int): Int = msg1 min msg2
val minGraph = graph.pregel(initialMsg, Int.MaxValue,  EdgeDirection.Out)(vprog, sendMsg, mergeMsg)
minGraph.vertices.collect.foreach{
  case (vertexId, (value, original_value)) => println(value)
}

10.練習3:使用Pregel計算單源最短路徑

在这里插入图片描述

/*需求說明
求從0到任意點的最短路徑(SSSP)
實現思路
初始化 Vertex 的 Message 爲最大值
將源點(0)的 Message 設爲 0
每步每個節點將自己目前的 Message 加上邊的權值發送到相鄰節點,每個節點聚合出自身所有訊息的最小值
當某一步當中一個節點Message 值無變化,該節點停止迭代
*/
  • maven依賴 如下
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-graphx_2.11</artifactId>
            <version>2.2.0</version>
        </dependency>
    </dependencies>
object pregelDemo03 extends App{
  //求頂點0到各頂點最短距離
  val sc = new SparkContext( new SparkConf().setAppName("test").setMaster("local[*]"))
  val vertexRDD = sc.makeRDD(Array((0L,""),(1L,""),(2L,""),(3L,""),(4L,"")))
  val edgeRDD = sc.makeRDD(Array(Edge(0L,4L,10),Edge(4L,3L,50),Edge(0L,1L,100),Edge(0L,2L,30),Edge(3L,1L,10),Edge(2L,1L,60),Edge(2L,3L,60)))
  val graph =Graph(vertexRDD,edgeRDD)
  //起始頂點
  val srcVertextId = 0L
  val initialGraph = graph.mapVertices{case (id,property)=>if (id==srcVertextId) 0.0 else Double.PositiveInfinity}
  //呼叫pregel
  val pregelGraph = initialGraph.pregel(Double.PositiveInfinity,Int.MaxValue,EdgeDirection.Out)(
    (vid:VertexId,vd:Double,distMsg:Double)=>{
      val minDist = math.min(vd,distMsg)
      println(s"頂點${vid},屬性${vd},收到訊息${distMsg},合併後的屬性${minDist}")
      minDist
    },
    (edgeTriplet: EdgeTriplet[Double,PartitionID]) => {
      if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {
        println(s"頂點${edgeTriplet.srcId} 給 頂點${edgeTriplet.dstId} 發送訊息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")
        Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))
      } else {
        Iterator.empty
      }
    },
    (msg1: Double, msg2: Double) => math.min(msg1, msg2)
  )
//  pregelGraph.triplets.collect().foreach(println)
    println(pregelGraph.vertices.collect.mkString("\n"))
    sc.stop()
}
  • 執行結果如下
    在这里插入图片描述