我有一個Spark Streaming應用程序,它每秒接收幾條JSON消息,每個消息都有一個標識其來源的ID。我可以執行MapPartitionsToPair
,從而創建一個JavaPairDStream,其中包含鍵/值對的RDD,每個分區一個鍵值對(因此,如果我收到5個JSON消息,例如,我得到一個帶有5個分區的RDD,每個分區的消息ID都是一個密鑰,而JSON消息本身就是這個值。如何在Spark中按分區對鍵/值進行分組?
我現在想要做的是,我想將具有相同鍵的所有值分組到同一個分區。例如,如果我有3個分區,其中有'a'鍵和2個分區'b',我想創建一個新的RDD,其中包含2個分區而不是5個分區,每個分區包含一個密鑰的所有值,一個用於'a'和一個'b'。
我該如何做到這一點? 這是到目前爲止我的代碼:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();
while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;
}
JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);
System.out.print(b._1+"_"+b._2);
// }
//break;
}
return a;
}
});
//創建一個JavaPairDStream,其中每個分區包含一個密鑰/值對。
我試圖用grouByKey()
,但不管是什麼消息的數量是,我總是得到的2
分區號我應該怎麼辦呢? 非常感謝。
你爲什麼想每個分區1元?你想解決什麼問題? – maasg