2016-06-19 109 views
3

我有一個Spark Streaming應用程序,它每秒接收幾條JSON消息,每個消息都有一個標識其來源的ID。我可以執行MapPartitionsToPair,從而創建一個JavaPairDStream,其中包含鍵/值對的RDD,每個分區一個鍵值對(因此,如果我收到5個JSON消息,例如,我得到一個帶有5個分區的RDD,每個分區的消息ID都是一個密鑰,而JSON消息本身就是這個值。如何在Spark中按分區對鍵/值進行分組?

我現在想要做的是,我想將具有相同鍵的所有值分組到同一個分區。例如,如果我有3個分區,其中有'a'鍵和2個分區'b',我想創建一個新的RDD,其中包含2個分區而不是5個分區,每個分區包含一個密鑰的所有值,一個用於'a'和一個'b'。

我該如何做到這一點? 這是到目前爲止我的代碼:

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]), 
      StorageLevels.MEMORY_AND_DISK_SER); 

JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() { 
     @Override 
     public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception { 

      ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>(); 

      while (stringIterator.hasNext()){ 
       String c=stringIterator.next(); 
       if(c==null){ 
        return null; 

       } 

       JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class); 
       String key= retMap.getSid(); 
       Tuple2<String,String> b= new Tuple2<String,String>(key,c); 
       a.add(b); 

       System.out.print(b._1+"_"+b._2); 
       // } 
       //break; 
      } 


      return a; 
     } 
    }); 

//創建一個JavaPairDStream,其中每個分區包含一個密鑰/值對。

我試圖用grouByKey(),但不管是什麼消息的數量是,我總是得到的2

分區號我應該怎麼辦呢? 非常感謝。

+0

你爲什麼想每個分區1元?你想解決什麼問題? – maasg

回答

4

可以使用

groupByKey(Integer numPartitions) 

,並設置numPartitions等於你有不同的鍵的數量。

但是..你將需要知道你有多少個不同的密鑰預先。你有這些信息嗎?可能不會。那麼......你需要做一些額外的(/冗餘)工作。例如。使用

countByKey 

作爲第一步。這比groupByKey快 - 所以至少你不是總處理時間的兩倍。

更新 OP詢問他們爲什麼默認獲得2個分區。

默認groupByKey使用defaultPartitioner()方法

groupByKey(defaultPartitioner(self)) 
  • 從具有最大基數父分區選擇Partitioner

- 或者它會使用spark.default.parallelism

+0

謝謝,這絕對解決了我的問題。然而,只有一個問題:你知道爲什麼'groupByKey()'默認返回2個分區嗎?無論每批處理間隔發送多少個輸入或我擁有的輸出,似乎groupByKey都獨立於此。當我執行'getNumPartitions'時,它返回2 –

相關問題