0

我正在嘗試運行Flink流式作業。我想確定流式傳輸過程的吞吐量和延遲。我已經開始了卡夫卡經紀人服務器,並從卡夫卡傳入消息。我如何計算每秒消息(吞吐量)? (像rdd.count。有沒有類似的方法來獲取傳入消息的計數)Flink流 - 延遲和吞吐量檢測

(完整的scenerio:我已經通過生產者發送消息作爲Json對象,我添加了一些信息,如名稱作爲字符串和在Json對象中也是System.currentTimeMills 在流式傳輸過程中,如何通過messageStream(DataStream)獲取發送的json對象?)

在此先感謝。

CODE:

/** * 讀字符串卡夫卡並打印到標準輸出。 */

public static void main(String[] args) throws Exception { 
    System.setProperty("hadoop.home.dir", "c:/winutils/"); 
    // parse input argum ents 
    final ParameterTool parameterTool = ParameterTool.fromArgs(args); 

    if(parameterTool.getNumberOfParameters() < 4) { 
     System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " + 
       "--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>"); 
     return; 
    } 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 
    env.getConfig().disableSysoutLogging(); 
    env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)); 
    env.enableCheckpointing(5000); // create a checkpoint every 5 seconds 
    env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface 

    DataStream<String> messageStream = env 
      .addSource(new FlinkKafkaConsumer010<>(
        parameterTool.getRequired("topic"), 
        new SimpleStringSchema(), 
        parameterTool.getProperties())); 


    messageStream.print(); 

    env.execute(); 
} 

回答

0

在Flink用戶界面中有幾個可用的指標,您可以計算每秒鐘發生的事件數量等等。

您還可以根據您的要求在計算某些數字的位置添加自己的指標,這可以顯示在Flink UI中。

而且最後的具體延遲跟蹤也許你可以試試怎麼在這裏解釋 - latency-tracking,同樣可以使用獲得的吞吐量 - meters