Spark 性能调优之 foreachPartition 优化写数据库性能
foreach 操作写数据库
缺点
- 首先,对于每条数据,都要单独去调用一次function,task为每个数据,都要去执行一次function函数。
如果100万条数据,(一个partition),调用100万次。性能比较差。 - 另外一个非常非常重要的一点
如果每个数据,你都去创建一个数据库连接的话,那么你就得创建100万次数据库连接。
但是要注意的是,数据库连接的创建和销毁,都是非常非常消耗性能的。虽然我们之前已经用了数据库连接池,只是创建了固定数量的数据库连接。
你还是得多次通过数据库连接,往数据库(MySQL)发送一条SQL语句,然后MySQL需要去执行这条SQL语句。如果有100万条数据,那么就是100万次发送SQL语句。
以上两点(数据库连接,多次发送SQL语句),都是非常消耗性能的。
sessionDetailRDD.foreach(new VoidFunction<Tuple2<String, Tuple2<String, Row>>>() {
private static final long serialVersionUID = 3512948720633017641L;
@Override
public void call(Tuple2<String, Tuple2<String, Row>> tuple) throws Exception {
Row row = tuple._2._2;
SessionDetail sessionDetail = new SessionDetail();
sessionDetail.setTaskid(taskid);
sessionDetail.setUserid(row.getLong(1));
sessionDetail.setSessionid(row.getString(2));
sessionDetail.setPageid(row.getLong(3));
sessionDetail.setActionTime(row.getString(4));
sessionDetail.setSearchKeyword(row.getString(5));
sessionDetail.setClickCategoryId(row.getLong(6));
sessionDetail.setClickProductId(row.getLong(7));
sessionDetail.setOrderCategoryIds(row.getString(8));
sessionDetail.setOrderProductIds(row.getString(9));
sessionDetail.setPayCategoryIds(row.getString(10));
sessionDetail.setPayProductIds(row.getString(11));
System.out.println("开始插入session明细数据");
ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
sessionDetailDAO.insert(sessionDetail);
}
});
foreachPartition 写库
优点
1、对于我们写的function函数,就调用一次,一次传入一个partition所有的数据
2、主要创建或者获取一个数据库连接就可以
3、只要向数据库发送一次SQL语句和多组参数即可
数据对比
一个partition大概是1千条左右
用foreach,跟用foreachPartition,性能的提升达到了2~3分钟。
可能出现的问题
在实际生产环境中,清一色,都是使用foreachPartition操作;但是有个问题,跟mapPartitions操作一样,如果一个partition的数量真的特别特别大,比如真的是100万,那基本上就不太靠谱了。
一下子进来,很有可能会发生OOM,内存溢出的问题。
代码
extractSessionDetailRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Tuple2<String, Row>>>>() {
private static final long serialVersionUID = -2027147238095364243L;
@Override
public void call(Iterator<Tuple2<String, Tuple2<String, Row>>> iterator) throws Exception {
ArrayList<SessionDetail> sessionDetails = new ArrayList<SessionDetail>();
while(iterator.hasNext()){
Tuple2<String, Tuple2<String, Row>> tuple = iterator.next();
Row row = tuple._2._2;
SessionDetail sessionDetail = new SessionDetail();
sessionDetail.setTaskid(taskid);
sessionDetail.setUserid(row.getLong(1));
sessionDetail.setSessionid(row.getString(2));
sessionDetail.setPageid(row.getLong(3));
sessionDetail.setActionTime(row.getString(4));
sessionDetail.setSearchKeyword(row.getString(5));
sessionDetail.setClickCategoryId(row.getLong(6));
sessionDetail.setClickProductId(row.getLong(7));
sessionDetail.setOrderCategoryIds(row.getString(8));
sessionDetail.setOrderProductIds(row.getString(9));
sessionDetail.setPayCategoryIds(row.getString(10));
sessionDetail.setPayProductIds(row.getString(11));
sessionDetails.add(sessionDetail);
}
ISessionDetailDAO sessionDetailDAO = DAOFactory.getSessionDetailDAO();
sessionDetailDAO.insertBatch(sessionDetails);
}
});