2016-01-15 30 views
1

我在eclipse中運行flink,必要的jar已被Maven提取。我的機器有一個帶有八個內核的處理器,我必須寫的流應用程序從它的輸入中讀取行並計算一些統計數據。在多核處理器上本地運行Apache flink

當我在我的機器上運行該程序時,我期望flink使用CPU的所有內核以及線程代碼。但是,當我觀察內核時,我發現只有一個內核正在使用。我嘗試了很多東西,並在下面的代碼中留下了我的最後一次嘗試,即設置環境的並行性。我也嘗試將它設置爲單獨的流等等。

public class SemSeMi { 


    public static void main(String[] args) throws Exception { 
     System.out.println("Starting Main!"); 

     System.out.println(org.apache.flink.core.fs.local.LocalFileSystem 
       .getLocalFileSystem().getWorkingDirectory()); 

     StreamExecutionEnvironment env = StreamExecutionEnvironment 
       .getExecutionEnvironment(); 

     env.setParallelism(8); 

     env.socketTextStream("localhost", 9999).flatMap(new SplitterX()); 

     env.execute("Something");  
    } 

    public static class SplitterX implements 
      FlatMapFunction<String, Tuple2<String, Integer>> { 
     @Override 
     public void flatMap(String sentence, 
       Collector<Tuple2<String, Integer>> out) throws Exception { 
      // Do Nothing! 

     } 
    } 
} 

我喂的數據PROGRAMM使用netcat的:

nc -lk 9999 < fileName 

的問題是如何使程序在本地規模和使用所有可用的核心?

回答

2

您不必明確指定並行度。以默認設置運行的作業將自動將並行度設置爲可用核心的數量。

在你的情況下,源代碼將以1的並行性運行,因爲從套接字讀取不能分發。但是,對於flatMap操作,系統將實例化8個實例。如果你打開日誌記錄,那麼你也會看到它。現在輸入數據以循環方式分配給flatMap任務。每個flatMap任務都由一個單獨的線程執行。

我會懷疑你爲什麼只看到一個核心的負載是因爲SplitterX沒有做任何工作。試試下面的代碼計算每String的字符數,然後打印出結果到控制檯:

public static void main(String[] args) throws Exception { 
    System.out.println("Starting Main!"); 

    System.out.println(org.apache.flink.core.fs.local.LocalFileSystem 
     .getLocalFileSystem().getWorkingDirectory()); 

    StreamExecutionEnvironment env = StreamExecutionEnvironment 
     .getExecutionEnvironment(); 

    env.socketTextStream("localhost", 9999).flatMap(new SplitterX()).print(); 

    env.execute("Something"); 
} 

public static class SplitterX implements 
    FlatMapFunction<String, Tuple2<String, Integer>> { 
    @Override 
    public void flatMap(String sentence, 
         Collector<Tuple2<String, Integer>> out) throws Exception { 
     out.collect(Tuple2.of(sentence, sentence.length())); 

    } 
} 

的數字在每一行的開始告訴你哪些任務打印的結果。

+0

我故意清空程序來隔離問題。在我的完整版本中有很多需要處理。我按預期收到任務編號,但整個負載仍在一個核心上。要看到它,你需要通過每一百萬行左右只打印一次來制動輸出! – AHH

相關問題