我正在嘗試運行Flink流式作業。我想確定流式傳輸過程的吞吐量和延遲。我已經開始了卡夫卡經紀人服務器,並從卡夫卡傳入消息。我如何計算每秒消息(吞吐量)? (像rdd.count。有沒有類似的方法來獲取傳入消息的計數)Flink流 - 延遲和吞吐量檢測
(完整的scenerio:我已經通過生產者發送消息作爲Json對象,我添加了一些信息,如名稱作爲字符串和在Json對象中也是System.currentTimeMills 在流式傳輸過程中,如何通過messageStream(DataStream)獲取發送的json對象?)
在此先感謝。
CODE:
/** * 讀字符串卡夫卡並打印到標準輸出。 */
public static void main(String[] args) throws Exception {
System.setProperty("hadoop.home.dir", "c:/winutils/");
// parse input argum ents
final ParameterTool parameterTool = ParameterTool.fromArgs(args);
if(parameterTool.getNumberOfParameters() < 4) {
System.out.println("Missing parameters!\nUsage: Kafka --topic <topic> " +
"--bootstrap.servers <kafka brokers> --zookeeper.connect <zk quorum> --group.id <some id>");
return;
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().disableSysoutLogging();
env.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000));
env.enableCheckpointing(5000); // create a checkpoint every 5 seconds
env.getConfig().setGlobalJobParameters(parameterTool); // make parameters available in the web interface
DataStream<String> messageStream = env
.addSource(new FlinkKafkaConsumer010<>(
parameterTool.getRequired("topic"),
new SimpleStringSchema(),
parameterTool.getProperties()));
messageStream.print();
env.execute();
}