2017-02-22 61 views
0

我們有一個火花流應用,我們從卡夫卡收到DSTREAM和需要存儲到dynamoDB ....我與兩個方面做試驗如下星火地圖VS foreachRdd

代碼描述requestsWithState是DSTREAM

代碼段1與foreachRDD:

requestsWithState.foreachRDD { rdd => 
    println("Data being populated to Pulsar") 
    rdd.foreach { case (id, eventStream) => 
    println("id is " + id + " Event is " + eventStream) 
    DBUtils.putItem(dynamoConnection, id, eventStream.toString()) 
    } 
} 

代碼段2與圖:

requestsWithState.map (rdd => { rdd match { 
     case (id, eventStream) => { 
      println("id is " + id + " Event is " + eventStream) 
      val dynamoConnection = setupDynamoClientConnection() 
      DBUtils.putItem(dynamoConnection, id, eventStream.toString()) 
     } 
     } 
    }) 

requestsWithState.print(1) 

代碼Snippet1工作的罰款,並填充數據庫...第二個代碼片段不起作用....我們新手的火花,很想知道它背後的原因和方式讓它工作? ........我們正在試驗的原因(我們知道這是一個轉換,foreachRdd是一個操作)foreachRdd對於集羣負載較重的用例非常緩慢,我們發現如果映射更快我們可以得到它的工作.....請幫助我們獲取地圖代碼工作

+0

你應該(幾乎)從來沒有在地圖或flatMap副作用! – JiriS

回答

0

Map是Spark中的一種轉換(惰性轉換),除非在此之後調用spark動作,否則不會執行。 火花改造和行動,是指下面的鏈接 http://spark.apache.org/docs/latest/programming-guide.html#transformations

+0

.....我在地圖requestsWithState.print(1)後有一個動作,但它仍然不起作用我已經更新了相應的問題請看看 – user2359997

+0

RDD是不可變的,所以地圖會返回一個新的rdd。因此,請嘗試使用'code' requestsWithState = requestsWithState.map(rdd => {rdd match {id,eventStream)=> {println(「id is」+ id +「Event is」+ eventStream) val dynamoConnection = setupDynamoClientConnection() DBUtils.putItem(dynamoConnection,ID,eventStream.toString()) } } }) requestsWithState.print(1)'code' – Neetika

0

map版本沒有任何行動,.map不是一個動作,而是轉型。

如果不採取措施,轉換不會被執行。

參見例如http://training.databricks.com/visualapi.pdfhttp://spark.apache.org/docs/latest/programming-guide.html#transformations

+0

我確實有地圖requestsWithState.print後的動作(1)但仍然不起作用我已經更新了相應的問題請看看 – user2359997

+0

嗨,你添加一個動作requestWithState.print(1)? 我知道這是一個古老的問題,但我正在通過您的使用案例進行調整。謝謝 –

0

DStream.map返回另一個流。您應該在該流上調用打印,而不是原始打印。

所以在斯卡拉:

val transformedStream = requestsWithState.map (rdd => { rdd match { 
     case (id, eventStream) => { 
      println("id is " + id + " Event is " + eventStream) 
      val dynamoConnection = setupDynamoClientConnection() 
      DBUtils.putItem(dynamoConnection, id, eventStream.toString()) 
     } 
     } 
    }) 

transformedStream.print(1)