Spark GraphX Tutorial 02 (Basic Code)

Let's get a quick feel for Spark GraphX by starting from the code. The Graph class lives in the org.apache.spark.graphx package, so let's open its source.

Basic properties of Graph

The Graph class in Spark defines many methods, but only the following three basic properties:

/**
   * An RDD containing the vertices and their associated attributes.
   *
   * @note vertex ids are unique.
   * @return an RDD containing the vertices in this graph
   */
  val vertices: VertexRDD[VD]

  /**
   * An RDD containing the edges and their associated attributes.  The entries in the RDD contain
   * just the source id and target id along with the edge data.
   *
   * @return an RDD containing the edges in this graph
   *
   * @see `Edge` for the edge type.
   * @see `Graph#triplets` to get an RDD which contains all the edges
   * along with their vertex data.
   *
   */
  val edges: EdgeRDD[ED]

  /**
   * An RDD containing the edge triplets, which are edges along with the vertex data associated with
   * the adjacent vertices. The caller should use [[edges]] if the vertex data are not needed, i.e.
   * if only the edge data and adjacent vertex ids are needed.
   *
   * @return an RDD containing edge triplets
   *
   * @example This operation might be used to evaluate a graph
   * coloring where we would like to check that both vertices are a
   * different color.
   * {{{
   * type Color = Int
   * val graph: Graph[Color, Int] = GraphLoader.edgeListFile("hdfs://file.tsv")
   * val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
   * }}}
   */
  val triplets: RDD[EdgeTriplet[VD, ED]]

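These are vertices (a VertexRDD), edges (an EdgeRDD), and triplets, where:

  • vertices is the RDD that holds every vertex; each entry contains the vertex's unique id and its attributes
  • edges is the RDD that holds every edge; each entry contains the edge's attribute plus the ids of its source and destination vertices
  • triplets holds, for every edge, both the edge information and the vertex information: the edge's attribute, the source and destination ids, and the attributes of those two vertices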
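To make the difference between the three views concrete, here is a minimal sketch on a hypothetical two-vertex graph. The vertex names, the "knows" label, the local master setting, and the app name are all assumptions made for illustration; in spark-shell you would reuse the provided sc instead of creating one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

// Assumption: local mode with an arbitrary app name.
val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("graph-views"))

// Hypothetical graph: vertex attribute = name, edge attribute = label.
val vertices = sc.parallelize(Seq((1L, "a"), (2L, "b")))
val edges = sc.parallelize(Seq(Edge(1L, 2L, "knows")))
val g: Graph[String, String] = Graph(vertices, edges)

// vertices: (id, attribute) pairs
val vs: Array[(VertexId, String)] = g.vertices.collect()
// edges: source id, destination id and edge attribute only
val es: Array[Edge[String]] = g.edges.collect()
// triplets: the edge attribute together with the attributes of both endpoints
val ts: Array[(String, String, String)] =
  g.triplets.map(t => (t.srcAttr, t.attr, t.dstAttr)).collect()

sc.stop()
```

Note that edges alone cannot tell you the names "a" and "b"; only triplets carries the vertex attributes next to the edge.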

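Creating a graph

Next we will create a graph. First, let's describe the scenario.

We start with the vertices and their attributes (people are the vertices; each person's name and position are the vertex attributes):

rxin is a student
jgonzal is a postdoc
franklin is a professor
istoica is a professor

Then come the edges and their attributes (an edge is a relationship between two people):

rxin is a collaborator of jgonzal
franklin is rxin's advisor
istoica is a colleague of franklin
franklin is jgonzal's PI (principal investigator, i.e. the lead researcher)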

A diagram makes this more intuitive:
[Figure: the people and relationships above drawn as a graph]

These people and relationships can be abstracted as a graph, and the following code expresses that graph in Spark:

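import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assume the SparkContext `sc` has already been constructed (spark-shell provides one).
// Vertices: (id, (name, position))
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array(
    (3L, ("rxin", "student")),
    (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")),
    (2L, ("istoica", "prof"))
  ))
// Edges: Edge(source id, destination id, relationship)
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(
    Edge(3L, 7L, "collab"),
    Edge(5L, 3L, "advisor"),
    Edge(2L, 5L, "colleague"),
    Edge(5L, 7L, "pi")
  ))
// Attribute given to any vertex that appears in an edge but not in `users`
val defaultUser = ("John Doe", "Missing")

val graph = Graph(users, relationships, defaultUser)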
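The snippet above relies on an existing SparkContext. As a rough sketch of how the same graph could be built and queried as a standalone script, the version below creates its own context; the local master setting and the app name are assumptions, and the two queries (counting professors, describing each edge) are only illustrations of how the three views can be used.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumption: local mode with an arbitrary app name; spark-shell already provides `sc`.
val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("graphx-basics"))

val users: RDD[(VertexId, (String, String))] = sc.parallelize(Seq(
  (3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
  (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
  Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
val graph = Graph(users, relationships, ("John Doe", "Missing"))

// Count the vertices whose position is "prof".
val profCount: Long = graph.vertices.filter { case (_, (_, pos)) => pos == "prof" }.count()

// Describe every edge using the names stored on its two endpoint vertices.
val facts: Array[String] = graph.triplets
  .map(t => s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}")
  .collect()
facts.foreach(println)

sc.stop()
```

The order of the printed lines depends on how the edges are partitioned, so it should not be relied on.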


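This builds a graph from the people and relationships described above. The graph is created with val graph = Graph(users, relationships, defaultUser); the Spark source for this factory method is as follows: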

/**
   * Construct a graph from a collection of vertices and
   * edges with attributes.  Duplicate vertices are picked arbitrarily and
   * vertices found in the edge collection but not in the input
   * vertices are assigned the default attribute.
   *
   * @tparam VD the vertex attribute type
   * @tparam ED the edge attribute type
   * @param vertices the "set" of vertices and their attributes
   * @param edges the collection of edges in the graph
   * @param defaultVertexAttr the default vertex attribute to use for vertices that are
   *                          mentioned in edges but not in vertices
   * @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
   * @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
   */
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: RDD[(VertexId, VD)],
      edges: RDD[Edge[ED]],
      defaultVertexAttr: VD = null.asInstanceOf[VD],
      edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
      vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
    GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
  }

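We pass three arguments here:

  • users is the RDD of all vertices (RDD[(VertexId, VD)])

  • relationships is the collection of all edges (RDD[Edge[ED]])

  • defaultUser is the default vertex attribute: if a source id or destination id in relationships cannot be found in users, the vertex with that id is given defaultUser as its attribute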
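The effect of the default attribute can be checked with a small sketch. Here vertex 0 is deliberately left out of users but used as an edge endpoint; this data, the local master setting, and the app name are assumptions chosen for the illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

// Assumption: local mode with an arbitrary app name; spark-shell already provides `sc`.
val sc = new SparkContext(new SparkConf().setMaster("local[1]").setAppName("default-vertex"))

// Only vertex 5 is declared; vertex 0 appears solely as an edge endpoint.
val users = sc.parallelize(Seq((5L, ("franklin", "prof"))))
val relationships = sc.parallelize(Seq(Edge(5L, 0L, "colleague")))
val defaultUser = ("John Doe", "Missing")
val graph = Graph(users, relationships, defaultUser)

// Vertex 0 is created automatically and carries the default attribute.
val attrs: Map[Long, (String, String)] = graph.vertices.collect().toMap
println(attrs(0L))   // (John Doe,Missing)

sc.stop()
```

If defaultUser were omitted, the signature shown above falls back to null as the attribute for such vertices, so passing an explicit default is usually the safer choice.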