2017-10-11 70 views
0

我試圖獲得一個輸入數據集(csv)的Map<String, Map<String, Long>>,該輸入數據集具有Map a Map的每個元素(Dataset的列),其中元素出現在相應列中的計數的事件。 因此,有這樣的輸入例如:Java Spark數列的元素頻率

col1,col2,col3 
a,1,c6 
ab,23,c6 
cd,23,c8 
a,1,x 

我的輸出應該是這樣的:

{col1:{a:2, ab:1, cd:1}}, 
{col2:{1:2, 23:2}}, 
{col3:{c6:2, c8:1, x:1}} 

我有辦法做到這一點單獨服用每一列,並使用「countByValue」算元素地圖,然後將每個Map存儲爲列的Map中的值。 現在我想一個辦法把文件讀一次加速計算和我試圖用「flatMapToPair」功能在我的文件:

JavaRDD<String> fileRdd 

這樣的:

JavaPairRDD<String, String> res = fileRdd.flatMapToPair(
    new PairFlatMapFunction<String, String, String>() { 
     public Iterator<Tuple2<String, String>> call(String x) { 
      List<Tuple2<String, String>> res = new ArrayList<>(); 
      List<String> d = Arrays.asList(x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1)); 
      for (int i = 0; i < columns.size(); i++) { 
       res.add(new Tuple2<String, String>(columns.get(i), d.get(i))); 
      } 
      return res.iterator(); 
     } 
}); 

然後groupingByKey:

JavaPairRDD<String,Iterable<String>> groupMap = res.groupByKey(); 

現在我有這樣的結果:

col1:[a,ab,cd,a] 

,我想我需要另一個地圖縮小步數出現,所以這可能不是我達到目標的最佳途徑......

另外,我注意到,關於剛纔的文件的第一個flatMapToPair計算200MB在超過先前計算處理同一個文件的時間後,我的內存耗盡,所以我可能會對flatMapToPair做錯誤處理。

回答

0

如果使用DataFrame而不是RDD,那麼有一個簡單的解決方案。

//import com.fasterxml.jackson.core.JsonGenerator; 
//import com.fasterxml.jackson.core.JsonParseException; 
//import com.fasterxml.jackson.core.JsonProcessingException; 
//import com.fasterxml.jackson.core.type.TypeReference; 
//import com.fasterxml.jackson.databind.JsonMappingException; 
//import com.fasterxml.jackson.databind.ObjectMapper; 

// Read CSV 
Dataset<Row> df = spark.read().csv(fileName); 
// Initialize ObjectMapper 
ObjectMapper mapper = new ObjectMapper(); 
mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false); 

// Map for collecting column information 
Map<String, Map<String,Long>> columnCountMap = new HashMap<String, Map<String,Long>>(); 
for (String columnName : df.columns()) 
    { 
     // Group and count using groupBy function 
     // and then convert to JSON and collect as List 
     List<String> jsons = df.groupBy(columnName).count().toJSON().collectAsList(); 
     try 
      { 
       Map<String,Long> countMap = new HashMap<String, Long>(); 
       // Iterate through the strings/rows; 
       // map it to Map then collect values; 
       // put them into the countMap 
       for (String json : jsons) 
        { 
         Map<String, Object> map = mapper.readValue(json, new TypeReference<Map<String, String>>(){}); 
         String[] keyValues = map.values().toArray(new String[map.values().size()]); 
         countMap.put(keyValues[0], Long.parseLong(keyValues[1])); 

        } 
       columnCountMap.put(columnName, countMap); 
      } 
     catch (JsonParseException e) 
      { 
       e.printStackTrace(); 
      } 
     catch (JsonMappingException e) 
      { 
       e.printStackTrace(); 
      } 
     catch (IOException e) 
      { 
       e.printStackTrace(); 
      } 
    } 
String output = "": 
try 
{ 
     // If you need to output as {col1:{a:2, ab:1, cd:1}}, 
     // {col2:{1:2, 23:2}}, 
     // {col3:{c6:2, c8:1, x:1}} 
     output = mapper.writeValueAsString(columnCountMap); 
} 
catch (JsonProcessingException e) 
{ 
     e.printStackTrace(); 
}