2017-10-14 50 views
0

我是Scala和Spark的新手。我試圖刪除文本文件的重複行。 每行包含三列(矢量值),如:-4.5,-4.2,2.7Scala地圖過濾方法

這是我的計劃:

import org.apache.spark.SparkContext 
import org.apache.spark.SparkConf 
import org.apache.spark.rdd.RDD 
import scala.collection.mutable.Map 

object WordCount { 

def main(args: Array[String]) { 

    val conf = new SparkConf().setAppName("WordCount").setMaster("local[*]") 
    val sc = new SparkContext(conf) 
    val input = sc.textFile("/opt/spark/WC/WC_input.txt") 

    val keys = input.flatMap(line => line.split("/n")) 

    val singleKeys = keys.distinct 

    singleKeys.foreach(println) 
} 
} 

它的工作原理,但我想知道是否有辦法採用過濾功能。我必須在我的程序中使用它,但我不知道如何在所有行中進行迭代並刪除重複項(例如使用循環)。

如果有人有一個想法,會很棒!

謝謝!

回答

1

我認爲使用filter這樣做不會是一個非常有效的解決方案。對於每個元素,您都必須查看該元素是否已經存在於某種臨時數據集中,或計算這些元素在處理過的數據集中有多少。

如果你想遍歷它,也許做一些即時編輯,你可以應用map,然後reduceByKey分組相同的元素。像這樣

val singleKeys = 
    keys 
    .map(element => (element , 0)) 
    .reduceByKey((element, count) => element) 
    .map(_._1) 

在那裏你可以做更改數據集在第一map部分。 count參數雖然從reduceByKey的定義中沒有使用,但我們需要Tuple或Map中的第二個參數。

我認爲這基本上是如何distinct內部工作。

0

RDD的重複的元素可以以這種方式被刪除:

val data = List("-4.5,-4.2,2.7", "10,20,30", "-4.5,-4.2,2.7") 
val rdd = sparkContext.parallelize(data) 
val result = rdd.map((_, 1)).reduceByKey(_ + _).filter(_._2 == 1).map(_._1) 
result.foreach(println) 

結果:

10,20,30 
+0

非常感謝!但我想保留所有重複元素的一個實例。可能嗎 ? – Sol

+0

是的,如果刪除「過濾器」條款。結果將與「distinct」相同。 – pasha701

+0

你是最棒的!謝謝 (: – Sol