2014-10-28 81 views
0

我需要按特定列對一組csv行進行分組,並對每個組進行一些處理。使用分組處理火花數據​​

JavaRDD<String> lines = sc.textFile 
         ("somefile.csv"); 
       JavaPairRDD<String, String> pairRDD = lines.mapToPair(new SomeParser()); 
       List<String> keys = pairRDD.keys().distinct().collect(); 
       for (String key : keys) 
       { 
       List<String> rows = pairRDD.lookup(key); 

      noOfVisits = rows.size(); 
      country = COMMA.split(rows.get(0))[6]; 
      accessDuration = getAccessDuration(rows,timeFormat); 
      Map<String,Integer> counts = getCounts(rows); 
      whitepapers = counts.get("whitepapers"); 
      tutorials = counts.get("tutorials"); 
      workshops = counts.get("workshops"); 
      casestudies = counts.get("casestudies"); 
      productPages = counts.get("productpages");   
      } 

    private static long dateParser(String dateString) throws ParseException { 
     SimpleDateFormat format = new SimpleDateFormat("MMM dd yyyy HH:mma"); 
     Date date = format.parse(dateString); 
     return date.getTime(); 
    } 
dateParser is called for each row. Then min and max for the group is calculated to get the access duration. Others are string matches. 

pairRDD.lookup是非常緩慢..有沒有更好的方法來做到這一點火花。

回答

2

我想你可以簡單地使用該列作爲關鍵,並做一個groupByKey。沒有提及這些行上的操作。如果它是以某種方式組合這些行的功能,則甚至可以使用reduceByKey

喜歡的東西:

import org.apache.spark.SparkContext._ // implicit pair functions 
val pairs = lines.map(parser _) 
val grouped = pairs.groupByKey 
// here grouped is of the form: (key, Iterator[String]) 

*編輯* 在看的過程中,我認爲這將是更有效的每一行映射到它所產生的作用的數據,然後使用aggregateByKey,以減少他們的所有總數。 aggregateByKey需要2層的功能和零:

def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, 
     combOp: (U, U) => U): RDD[(K, U)] 

第一功能是一個分區聚合器,並且將有效地通過本地分區中運行,創建每個分區本地彙總諧音。 combineOperation將採用這些部分聚合並將它們組合在一起以獲得最終結果。

事情是這樣的:

val lines = sc.textFile("somefile.csv") 
// parse returns a key and a decomposed Record of values tracked:(key, Record("country", timestamp,"whitepaper",...)) 

val records = lines.map(parse(_)) 

val totals = records.aggregateByKey((0,Set[String].empty,Long.MaxValue, Long.MinValue, Map[String,Int].empty), 
(record, (count, countrySet, minTime, maxTime, counterMap)) => (count+1,countrySet + record.country, math.min(minTime,record.timestamp), math.max(maxTime, record.timestamp), ...) 
(cumm1, cumm2) => ??? // add each field of the cummulator 
) 

這是Spark做基於密鑰的聚合最有效的方法。

+0

我已經試過..它甚至更慢..一個操作是解析每個組的日期列以計算持續時間。 – lochi 2014-10-28 15:40:33

+0

您可以在每個鍵的值上執行操作的問題上添加詳細信息嗎? 'reduceByKey'比'groupByKey'更高效,可能是更好的選擇。 – maasg 2014-10-28 15:42:53

+0

見上面..謝謝。 – lochi 2014-10-28 15:52:51