我們有一個火花流應用,我們從卡夫卡收到DSTREAM和需要存儲到dynamoDB ....我與兩個方面做試驗如下星火地圖VS foreachRdd
代碼描述requestsWithState是DSTREAM
代碼段1與foreachRDD:
requestsWithState.foreachRDD { rdd =>
println("Data being populated to Pulsar")
rdd.foreach { case (id, eventStream) =>
println("id is " + id + " Event is " + eventStream)
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
代碼段2與圖:
requestsWithState.map (rdd => { rdd match {
case (id, eventStream) => {
println("id is " + id + " Event is " + eventStream)
val dynamoConnection = setupDynamoClientConnection()
DBUtils.putItem(dynamoConnection, id, eventStream.toString())
}
}
})
requestsWithState.print(1)
代碼Snippet1工作的罰款,並填充數據庫...第二個代碼片段不起作用....我們新手的火花,很想知道它背後的原因和方式讓它工作? ........我們正在試驗的原因(我們知道這是一個轉換,foreachRdd是一個操作)foreachRdd對於集羣負載較重的用例非常緩慢,我們發現如果映射更快我們可以得到它的工作.....請幫助我們獲取地圖代碼工作
你應該(幾乎)從來沒有在地圖或flatMap副作用! – JiriS