2015-09-08 159 views
2

我試圖將RDD映射爲這樣(請參閱輸出以獲得結果)並將映射減小十進制值,並且不斷收到錯誤。當我嘗試使用reduceByKey()和字數時,它工作正常。十進制值的總結是不同的?Apache Spark reduceByKey總計小數

val voltageRDD= myRDD.map(i=> i.split(";")) 
    .filter(i=> i(0).split("/")(2)=="2008") 
    .map(i=> (i(0).split("/")(2),i(2).toFloat)).take(5) 

輸出:

voltageRDD: Array[(String, Float)] = Array((2008,1.62), (2008,1.626), (2008,1.622), (2008,1.612), (2008,1.612)) 

當試圖減少:

val voltageRDD= myRDD.map(i=> i.split(";")) 
    .filter(i=> i(0).split("/")(2)=="2008") 
    .map(i=> (i(0).split("/")(2),i(2).toFloat)).reduceByKey(_+_).take(5) 

我收到以下錯誤:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2954.0 failed 4 times, most recent failure: Lost task 0.3 in stage 2954.0 (TID 15696, 10.19.240.54): java.lang.NumberFormatException: For input string: "?" 
+3

你確定你的所有數據都是以這種格式嗎?在你的代碼中,第二個代碼片段將貫穿整個數據集(因爲它將執行'reduceByKey'然後執行'take'),而第一個代碼片段將只運行前幾個記錄。如果在第一個「5」記錄之後的某個地方(過濾之後)你的'i(2)'是'?',它會在第二個片段中崩潰,但不會在第一個片段中崩潰,因爲spark是懶惰的。 –

+0

@MateuszDymczyk謝謝你指出。我的數據集並不乾淨,因此格式不一樣! – codeBarer

+0

在這種情況下,我添加它作爲答案 –

回答

1

如果您的數據包含不能解析爲浮點的列,那麼您應該事先將它們過濾出來,或者相應地對待它們。如果您看到不可分析的條目,這種處理可能意味着您分配的值爲0.0f。下面的代碼完全是這樣的。

val voltageRDD= myRDD.map(i=> i.split(";")) 
    .filter(i => i(0).split("/")(2)=="2008") 
    .map(i => (i(0).split("/")(2), Try{ i(2).toFloat }.toOption.getOrElse(0.0f))) 
    .reduceByKey(_ + _).take(5) 
1

短版:你可能有一個線其中i(2)等於?

根據我的評論,您的數據很可能是不一致的,因爲take(5)因爲take(5)而沒有需要火花在整個數據集上執行操作的操作,所以在第一個代碼段中不會出現問題。 Spark是懶惰的,因此只會執行計算,直到得到來自map -> filter -> map鏈的5結果。

另一方面,第二個片段將對您的整個數據集執行計算,因此它可以執行reduceByKey,並且只有這樣它纔會有5個結果,因此它可能會在您的數據集中發現第一個片段太多的問題。