
I am using the Spark console in the Cloudera QuickStart VM. How do I reduce by key?

A sample of the output is provided below. It shows the first 20 records. Each record is a combination of a TV channel name and its corresponding viewer count. There are several hundred records in total.

The goal is to group this RDD (channel_views) by TV channel name, so that each record is a unique TV channel name together with the sum of its corresponding view counts.

channel_views = joined_dataset.map(extract_channel_views) 

Below is the code I am struggling with to produce the above:

def some_function(a,b): 
    some_result = a + b 
    return some_result 

channel_views.reduceByKey(some_function).collect() 

The output of the code below describes the current data and the desired output/goal:

channel_views.take(20) 

[(1038, u'DEF'), 
(1038, u'CNO'), 
(1038, u'CNO'), 
(1038, u'NOX'), 
(1038, u'MAN'), 
(1038, u'MAN'), 
(1038, u'XYZ'), 
(1038, u'BAT'), 
(1038, u'CAB'), 
(1038, u'DEF'), 
(415, u'DEF'), 
(415, u'CNO'), 
(415, u'CNO'), 
(415, u'NOX'), 
(415, u'MAN'), 
(415, u'MAN'), 
(415, u'XYZ'), 
(415, u'BAT'), 
(415, u'CAB'), 
(415, u'DEF')] 

Answers


You have the dataset backwards. Use map (or change your extract function) to swap the tuples from (count, name) to (name, count).

The *ByKey methods use the first item of the tuple as the key, so your code would concatenate the channel-name strings, keyed by the view counts.
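
A minimal PySpark sketch of that swap, assuming channel_views holds the (count, name) tuples shown in the question:

# Swap each (count, name) tuple to (name, count) so the channel name becomes the key
name_views = channel_views.map(lambda rec: (rec[1], rec[0]))

# Sum the view counts per channel name
totals = name_views.reduceByKey(lambda a, b: a + b)

totals.collect()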


I don't know Python, so I did this in Scala; you can convert it to Python. So, here you go:

scala> val input = sc.parallelize(Seq((1038, "DEF"), 
    | (1038, "CNO"), 
    | (1038, "CNO"), 
    | (1038, "NOX"), 
    | (1038, "MAN"), 
    | (1038, "MAN"), 
    | (1038, "XYZ"), 
    | (1038, "BAT"), 
    | (1038, "CAB"), 
    | (1038, "DEF"), 
    | (415, "DEF"), 
    | (415, "CNO"), 
    | (415, "CNO"), 
    | (415, "NOX"), 
    | (415, "MAN"), 
    | (415, "MAN"), 
    | (415, "XYZ"), 
    | (415, "BAT"), 
    | (415, "CAB"), 
    | (415, "DEF")) 
    |) 
input: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[12] at parallelize at <console>:22 

scala> val data = input.map(v => (v._2,v._1)) 
data: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[13] at map at <console>:24 

scala> data.foreach(println) 
(BAT,1038) 
(DEF,415) 
(CNO,415) 
(BAT,415) 
(CAB,415) 
(DEF,415) 
(MAN,1038) 
(XYZ,1038) 
(CNO,1038) 
(NOX,1038) 
(DEF,1038) 
(MAN,1038) 
(CNO,415) 
(MAN,415) 
(CAB,1038) 
(XYZ,415) 
(NOX,415) 
(CNO,1038) 
(MAN,415) 
(DEF,1038) 

scala> val result = data.reduceByKey((x,y) => x+y) 
result: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[14] at reduceByKey at <console>:26 

scala> result.foreach(println) 
(NOX,1453) 
(MAN,2906) 
(CNO,2906) 
(CAB,1453) 
(DEF,2906) 
(BAT,1453) 
(XYZ,1453) 

scala> 

Here is the PySpark code:

for i in channel_views.map(lambda rec: (rec[1], rec[0])).reduceByKey(lambda acc, value: acc + value).collect(): print(i)
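
Note the two fixes over the code in the question: the map swaps each tuple to (name, count) so the channel name is the key, and collect() brings the results back to the driver (an RDD cannot be iterated directly in a for loop). For a self-contained run, here is a minimal sketch assuming the Spark shell's SparkContext sc and the sample records from the question:

# Sample data from the question: (viewer_count, channel_name) pairs
channel_views = sc.parallelize([
    (1038, u'DEF'), (1038, u'CNO'), (1038, u'CNO'), (1038, u'NOX'),
    (1038, u'MAN'), (1038, u'MAN'), (1038, u'XYZ'), (1038, u'BAT'),
    (1038, u'CAB'), (1038, u'DEF'),
    (415, u'DEF'), (415, u'CNO'), (415, u'CNO'), (415, u'NOX'),
    (415, u'MAN'), (415, u'MAN'), (415, u'XYZ'), (415, u'BAT'),
    (415, u'CAB'), (415, u'DEF'),
])

# Key by channel name, then sum the counts per channel
totals = (channel_views
          .map(lambda rec: (rec[1], rec[0]))
          .reduceByKey(lambda acc, value: acc + value))

# Prints e.g. ('DEF', 2906), ('NOX', 1453), ... matching the Scala answer above
for name, views in totals.collect():
    print(name, views)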