2017-10-19 162 views
1

我是Flink的新手。我有這個代碼來映射,組和輸入JSON的總和。Flink keyBy摸索問題

這與詞計數示例非常相似。

我有望獲得(vacant,1) (occupied,2)

但是,由於某種原因,我得到(occupied,1) (vacant,1) (occupied,2)

public static void main(String[] args) throws Exception { 
     String s = "{\n" + 
       " \"Port_128\": \"occupied\",\n" + 
       " \"Port_129\": \"occupied\",\n" + 
       " \"Port_120\": \"vacant\"\n" + 
       "\n" + 
       "}"; 
     StreamExecutionEnvironment env = 
     StreamExecutionEnvironment.getExecutionEnvironment(); 
     DataStream<String> in = env.fromElements(s); 
     SingleOutputStreamOperator<Tuple2<String, Integer>> t = 
     in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 
      @Override 
      public void flatMap(String s, Collector<Tuple2<String, Integer>> 
      collector) throws Exception { 
       ObjectMapper mapper = new ObjectMapper(); 
       JsonNode node = mapper.readTree(s); 
       node.elements().forEachRemaining(v -> { 
        collector.collect(new Tuple2<>(v.textValue(), 1)); 
       }); 

      } 
     }).keyBy(0).sum(1); 

     t.print(); 
     env.execute(); 

回答

1

運行你的代碼,我得到:

10/19/2017 11:27:38 Keyed Aggregation -> Sink: Unnamed(1/1) switched to RUNNING 
(occupied,1) 
(occupied,2) 
(vacant,1) 
10/19/2017 11:28:03 Keyed Aggregation -> Sink: Unnamed(1/1) switched to FINISHED 

這比略有不同的輸出,但重要。原因在於代碼在接收數據時輸出每個鍵的總和,所以首先得到第一個佔用(輸出1),然後第二個(輸出該鍵控過程的總和現在爲2),然後將空置發送給另一個鍵控進程並輸出1.因此,這看起來像是我的正確輸出。

編輯

每下面的評論,這裏是會給你所需要的輸出代碼:

public static void main(String[] args) throws Exception { 
    String s = "{\n" + 
     " \"Port_128\": \"occupied\",\n" + 
     " \"Port_129\": \"occupied\",\n" + 
     " \"Port_120\": \"vacant\"\n" + 
     "\n" + 
     "}"; 
    ExecutionEnvironment env = 
     ExecutionEnvironment.getExecutionEnvironment(); 
    DataSet<String> in = env.fromElements(s); 
    AggregateOperator<Tuple2<String, Integer>> t = 
     in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() { 
     @Override 
     public void flatMap(String s, Collector<Tuple2<String, Integer>> 
      collector) throws Exception { 
      ObjectMapper mapper = new ObjectMapper(); 
      JsonNode node = mapper.readTree(s); 
      node.elements().forEachRemaining(v -> { 
      collector.collect(new Tuple2<>(v.textValue(), 1)); 
      }); 

     } 
     }).groupBy(0).sum(1); 

    t.print(); 
    env.execute(); 
} 
+0

我怎麼能每次阻止它從KeydAggregation以及如何從它'歧字數統計爲例? – MIkCode

+1

您設置了一個Datatream程序。 Flink字數統計的例子是一個DataSet程序。兩者的行爲不同。數據流中的數據按照通過管道接收時的處理方式進行處理,因此它處理每個經過的元素的原因。我將通過更改您的代碼更新答案,即使用DataSet代碼(如wordcount示例)。如果你運行它,你會得到你期望的輸出。 – Jicaar

+0

現在我明白了, 我的錯誤是我使用流而不是數據集 – MIkCode