1
我是Flink的新手。我有這個代碼來映射,組和輸入JSON的總和。Flink keyBy摸索問題
這與詞計數示例非常相似。
我有望獲得(vacant,1) (occupied,2)
但是,由於某種原因,我得到(occupied,1) (vacant,1) (occupied,2)
public static void main(String[] args) throws Exception {
String s = "{\n" +
" \"Port_128\": \"occupied\",\n" +
" \"Port_129\": \"occupied\",\n" +
" \"Port_120\": \"vacant\"\n" +
"\n" +
"}";
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> in = env.fromElements(s);
SingleOutputStreamOperator<Tuple2<String, Integer>> t =
in.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>>
collector) throws Exception {
ObjectMapper mapper = new ObjectMapper();
JsonNode node = mapper.readTree(s);
node.elements().forEachRemaining(v -> {
collector.collect(new Tuple2<>(v.textValue(), 1));
});
}
}).keyBy(0).sum(1);
t.print();
env.execute();
我怎麼能每次阻止它從KeydAggregation以及如何從它'歧字數統計爲例? – MIkCode
您設置了一個Datatream程序。 Flink字數統計的例子是一個DataSet程序。兩者的行爲不同。數據流中的數據按照通過管道接收時的處理方式進行處理,因此它處理每個經過的元素的原因。我將通過更改您的代碼更新答案,即使用DataSet代碼(如wordcount示例)。如果你運行它,你會得到你期望的輸出。 – Jicaar
現在我明白了, 我的錯誤是我使用流而不是數據集 – MIkCode