數據我有一些像這樣的代碼:星火斯卡拉取回從rdd.foreachPartition
println("\nBEGIN Last Revs Class: "+ distinctFileGidsRDD.getClass)
val lastRevs = distinctFileGidsRDD.
foreachPartition(iter => {
SetupJDBC(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
while(iter.hasNext) {
val item = iter.next()
//println(item(0))
println("String: "+item(0).toString())
val jsonStr = DB.readOnly { implicit session =>
sql"SELECT jsonStr FROM lasttail WHERE fileGId = ${item(0)}::varchar".
map { resultSet => resultSet.string(1) }.single.apply()
}
println("\nJSON: "+jsonStr)
}
})
println("\nEND Last Revs Class: "+ lastRevs.getClass)
代碼輸出(重編輯)類似:
BEGIN Last Revs Class: class org.apache.spark.rdd.MapPartitionsRDD
String: 1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...)
String: 1eY2wxoVq17KGMUBzCZZ34J9gSNzF038grf5RP38DUxw
JSON: Some({"Struct":{"fileGid":"1fqhSXPE3GwrJ6SZzC65gJnBaB5_b7j3pWNSfqzU5FoM",...)
...
JSON: None()
END Last Revs Class: void
問題1: 哪有我得到lastRevs值是一個有用的格式,如JSON字符串/ null或像Some/None這樣的選項?
問題2: 我的首選項:是否有另一種方式得到RDD類格式(而不是迭代器格式)的分區數據?
dstream.foreachRDD { (rdd, time) =>
rdd.foreachPartition { partitionIterator =>
val partitionId = TaskContext.get.partitionId()
val uniqueId = generateUniqueId(time.milliseconds, partitionId)
// use this uniqueId to transactionally commit the data in partitionIterator
}
}
從http://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning
問題三:讓我使用一個理智的方法(給我上面的鏈接)的數據的方法? (拋開現在這是一個scalikejdbc系統JDBC的事實,這將成爲除此原型之外的某種類型的關鍵價值存儲庫)。
我不明白的問題。 'lastRevs'應該是'Unit',因爲'.forEachPartition'只用於副作用(函數T => Unit)。我想你想改變數據,就像使用'mapPartitions'來代替。我想了解這裏的總體目標是什麼,因爲單個問題對我來說沒有多大意義 – maasg
@maasg:是的。這是我正在尋找的答案 - mapPartitions。我在http://stackoverflow.com/questions/21698443/spark-best-practice-for-retrieving-big-data-from-rdd-to-local-machine找到了另一個例子。 – codeaperature