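Spark GraphX Tutorial 02 (Basic Code)
Let's get a quick, hands-on feel for Spark GraphX by starting from the code. The Graph class lives in the package org.apache.spark.graphx, so let's open its source.
Basic properties of Graph
The Graph class in Spark defines many methods, but only the following three basic properties: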
/**
* An RDD containing the vertices and their associated attributes.
*
* @note vertex ids are unique.
* @return an RDD containing the vertices in this graph
*/
val vertices: VertexRDD[VD]
/**
* An RDD containing the edges and their associated attributes. The entries in the RDD contain
* just the source id and target id along with the edge data.
*
* @return an RDD containing the edges in this graph
*
* @see `Edge` for the edge type.
* @see `Graph#triplets` to get an RDD which contains all the edges
* along with their vertex data.
*
*/
val edges: EdgeRDD[ED]
/**
* An RDD containing the edge triplets, which are edges along with the vertex data associated with
* the adjacent vertices. The caller should use [[edges]] if the vertex data are not needed, i.e.
* if only the edge data and adjacent vertex ids are needed.
*
* @return an RDD containing edge triplets
*
* @example This operation might be used to evaluate a graph
* coloring where we would like to check that both vertices are a
* different color.
* {{{
* type Color = Int
* val graph: Graph[Color, Int] = GraphLoader.edgeListFile("hdfs://file.tsv")
* val numInvalid = graph.triplets.map(e => if (e.src.data == e.dst.data) 1 else 0).sum
* }}}
*/
val triplets: RDD[EdgeTriplet[VD, ED]]
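These are vertices, edges, and triplets, where:
- vertices (a VertexRDD) is the RDD holding all vertices; each entry carries the vertex's unique id and its attributes
- edges (an EdgeRDD) is the RDD holding all edges; each entry carries the edge's attribute plus the ids of its source and destination vertices
- triplets holds each edge together with the data of its endpoints: the edge attribute, the source and destination vertex ids, and the attributes of both vertices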
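To make the difference between the three views concrete, here is a minimal sketch. It assumes a local Spark setup (master `local[1]`); the object name `GraphViewsSketch`, its helper methods, and the two-vertex graph are all hypothetical and exist only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of GraphX.
object GraphViewsSketch {
  // Builds a tiny illustrative graph: two vertices joined by one edge.
  def tinyGraph(sc: SparkContext): Graph[String, Int] = {
    val vertices: RDD[(VertexId, String)] =
      sc.parallelize(Seq((1L, "a"), (2L, "b")))
    val edges: RDD[Edge[Int]] =
      sc.parallelize(Seq(Edge(1L, 2L, 10)))
    Graph(vertices, edges)
  }

  // vertices: one (id, attribute) pair per vertex.
  def vertexView(g: Graph[String, Int]): Seq[(VertexId, String)] =
    g.vertices.collect().toSeq.sortBy(_._1)

  // edges: source id, destination id and edge attribute only.
  def edgeView(g: Graph[String, Int]): Seq[(VertexId, VertexId, Int)] =
    g.edges.map(e => (e.srcId, e.dstId, e.attr)).collect().toSeq

  // triplets: the edge attribute plus the attributes of both endpoints.
  def tripletView(g: Graph[String, Int]): Seq[(String, Int, String)] =
    g.triplets.map(t => (t.srcAttr, t.attr, t.dstAttr)).collect().toSeq

  def main(args: Array[String]): Unit = {
    // Assumption: running locally with a single worker thread.
    val conf = new SparkConf().setAppName("graph-views").setMaster("local[1]")
    val sc = new SparkContext(conf)
    try {
      val g = tinyGraph(sc)
      vertexView(g).foreach(println)
      edgeView(g).foreach(println)
      tripletView(g).foreach(println)
    } finally sc.stop()
  }
}
```

If only the edge data and the adjacent vertex ids are needed, `edges` is the cheaper choice, as the Scaladoc above notes; `triplets` is for when the vertex attributes are needed alongside each edge.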
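Creating a graph
Next, let's create a graph. First, the basic scenario.
The vertices and their attributes come first (each person is a vertex; the person's name and position are the vertex attributes):
rxin is a student
jgonzal is a postdoc
franklin is a professor
istoica is a professor
Then come the edges and their attributes (each edge is a relationship between two people):
rxin is a collaborator of jgonzal
franklin is the advisor of rxin
istoica is a colleague of franklin
franklin is the PI (principal investigator) of jgonzal
A diagram may be more intuitive; it is shown below.
These people and relationships can be abstracted into a graph, and the following code expresses it as a Spark Graph: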
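import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Assumes an existing SparkContext named sc (spark-shell provides one).
// Vertices: (id, (name, position))
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array(
    (3L, ("rxin", "student")),
    (7L, ("jgonzal", "postdoc")),
    (5L, ("franklin", "prof")),
    (2L, ("istoica", "prof"))
  ))
// Edges: Edge(source id, destination id, relationship)
val relationships: RDD[Edge[String]] = sc.parallelize(Array(
  Edge(3L, 7L, "collab"),
  Edge(5L, 3L, "advisor"),
  Edge(2L, 5L, "colleague"),
  Edge(5L, 7L, "pi")
))
// Attribute given to vertices that an edge references but users does not contain
val defaultUser = ("John Doe", "Missing")
val graph = Graph(users, relationships, defaultUser)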
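This builds a graph from the people and relationships described above. The graph is created with val graph = Graph(users, relationships, defaultUser); the Spark source for this factory method is as follows: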
/**
* Construct a graph from a collection of vertices and
* edges with attributes. Duplicate vertices are picked arbitrarily and
* vertices found in the edge collection but not in the input
* vertices are assigned the default attribute.
*
* @tparam VD the vertex attribute type
* @tparam ED the edge attribute type
* @param vertices the "set" of vertices and their attributes
* @param edges the collection of edges in the graph
* @param defaultVertexAttr the default vertex attribute to use for vertices that are
* mentioned in edges but not in vertices
* @param edgeStorageLevel the desired storage level at which to cache the edges if necessary
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*/
def apply[VD: ClassTag, ED: ClassTag](
vertices: RDD[(VertexId, VD)],
edges: RDD[Edge[ED]],
defaultVertexAttr: VD = null.asInstanceOf[VD],
edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, ED] = {
GraphImpl(vertices, edges, defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
}
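We pass three arguments here:
- users is the RDD of all vertices (RDD[(VertexId, VD)])
- relationships is the RDD of all edges (RDD[Edge[ED]])
- defaultUser is the default vertex attribute: if a source id or destination id in relationships cannot be found in users, the vertex with that id is given the attribute defaultUser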
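The default-attribute behavior is easiest to see with an edge that points at a vertex missing from the vertex RDD. The following is a minimal sketch under stated assumptions: the object `DefaultVertexSketch`, its methods, and the extra edge from vertex 4 are hypothetical additions for illustration, and the master `local[1]` assumes a local run.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// Hypothetical helper object, not part of GraphX.
object DefaultVertexSketch {
  type User = (String, String) // (name, position)

  def build(sc: SparkContext): Graph[User, String] = {
    val users: RDD[(VertexId, User)] = sc.parallelize(Seq(
      (3L, ("rxin", "student")),
      (7L, ("jgonzal", "postdoc"))
    ))
    val relationships: RDD[Edge[String]] = sc.parallelize(Seq(
      Edge(3L, 7L, "collab"),
      // Hypothetical edge: vertex 4 does not appear in users.
      Edge(4L, 3L, "student")
    ))
    val defaultUser: User = ("John Doe", "Missing")
    Graph(users, relationships, defaultUser)
  }

  // Renders every triplet as a sentence, sorted for stable output.
  def describe(g: Graph[User, String]): Seq[String] =
    g.triplets
      .map(t => s"${t.srcAttr._1} is the ${t.attr} of ${t.dstAttr._1}")
      .collect().toSeq.sorted

  // Ids of the vertices that received the default attribute.
  def missingIds(g: Graph[User, String]): Seq[VertexId] =
    g.vertices
      .filter { case (_, (_, position)) => position == "Missing" }
      .map(_._1)
      .collect().toSeq

  def main(args: Array[String]): Unit = {
    // Assumption: running locally with a single worker thread.
    val conf = new SparkConf().setAppName("default-vertex").setMaster("local[1]")
    val sc = new SparkContext(conf)
    try {
      val g = build(sc)
      describe(g).foreach(println)
      println(s"vertices with the default attribute: ${missingIds(g).mkString(", ")}")
    } finally sc.stop()
  }
}
```

Vertex 4 exists in the resulting graph even though it was never listed in users. Filtering on the placeholder position, as `missingIds` does, is one way to find such vertices afterwards.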