2016-11-14 23 views

回答

0

在Spark 2.0中使用Structured Streaming,處理流式DataFrame非常接近正常的DataFrame。在以下測試示例中,添加新批次數據時,值計數將打印到控制檯。

val wordCounts = words.groupBy("value").count() 
val query = wordCounts.writeStream 
    .outputMode("complete") 
    .format("console") 
    .start() 

我們還可以創建自己的StreamSinkProvider來決定在有新的批處理數據出現時該怎麼做。

class CustomSinkProvider extends StreamSinkProvider { 
    def createSink(
        sqlContext: SQLContext, 
        parameters: Map[String, String], 
        partitionColumns: Seq[String], 
        outputMode: OutputMode): Sink = { 
    new Sink { 
     override def addBatch(batchId: Long, data: DataFrame): Unit = { 
     // Do something. 
     } 
    } 
    } 
} 

然後使用以下代碼來使用CustomSinkProvider

val query = wordCounts.writeStream 
    .outputMode("complete") 
    .format(classOf[CustomSinkProvider].getCanonicalName) 
    .start() 
+0

我不認爲這回答我的問題。我的問題是關於如何計算特定領域的變化。 –

相關問題