2016-11-18 103 views
0

我想將我的應用程序從flink流處理轉換爲flink批處理。如何做dataBatch的第一個元組字段元素的keyBy

對於弗林克數據流,我讀從具有多個JSON對象預先定義的文件串並做從Json的一個flatmap對象到tuple3集電極(第一元件 - 從JSON對象一個場,第二元件 - 從JSON另一個fieled對象,第三個元素 - 實際的json對象數據)。

DataStream<Tuple3<String, Integer, ObjectNode>> transformedSource = source.flatMap(new FlatMapFunction<String, Tuple3<String, Integer, ObjectNode>>() { 
       @Override 
       public void flatMap(String value, Collector<Tuple3<String, Integer, ObjectNode>> out) throws Exception { 
        ObjectNode record = mapper.readValue(value, ObjectNode.class); 
        JsonNode customer = record.get("customer"); 
        JsonNode deviceId = record.get("id"); 
             if (customer != null && deviceId != null) { 
         out.collect(Tuple3.of(customer.asText(), deviceId.asInt(), record)); 
        } 
       } 
      }); 

然後,在窗口中做一個keyBy的第一個和元組元素。

WindowedStream<Tuple3<String, Integer,ObjectNode>, Tuple, TimeWindow> combinedData = transformedSource 
      .keyBy(0, 1) 
      .timeWindow(Time.seconds(5)); 

對於弗林克批量處理,該怎麼辦數據集料的KeyBy,有KeyBy在DataSet中的等效方法

回答

2

groupBy似乎是你要找的

方法
+0

groupBy與ReduceFunction做了詭計,謝謝!同樣,DataSet批次中是否有等價的摺疊方法? – flinkexplorer

相關問題