是否可以實現上面顯示的場景?
系統從一個鍵值對開始,並發現新的對。首先,鍵值對的數量會增加,然後在迭代中縮小。Spark Streaming:如何將輸出反饋到輸入
更新:我必須轉向Flink Streaming進行迭代支持。儘管與卡夫卡嘗試!
是否可以實現上面顯示的場景?
系統從一個鍵值對開始,並發現新的對。首先,鍵值對的數量會增加,然後在迭代中縮小。Spark Streaming:如何將輸出反饋到輸入
更新:我必須轉向Flink Streaming進行迭代支持。儘管與卡夫卡嘗試!
使用Apache Flink,可以通過iterate
API調用定義反饋邊緣。 iterate
方法需要一個step函數,它給定一個輸入流,產生一個反饋流和一個輸出流。前一個流被反饋到step功能,後一個流被髮送給下游運營商。
一個簡單的例子是這樣的:
val env = StreamExecutionEnvironment.getExecutionEnvironment
val input = env.fromElements(1).map(x => (x, math.random))
val output = input.iterate {
inputStream =>
val iterationBody = inputStream.flatMap {
randomWalk =>
val (step, position) = randomWalk
val direction = 2 * (math.random - 0.5)
val bifurcate = math.random >= 0.75
Seq(
Some((step + 1, position + direction)),
if (bifurcate) Some((step + 1, position - direction)) else None).flatten
}
val feedback = iterationBody.filter {
randomWalk => math.abs(randomWalk._2) < 1.0
}
val output = iterationBody.filter {
randomWalk => math.abs(randomWalk._2) >= 1.0
}
(feedback, output)
}
output.print()
// execute program
env.execute("Random Walk with Bifurcation")
在這裏,我們計算出一個隨機遊走,我們隨機分裂我們走在相反的方向前進。如果其絕對位置值大於或等於1.0
,則隨機漫步結束。
我轉移到Flink! Flink流式似乎比Spark更好! –
如果這是關於Spark,你爲什麼使用Hadoop和Flink標籤? – zero323
刪除了Hadoop。 Flink,因爲如果flink有可能,我會很有興趣知道。我可能會改變這個框架。 –