2016-09-16 53 views
1

警告:我是spark和scala的新手。我發現堆棧溢出的一些問題與我的非常相似,但一直沒能將這些問題轉化爲我的問題。Spark Scala創建具有排序日期值的RDD對

上下文。我有一對RDD最初與窗體(id,日期)的記錄,我想創建窗體(ID,last_date_seen)的RDD。在原始數據中,日期是一個字符串,我使用Joda轉換爲DateTime

我已經使用combineByKey成功完成了這項工作,並且我明白groupByKey效率低下,在大的情況下這可能不實用,但我試圖瞭解使用調用範圍的方法。

我想要做的是groupByKey然後mapValues,以groupByKey產生的值列表獲得列表中的最大值。

我已經試過:(我創建了一個基於不同的堆棧溢出的問題上的日期時間排序,所以排序)

我已經嘗試了許多方法,並大多數都給我一個例外,即任務不可序列化。一個例子是,

rdd.groupByKey().mapValues(_.toList.sorted.last) 

我已經試過任何數量的這種變種;)沒有toList,我得到整理例外不是Iterable[org.joda.time.DateTime]成員。我成功地使用了mapValues並做了更簡單的事情,但是一旦我嘗試添加排序,事情就會變糟糕。我試過sortBy並指定Ordering

深入瞭解爲什麼發送到排序方法的東西不可序列化,這對我總體上會有幫助。當我陷入陷阱時,我不知道該如何識別。

其中一個類似的堆棧溢出問題表明,您可以僅使用sortBy而不是使用mapValues,並指定它位於第二個元素上,因此.sortBy(_._2)。這對我來說也失敗了。理想情況下,如果這樣做是有道理的,我也想知道。

這似乎是一個非常簡單而且可能很常見的事情,所以我覺得我錯過了一些東西。

編輯 - 爲例外的更多細節添加。請注意,雖然我無法重現此錯誤。

錯誤堆棧中的不可序列化錯誤表明我在另一個堆棧溢出中使用的隱式排序是罪魁禍首。請注意,我無法重現這個讓我幾個小時感到困擾的錯誤(請參閱答案)。

Caused by: java.io.NotSerializableException:  
$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$ 
Serialization stack: 
    - object not serializable (class: 
    $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$, 
    value:  $i[email protected]6fc2db37) 
    - field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC, 
name: Joda$module, type: class $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$Joda$) 

Joda模塊剛剛定義過。

object Joda { 
implicit def dateTimeOrdering: Ordering[DateTime] = 
    Ordering.fromLessThan(_ isBefore _) 
} 
+1

只是'rdd.reduceByKey((x,y)=> if(x.isAfter(y))x else y)'並確保閱讀http://stackoverflow.com/a/33439328/1560062 – zero323

+0

謝謝@ zero323。另一個更緊湊的combineByKey版本,我還沒有嘗試過。似乎在Spark中遇到序列化問題是很常見的,所以儘管我現在爲我的小練習獲得了多個解決方案,但我想更好地理解爲什麼要使用toList和排序拋出錯誤。也許我的問題應該重寫到導致序列化錯誤的原因。 –

+1

對於序列化問題,您應該檢查鏈接的問題。 Joda類實際上不是Spark友好的。 – zero323

回答

0

我原來的問題確實有兩個組成部分: *如何改造利用GROUPBY的RDD以檢索每個ID中的最後查看日期和 *爲什麼我會想盡了辦法給人一種「任務不可序列化「的錯誤。

不幸的是,在重新啓動spark-shell並回溯我的步驟後,我無法重現此錯誤。我在問題中列出的代碼與我已經建立的DateTime順序配對,工作得很好。最近我又遇到了另一個類似的問題,我可以將它追溯到隱式值的衝突中,這是我之前在shell中爲完全不同的目的設置的。我懷疑這也是這裏的罪魁禍首,但無法證實這一點。

評論中引用的另一個堆棧溢出問題表示Joda引起了其他問題。

爲了完整起見,我能夠進行轉換並從幾種方式中提取出最後一個日期。 @ zero323在使用reduceByKey的評論中給出的最直接的方式。

使用groupByKey,在問題的代碼

rdd.groupByKey().mapValues(_.toList.sorted.last) 

正常工作時下面的隱式排序是到位:

object Joda { 
    implicit def dateTimeOrdering: Ordering[DateTime] = 
    Ordering.fromLessThan(_ isBefore _)} 
import Joda._ 

同樣,

rdd.groupByKey.mapValues(_.toList.max) 

產生相同的。

我也使用定義的順序複製結果並傳遞給顯式排序。

不幸的是,我無法確定爲什麼對象喬達在第一次會議中拋出了一個異常,並沒有在我嘗試的下一個。