2017-06-16 96 views
2

我已經爲Apache Flink編寫了一個非常簡單的java程序,現在我有興趣測量統計信息,如吞吐量(每秒處理的元組數量)和延遲(程序需要處理的時間輸入元組)。Apache Flink上的吞吐量和延遲

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

env.readTextFile("/home/LizardKing/Documents/Power/Prova.csv") 
     .map(new MyMapper().writeAsCsv("/home/LizardKing/Results.csv"); 

JobExecutionResult res = env.execute(); 

我知道弗林克暴露了一些指標:

https://ci.apache.org/projects/flink/flink-docs-release-1.2/monitoring/metrics.html

但我不知道如何使用它們,以獲得我想要的東西。從鏈接我已經讀過,「米」可以用來衡量平均吞吐量,但是,在定義它之後,我該如何使用它?

+0

你到底在幹什麼?對於吞吐量,您可以在您的「MyMapper」函數中註冊一個「Meter」,就像您提供的鏈接所示。您可以觀看Flink網絡儀表板中的實時指標。 – us2012

+0

如果我按照指示執行myMeter類,我嘗試了一些東西,但它不起作用。如果我使用DropWizard儀表並嘗試在獨立模式下運行它,即使我已將依賴項包含在pom.xml中,我也有一個錯誤(java.lang.NoClassDefFoundError:com/codahale/metrics/Meter )。 – LizardKing

回答

1

我們正在紗線上運行我們的生產流作業中運行自定義指標,如米,計量器。

通過以下步驟:

附加依賴​​

<dependency> 
    <groupId>org.apache.flink</groupId> 
    <artifactId>flink-metrics-dropwizard</artifactId> 
    <version>${flink.version}</version> 
</dependency> 

我們使用1.2.1版

然後加計MyMapper類的pom.xml。

import org.apache.flink.api.common.JobExecutionResult; 
import org.apache.flink.api.common.functions.RichMapFunction; 
import org.apache.flink.configuration.Configuration; 
import org.apache.flink.dropwizard.metrics.DropwizardMeterWrapper; 
import org.apache.flink.metrics.Meter; 
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; 


public class Test { 


    public static void main(String[] args) throws Exception { 

     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

     env 
       .readTextFile("/home/LizardKing/Documents/Power/Prova.csv") 
       .map(new MyMapper()) 
       .writeAsCsv("/home/LizardKing/Results.csv"); 

     JobExecutionResult res = env.execute(); 
    } 


    private static class MyMapper extends RichMapFunction<String, Object> { 

     private transient Meter meter; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      this.meter = getRuntimeContext() 
        .getMetricGroup() 
        .meter("myMeter", new DropwizardMeterWrapper(new com.codahale.metrics.Meter())); 
     } 

     @Override 
     public Object map(String value) throws Exception {  
      this.meter.markEvent(); 
      return value; 
     } 
    } 
} 

希望這會有所幫助。

+0

它有幫助,我還遇到了另一個問題:當我嘗試在flink(而不是從IDE)中運行此程序時,發現僅將依賴項包含在pom.xml中是不夠的。我不得不提供庫鏈接,我建議的方法是使用maven-shade插件。它應該在上傳的jar中打包依賴。 – LizardKing