2015-01-14 55 views
0

原始數據我有看起來像這樣:變換成RDD爲RowMatrix PCA

RDD數據:

key -> index

1 -> 2

1 -> 3

1 -> 5

2 -> 1

2 -> 3

2 -> 4

我怎麼能轉換RDD以下格式?

key -> index1, index2, index3, index4, index5

1 -> 0,1,1,0,1

2 -> 1,0,1,1,0

我目前的方法是:

val vectors = filtered_data_by_key.map(x => { 
    var temp = Array[AnyVal]() 
    x._2.copyToArray(temp) 
    (x._1, Vectors.sparse(filtered_key_size, temp.map(_.asInstanceOf[Int]), Array.fill(filtered_key_size)(1))) 
}) 

我得到了一些奇怪的錯誤:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 54.0 failed 1 times, most recent failure: Lost task 3.0 in stage 54.0 (TID 75, localhost): java.lang.IllegalArgumentException: requirement failed

當我嘗試使用下面的代碼調試這個程序:

val vectors = filtered_data_by_key.map(x => { 
    val temp = Array[AnyVal]() 
    val t = x._2.copyToArray(temp) 
    (x._1, temp) 
}) 

我發現temp是空的,所以問題在copyToArray()

我不知道如何解決這個問題。

+0

它有什麼問題? –

+0

copyToArray方法需要臨時爲類型AnyVal,在我將temp轉換爲AnyVal並調用copyToarray後,結果爲空 –

回答

1

我完全不明白這個問題。你的鑰匙爲什麼重要?什麼是最大指數值?在你的代碼中,你使用不同數量的鍵作爲索引的最大值,但我認爲這是一個錯誤。

但我將承擔最大的指數值爲5。在這種情況下,我相信這將是你在找什麼:

val vectors = data_by_key.map({case(k,it)=>Vectors.sparse(5,it.map(x=>x-1).toArray, 
     Array.fill(it.size)(1))}) 

val rm = new RowMatrix(vectors) 

我減少一個索引號,因爲他們應該從0開始

錯誤'需求失敗'是由於您的索引和值向量不具有相同的大小。

+0

如何將(key,iterator)轉換爲RowMatrix? –

+0

請檢查我的更新答案。 – pzecevic

+0

這是工作,謝謝。 –

相關問題