I have an RDD and I want to create a new RDD with only the unique values, but I am getting an error.

Code (making the tuples distinct in Scala):
val rdd = sc.textFile("/user/ergorenova/socialmedia/allus/archivosOrigen")
  .map(_.split(",", -1) match {
    case Array(caso, canal, lote, estado, estadoo, estadooo, fechacreacioncaso,
               fechacierrecaso, username, clientid, nombre, apellido, ani, email) =>
      (canal, username, ani, email)
  })
  .distinct

val twtface = rdd.map {
  case (canal, username, ani, email) =>
    val campoAni = "ANI"
    (campoAni, ani, canal, username)
}.distinct()

twtface.take(3).foreach(println)
This is the CSV file:
caso2,canal2,lote,estado3,estado4,estado5,fechacreacioncaso2,fechacierrecaso2,username,clientid,nombre,apellido,ani,email
2694464,Twitter,Redes Sociales Movistar - Twitter,Cerrado por Abandono – Robot,,,16/04/2015 23:57:51,17/04/2015 6:00:19,kariniseta,158,,,22,[email protected]
2694464,Twitter,Redes Sociales Movistar - Twitter,Cerrado por Abandono – Robot,,,16/04/2015 23:57:51,17/04/2015 6:00:19,kariniseta,158,,,22,[email protected]
2635376,Facebook,Redes Sociales Movistar - Facebook,Cerrado por Abandono – Robot,,,03/04/2015 20:20:18,04/04/2015 2:30:06,martin.saggini,1126,,,,
2635376,Facebook,Redes Sociales Movistar - Facebook,Cerrado por Abandono – Robot,,,03/04/2015 20:20:18,04/04/2015 2:30:06,martin.saggini,1126,,,,
Error:
scala.MatchError: [Ljava.lang.String;@dea08cc (of class [Ljava.lang.String;)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:21)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:21)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:211)
at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
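The `scala.MatchError` most likely means that some input lines do not split into exactly 14 fields (for example an empty line, or a row with a different column count), so the single 14-element `case Array(...)` pattern fails at runtime. A minimal plain-Scala sketch (no Spark; the sample lines are made up for illustration) of the failure mode and one common workaround, `collect` with a partial function, which silently drops non-matching rows:

```scala
// split(",", -1) can yield arrays of any length; a match with only a
// 14-element case throws scala.MatchError on every other length.
val lines = Seq(
  "a,b,c",                            // only 3 fields -> would throw MatchError
  "1,2,3,4,5,6,7,8,9,10,11,12,13,14"  // exactly 14 fields -> matches
)

// collect applies the partial function only where it is defined,
// so short or malformed rows are skipped instead of crashing.
val safe = lines.map(_.split(",", -1)).collect {
  case Array(caso, canal, lote, estado, estadoo, estadooo,
             fechacreacioncaso, fechacierrecaso, username,
             clientid, nombre, apellido, ani, email) =>
    (canal, username, ani, email)
}

println(safe) // List((2,9,13,14))
```

With Spark the same idea applies: replacing the `map { ... match { ... } }` with a `collect { case Array(...) => ... }` over the split RDD keeps only well-formed rows; whether dropping bad rows is acceptable depends on the data.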
What is the error? Please provide a [Minimal, Complete, and Verifiable example](http://stackoverflow.com/help/mcve)! – eliasah
Edited. Sorry, I'm new. –
That's okay. Nevertheless, this looks like a Scala version mismatch. Have you checked whether your cluster and your application use the same Spark and Scala versions? – eliasah