星火结构化数据流ForeachWriter和数据库性能
问题描述:
我了个去实现,像这样一个结构化的流...星火结构化数据流ForeachWriter和数据库性能
myDataSet
.map(r => StatementWrapper.Transform(r))
.writeStream
.foreach(MyWrapper.myWriter)
.start()
.awaitTermination()
这一切似乎工作,但看着吞吐量MyWrapper.myWriter的是可怕的。它有效地努力成为一个JDBC水槽,它看起来像:
val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {
var connection: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
Try (connection = getRemoteConnection).isSuccess
}
override def process(row: Seq[String]) {
val statement = connection.createStatement()
try {
row.foreach(s => statement.execute(s))
} catch {
case e: SQLSyntaxErrorException => println(e)
case e: SQLException => println(e)
} finally {
statement.closeOnCompletion()
}
}
override def close(errorOrNull: Throwable) {
connection.close()
}
}
所以我的问题是 - 新ForeachWriter实例的每一行?因此open()和close()被称为数据集中的每一行?
是否有更好的设计来提高吞吐量?
如何解析SQL语句一次并执行很多次,同时保持数据库连接打开?
答
底层汇的打开和关闭取决于您的实施ForeachWriter
的。
它调用ForeachWriter
相关类是ForeachSink
,这是它调用你写的代码:
data.queryExecution.toRdd.foreachPartition { iter =>
if (writer.open(TaskContext.getPartitionId(), batchId)) {
try {
while (iter.hasNext) {
writer.process(encoder.fromRow(iter.next()))
}
} catch {
case e: Throwable =>
writer.close(e)
throw e
}
writer.close(null)
} else {
writer.close(null)
}
}
开放和作家的收盘尝试是从您的源产生的foreach批次。如果您想每次打开open
和close
以关闭接收器驱动程序,则需要通过您的实现进行此操作。
如果你想要对数据的处理方式更多的控制,可以实现Sink
特质赋予一个批次ID和基础DataFrame
:
trait Sink {
def addBatch(batchId: Long, data: DataFrame): Unit
}
更新 - 我加了一些记录。似乎没有关闭/打开每个事务的连接。 – Exie
更新 - 我试着将statement.closeOnCompletion()更改为statement.close(),但没有观察到任何改进。 – Exie