2016-02-24 30 views
0

我有一個RDD,我想創建一個具有唯一值的新RDD,但我有一個錯誤。
代碼:如何使獨特的元組scala

val rdd = sc.textFile("/user/ergorenova/socialmedia/allus/archivosOrigen").map(_.split(",", -1) match { 
    case Array(caso, canal, lote, estado, estadoo, estadooo, fechacreacioncaso, fechacierrecaso, username, clientid, nombre, apellido, ani, email) =>(canal, username, ani, email) 
}).distinct 

val twtface = rdd.map { 
    case ( canal, username, ani, email) => 
    val campoAni = "ANI" 
    (campoAni , ani , canal , username) 
}.distinct() 

twtface.take(3).foreach(println) 

這是CSV文件

caso2,canal2,lote,estado3,estado4,estado5,fechacreacioncaso2,fechacierrecaso2,username,clientid,nombre,apellido,ani,email 
2694464,Twitter,Redes Sociales Movistar - Twitter,Cerrado por Abandono – Robot,,,16/04/2015 23:57:51,17/04/2015 6:00:19,kariniseta,158,,,22,[email protected] 
2694464,Twitter,Redes Sociales Movistar - Twitter,Cerrado por Abandono – Robot,,,16/04/2015 23:57:51,17/04/2015 6:00:19,kariniseta,158,,,22,[email protected] 
2635376,Facebook,Redes Sociales Movistar - Facebook,Cerrado por Abandono – Robot,,,03/04/2015 20:20:18,04/04/2015 2:30:06,martin.saggini,1126,,,, 
2635376,Facebook,Redes Sociales Movistar - Facebook,Cerrado por Abandono – Robot,,,03/04/2015 20:20:18,04/04/2015 2:30:06,martin.saggini,1126,,,, 

錯誤:

scala.MatchError: [Ljava.lang.String;@dea08cc (of class [Ljava.lang.String;) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:21) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:21) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211) 
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73) 
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41) 
    at org.apache.spark.scheduler.Task.run(Task.scala:88) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
+0

什麼是錯誤?請提供[最小,完整和可驗證示例](http://stackoverflow.com/help/mcve)! – eliasah

+0

已編輯。對不起,我新 –

+0

沒關係。 Neverthless這看起來像一個scala版本匹配,你是否檢查你的集羣和你的應用程序是否具有相同的spark和scala版本? – eliasah

回答

0

我認爲錯誤是由於您的CSV文件中缺少/額外的換行符。

您的分割和匹配假定csv的每一行都恰好有14個字段。根據您使用的編碼或文本編輯器,您可能在文檔的末尾添加了新的行。

我的建議是驗證每一行,並添加一個全面的案例,給你一個更詳細的錯誤信息,這樣你將避免模糊的MatchError。