2017-10-18 146 views
4

我了個去實現,像這樣一個結構化的流...星火結構化數據流ForeachWriter和數據庫性能

myDataSet 
    .map(r => StatementWrapper.Transform(r)) 
    .writeStream 
    .foreach(MyWrapper.myWriter) 
    .start() 
    .awaitTermination() 

這一切似乎工作,但看着吞吐量MyWrapper.myWriter的是可怕的。它有效地努力成爲一個JDBC水槽,它看起來像:

val myWriter: ForeachWriter[Seq[String]] = new ForeachWriter[Seq[String]] { 

    var connection: Connection = _ 

    override def open(partitionId: Long, version: Long): Boolean = { 
    Try (connection = getRemoteConnection).isSuccess 
    } 

    override def process(row: Seq[String]) { 
    val statement = connection.createStatement() 
    try { 
     row.foreach(s => statement.execute(s)) 
    } catch { 
     case e: SQLSyntaxErrorException => println(e) 
     case e: SQLException => println(e) 
    } finally { 
     statement.closeOnCompletion() 
    } 
    } 

    override def close(errorOrNull: Throwable) { 
    connection.close() 
    } 
} 

所以我的問題是 - 新ForeachWriter實例的每一行?因此open()和close()被稱爲數據集中的每一行?

是否有更好的設計來提高吞吐量?

如何解析SQL語句一次並執行很多次,同時保持數據庫連接打開?

+0

更新 - 我加了一些記錄。似乎沒有關閉/打開每個事務的連接。 – Exie

+0

更新 - 我試着將statement.closeOnCompletion()更改爲statement.close(),但沒有觀察到任何改進。 – Exie

回答

2

底層匯的打開和關閉取決於您的實施ForeachWriter

它調用ForeachWriter相關類是ForeachSink,這是它調用你寫的代碼:

data.queryExecution.toRdd.foreachPartition { iter => 
    if (writer.open(TaskContext.getPartitionId(), batchId)) { 
    try { 
     while (iter.hasNext) { 
     writer.process(encoder.fromRow(iter.next())) 
     } 
    } catch { 
     case e: Throwable => 
     writer.close(e) 
     throw e 
    } 
    writer.close(null) 
    } else { 
    writer.close(null) 
    } 
} 

開放和作家的收盤嘗試是從您的源產生的foreach批次。如果您想每次打開openclose以關閉接收器驅動程序,則需要通過您的實現進行此操作。

如果你想要對數據的處理方式更多的控制,可以實現Sink特質賦予一個批次ID和基礎DataFrame

trait Sink { 
    def addBatch(batchId: Long, data: DataFrame): Unit 
}