4
我了個去實現,像這樣一個結構化的流...星火結構化數據流ForeachWriter和數據庫性能
myDataSet
.map(r => StatementWrapper.Transform(r))
.writeStream
.foreach(MyWrapper.myWriter)
.start()
.awaitTermination()
這一切似乎工作,但看着吞吐量MyWrapper.myWriter的是可怕的。它有效地努力成爲一個JDBC水槽,它看起來像:
val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] {
var connection: Connection = _
override def open(partitionId: Long, version: Long): Boolean = {
Try (connection = getRemoteConnection).isSuccess
}
override def process(row: Seq[String]) {
val statement = connection.createStatement()
try {
row.foreach(s => statement.execute(s))
} catch {
case e: SQLSyntaxErrorException => println(e)
case e: SQLException => println(e)
} finally {
statement.closeOnCompletion()
}
}
override def close(errorOrNull: Throwable) {
connection.close()
}
}
所以我的問題是 - 新ForeachWriter實例的每一行?因此open()和close()被稱爲數據集中的每一行?
是否有更好的設計來提高吞吐量?
如何解析SQL語句一次並執行很多次,同時保持數據庫連接打開?
更新 - 我加了一些記錄。似乎沒有關閉/打開每個事務的連接。 – Exie
更新 - 我試着將statement.closeOnCompletion()更改爲statement.close(),但沒有觀察到任何改進。 – Exie