2016-04-30 169 views
6

數據我有一些像這樣的代碼:星火斯卡拉取回從rdd.foreachPartition

 println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass) 
     val lastRevs = distinctFileGidsRDD. 
     foreachPartition(iter => { 
      SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) 
      while(iter.hasNext) { 
      val item = iter.next() 
      //println(item(0)) 
      println("String: "+item(0).toString()) 
      val jsonStr = DB.readOnly { implicit session => 
       sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar". 
       map { resultSet => resultSet.string(1) }.single.apply() 
      } 
      println("\nJSON: "+jsonStr) 
      } 
     }) 
     println("\nEND Last Revs Class: "+ lastRevs.getClass) 

代碼輸出(重編輯)類似:

BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD 
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM 
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...) 
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw 
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...) 
... 
JSON: None() 
END Last Revs Class: void 

問題1: 哪有我得到lastRevs值是一個有用的格式,如JSON字符串/ null或像Some/None這樣的選項?

問題2: 我的首選項:是否有另一種方式得到RDD類格式(而不是迭代器格式)的分區數據?

dstream.foreachRDD { (rdd, time) => 
    rdd.foreachPartition { partitionIterator => 
    val partitionId = TaskContext.get.partitionId() 
    val uniqueId = generateUniqueId(time.milliseconds, partitionId) 
    // use this uniqueId to transactionally commit the data in partitionIterator 
    } 
} 

http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

問題三:讓我使用一個理智的方法(給我上面的鏈接)的數據的方法? (拋開現在這是一個scalikejdbc系統JDBC的事實,這將成爲除此原型之外的某種類型的關鍵價值存儲庫)。

+0

我不明白的問題。 'lastRevs'應該是'Unit',因爲'.forEachPartition'只用於副作用(函數T => Unit)。我想你想改變數據,就像使用'mapPartitions'來代替。我想了解這裏的總體目標是什麼,因爲單個問題對我來說沒有多大意義 – maasg

+0

@maasg:是的。這是我正在尋找的答案 - mapPartitions。我在http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine找到了另一個例子。 – codeaperature

回答

4

創建一個使用執行程序本地資源的轉換(如數據庫或網絡連接),則應使用rdd.mapPartitions。它允許在執行程序本地初始化一些代碼,並使用這些本地資源來處理分區中的數據。

代碼應該是這樣的:

val lastRevs = distinctFileGidsRDD. 
     mapPartitions{iter => 
      SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword) 
      iter.map{ element => 
      DB.readOnly { implicit session => 
       sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${element(0)}::varchar" 
       .map { resultSet => resultSet.string(1) }.single.apply() 
      } 
      } 
     } 
+0

你的意思是它不同於'foreachPartition',因爲它使用Executor的資源而不是Driver的資源? IE瀏覽器。代碼'foreachPartition'代碼在Driver上執行,而在執行器上執行'mapPartitions' ...對嗎? – lisak

+2

@lisak不,'foreachPartition'和'mapPartitions'都可以讓你在執行器上運行代碼。不同之處在於'foreachPartition'只會產生副作用(如寫入數據庫),而'mapPartitions'則會返回一個值。這個問題的關鍵是'如何獲取數據',因此'mapPartitions'就是要走的路。 – maasg