2014-07-04 91 views
3

我有以下代碼Apache的火花和Python拉姆達

file = spark.textFile("hdfs://...") 
counts = file.flatMap(lambda line: line.split(" ")) \ 
      .map(lambda word: (word, 1)) \ 
      .reduceByKey(lambda a, b: a + b) 
counts.saveAsTextFile("hdfs://...") 

http://spark.apache.org/examples.html我抄在這裏

我無法理解這段代碼尤其是關鍵字

  1. flatmap的例子,
  2. 地圖和
  3. reduceby

有人可以發生的事情用簡單的英語解釋。

+0

我不是專家,但我認爲flatMap建立從嵌套結構(字行的名單?)的列表,地圖應用功能的所有元素,一個d reduceByKey按鍵對這些元素進行分組(我猜這裏是相同的單詞),並將函數(這裏是一個和)成對地應用。這可能會計數文本中每個單詞的出現次數。 – user189

+1

如果您使用函數式語言來進行函數式編程,事情會變得更加簡潔和易於閱讀。即我強烈建議使用Scala而不是OO腳本語言。 Scala功能更強大,對Spark更具性能,並且更容易挖掘Spark代碼。你的代碼變成:'spark.textFile(「hdfs:// ...」).flatMap(_。split(「」))。map((_,1))。reduceByKey(_ + _)。saveAsTextFile 「hdfs:// ...」)' – samthebest

回答

10

map是最簡單的,它本質上說,做序列的每個元素在給定的操作並返回結果序列(非常類似於foreach)。 flatMap是同樣的事情,但不是隻有一個元素每個元素返回你被允許返回一個序列(可以爲空)。這是一個解釋difference between map and flatMap的答案。最後reduceByKey需要聚合函數(意味着它採用兩個相同類型的參數和返回類型,也應該是交換和關聯,否則你會得到不一致的結果),這是用於聚合每V每個K在你的(K,V)對序列。

*
reduce (lambda a, b: a + b,[1,2,3,4])

這是說集合體+整個列表,它會做

1 + 2 = 3 
3 + 3 = 6 
6 + 4 = 10 
final result is 10 

者皆減少,除了你同樣的事情做一個減少每個獨特鍵。


所以要解釋它在你的榜樣

file = spark.textFile("hdfs://...") // open text file each element of the RDD is one line of the file 
counts = file.flatMap(lambda line: line.split(" ")) //flatMap is needed here to return every word (separated by a space) in the line as an Array 
      .map(lambda word: (word, 1)) //map each word to a value of 1 so they can be summed 
      .reduceByKey(lambda a, b: a + b) // get an RDD of the count of every unique word by aggregating (adding up) all the 1's you wrote in the last step 
counts.saveAsTextFile("hdfs://...") //Save the file onto HDFS 

那麼,爲什麼算的話這種方式,原因是節目的MapReduce的範例是高度並行,從而擴展到這樣做計算TB或甚至數PB的數據。


我不使用python多告訴我,如果我犯了一個錯誤。

2

見直列評論:

file = spark.textFile("hdfs://...") # opens a file 
counts = file.flatMap(lambda line: line.split(" ")) \ # iterate over the lines, split each line by space (into words) 
      .map(lambda word: (word, 1)) \ # for each word, create the tuple (word, 1) 
      .reduceByKey(lambda a, b: a + b) # go over the tuples "by key" (first element) and sum the second elements 
counts.saveAsTextFile("hdfs://...") 

reduceByKey的更詳細的解釋可以發現here

+0

對不起,我不明白reduceByKey。在一個正常的lambda表達式lambda a中,b:a + b表示輸入對(a,b)給我a + b結果不是嗎?但是在這裏它做了其他奇怪的語法? –

+0

要了解reduceBykey,您首先必須瞭解reduce。一個簡單的簡化例子:'print reduce(lambda a,b:a + b,[1,2,3])'迭代一個迭代器並將函數(第一個參數 - 這裏是lambda表達式)應用於前兩個元素然後使用結果與第三個元素等。 – alfasin

+0

我alfasin我重讀你的解釋,我只希望我也可以獎勵點給你too.Your評論清除我的困惑reduceByKey –

1

的答案在這裏是在代碼級別準確,但它可能有助於瞭解引擎蓋下發生的事情。

我的理解是,當調用reduce操作時,會有一個海量數據混洗,導致通過map()操作獲得的所有KV對具有相同的鍵值,並將其分配給總計值的任務在KV對的集合中。然後將這些任務分配給不同的物理處理器,然後將結果與另一個數據洗牌進行比較。

因此,如果在地圖操作產生 (貓1) (貓1) (狗1) (貓1) (貓1) (狗1)

的降低操作產生 (貓4) (狗2)

希望這有助於