如何在Apache Spark中聚合数据
我在3个节点上有一个分布式系统,我的数据分布在这些节点中。例如,我有一个test.csv
文件,它存在于所有3个节点,它包含的如何在Apache Spark中聚合数据
row | id, C1, C2, C3
----------------------
row1 | A1 , c1 , c2 ,2
row2 | A1 , c1 , c2 ,1
row3 | A1 , c11, c2 ,1
row4 | A2 , c1 , c2 ,1
row5 | A2 , c1 , c2 ,1
row6 | A2 , c11, c2 ,1
row7 | A2 , c11, c21,1
row8 | A3 , c1 , c2 ,1
row9 | A3 , c1 , c2 ,2
row10 | A4 , c1 , c2 ,1
4列我想尝试汇总上述结果集。如何汇总由id
,c1
,c2
和c3
列设置的数据并将其输出为这样?
row | id, C1, C2, C3
----------------------
row1 | A1 , c1 , c2 ,3
row2 | A1 , c11, c2 ,1
row3 | A2 , c1 , c2 ,2
row4 | A2 , c11, c2 ,1
row5 | A2 , c11, c21,1
row6 | A3 , c1 , c2 ,3
row7 | A4 , c1 , c2 ,1
我试过如下:
from array import array
from datetime import datetime
import pyspark.sql
from pyspark.sql import Row, SQLContext, StructField, StringType, IntegerType
schema = StructType([
StructField("id", StringType(), False),
StructField("C1", StringType(), False),
StructField("C2", StringType(), False),
StructField("C3", IntegerType(), False)])
base_rdd = sc.textFile("/home/hduser/spark-1.1.0/Data/test.tsv").map(lambda l:
l.split(",")
rdd = base_rdd.map(lambda x: Row(id = x[0], C1 = x[1], C2 = x[2], C3 = int(x[3])))
sqlContext = SQLContext(sc)
srdd = sqlContext.inferSchema(rdd)
解决你的问题,你可以做下面的步骤。我不知道python步骤,下面是java步骤。我希望你能把它和python联系起来。
- 读csv文件
JavaRDD<String> input = sc.textFile(args[0]);
-
创建一对从文件RDD
JavaPairRDD<Integer,String> pairMap = input.mapToPair( new PairFunction<String, Integer, String>() { @Override public Tuple2<Integer, String> call(String line) throws Exception { String[] s = line.split(","); String key = s[0]+'#'+s[1]+'#' +s[2];// id,c1,c2 Integer value = Integer.valueOf(s[3]) //c3
return new Tuple2<Integer,String>(key, value); } }); 减少按钥匙地图
-
result
对象包含您的预期的结果,其中关键是id+'#'+c1+'#'+c2
和值被聚集c3
。你可以进一步使用这张地图。您可以在#
上标记密钥以获取您的列,并可以使用apache-spark-sql将其插入到表中。
JavaPairRDD<String,Integer> result = pairMap.reduceByKey( new Function2<Integer, Integer, Integer>() {
@Override public Integer call(Integer v1, Integer v2) throws Exception { return v1+v2; } });
我希望这会有所帮助。
首先,我建议使用'com.databricks.spark.csv'来读取csv文件(当您运行pyspark shell http://spark-packages.org/package/databricks/spark-csv时,您需要使用'--package'来加载它)。然后使用GROUPBY方法:
df = (sqlContext.read
.format('com.databricks.spark.csv')
.option("inferSchema", "true")
.option("header", "true")
.load("<your_file>.csv"))
df2= df.groupBy('id', 'C1', 'C2').agg({'C3': 'sum'}).sort('id', 'C1')
df.show()
+---+---+---+---+
| id| C1| C2| C3|
+---+---+---+---+
| A1| c1| c2| 2|
| A1| c1| c2| 1|
| A1|c11| c2| 1|
| A2| c1| c2| 1|
| A2| c1| c2| 1|
| A2|c11| c2| 1|
| A2|c11|c21| 1|
| A3| c1| c2| 1|
| A3| c1| c2| 2|
| A4| c1| c2| 1|
+---+---+---+---+
df2.show()
+---+---+---+-------+
| id| C1| C2|sum(C3)|
+---+---+---+-------+
| A1| c1| c2| 3|
| A1|c11| c2| 1|
| A2| c1| c2| 2|
| A2|c11| c2| 1|
| A2|c11|c21| 1|
| A3| c1| c2| 3|
| A4| c1| c2| 1|
+---+---+---+-------+
如果标签“行”是非常重要的,你可以在以后添加它,并重新命名“SUM(C3)”到“C3”。有关更多信息,请参阅Spark Python API https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrame
您能否显示错误? – 2015-06-16 05:45:11