星火结构化数据流ForeachWriter和数据库性能

问题描述:

我了个去实现,像这样一个结构化的流...星火结构化数据流ForeachWriter和数据库性能

myDataSet 
    .map(r => StatementWrapper.Transform(r)) 
    .writeStream 
    .foreach(MyWrapper.myWriter) 
    .start() 
    .awaitTermination() 

这一切似乎工作,但看着吞吐量MyWrapper.myWriter的是可怕的。它有效地努力成为一个JDBC水槽,它看起来像:

val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] { 

    var connection: Connection = _ 

    override def open(partitionId: Long, version: Long): Boolean = { 
    Try (connection = getRemoteConnection).isSuccess 
    } 

    override def process(row: Seq[String]) { 
    val statement = connection.createStatement() 
    try { 
     row.foreach(s => statement.execute(s)) 
    } catch { 
     case e: SQLSyntaxErrorException => println(e) 
     case e: SQLException => println(e) 
    } finally { 
     statement.closeOnCompletion() 
    } 
    } 

    override def close(errorOrNull: Throwable) { 
    connection.close() 
    } 
} 

所以我的问题是 - 新ForeachWriter实例的每一行?因此open()和close()被称为数据集中的每一行?

是否有更好的设计来提高吞吐量?

如何解析SQL语句一次并执行很多次,同时保持数据库连接打开?

+0

更新 - 我加了一些记录。似乎没有关闭/打开每个事务的连接。 – Exie

+0

更新 - 我试着将statement.closeOnCompletion()更改为statement.close(),但没有观察到任何改进。 – Exie

底层汇的打开和关闭取决于您的实施ForeachWriter

它调用ForeachWriter相关类是ForeachSink,这是它调用你写的代码:

data.queryExecution.toRdd.foreachPartition { iter => 
    if (writer.open(TaskContext.getPartitionId(), batchId)) { 
    try { 
     while (iter.hasNext) { 
     writer.process(encoder.fromRow(iter.next())) 
     } 
    } catch { 
     case e: Throwable => 
     writer.close(e) 
     throw e 
    } 
    writer.close(null) 
    } else { 
    writer.close(null) 
    } 
} 

开放和作家的收盘尝试是从您的源产生的foreach批次。如果您想每次打开openclose以关闭接收器驱动程序,则需要通过您的实现进行此操作。

如果你想要对数据的处理方式更多的控制,可以实现Sink特质赋予一个批次ID和基础DataFrame

trait Sink { 
    def addBatch(batchId: Long, data: DataFrame): Unit 
}