我有以下數據的文件:合併數據,爪哇
1231212名1名2
1431344名1 NAME3
2342343 NAME3 NAME4
2344255 name2 name1
and I would l ike我的Java程序在Spark中執行一些操作,以便我的輸出如下:
[(name1,3),(name2,2),(name3,2),(name4,1)]
但我仍不確定如何使用flat和reduce操作。我剛開始學習Spark。
我現在所擁有的是以下幾點:
List<Tuple2<String,Long>> result1 =
accessLogs.map(log -> new Tuple2<Long, String>(log.getTimestamp(), log.getHostname1()))
.filter(tuple -> tuple._1() > init_time - 5)
.filter(tuple -> tuple._1() < fin_time + 5)
.map(e -> (new Tuple2<String, Long>(e._2, 1L)))
.take(100);
List<Tuple2<String, Long>> result2 =
accessLogs.map(log -> new Tuple2<Long, String>(log.getTimestamp(), log.getHostname2()))
.filter(tuple -> tuple._1() > init_time - 5)
.filter(tuple -> tuple._1() < fin_time + 5)
.map(e -> (new Tuple2<String, Long>(e._2, 1L)))
.take(100);
所以我的結果與以下數據兩種不同的列表:
[(name1,1),(name1,1) ,(name3,1),(name2,1)]
[(name2,1),(name3,1),(name4,1),(name1,1)]
通過使用一個列表,我可以使用什麼來實現想要的結果?
我的想法是這樣開始:
List<String> finalResult =
accessLogs.map(log -> new Tuple3<Long, String, String>(log.getTimestamp(), log.getHostname1(), log.getHostname2()))
.filter(tuple -> tuple._1() > init_time - 5)
.filter(tuple -> tuple._1() < fin_time + 5)...
,然後繼續執行操作。
編輯:
現在我有以下代碼:
JavaPairRDD<String, Integer> pairs1 = accessLogs.mapToPair(new PairFunction<LogObject, String, Integer>() {
public Tuple2<String, Integer> call(LogObject s) { return new Tuple2<String, Integer>(s.getHostname1(), 1); }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer a, Integer b) { return a + b; }
});
返回
[(name1,2),(name3,1),(name2,1) ]
但我仍然缺少關於如何執行此操作的部分(來自建議的答案)
.flatMap {情況下(_,KEY1,KEY2)=>列表((KEY1,1),(KEY2,1))}
在Java中,這樣我可以檢索來自數據第二和第三列。
這是可悲的是更長的時間在Java中,但我可以使用flatMap然後reduceByKey來完成它。謝謝! – randombee