我在eclipse中運行flink,必要的jar已被Maven提取。我的機器有一個帶有八個內核的處理器,我必須寫的流應用程序從它的輸入中讀取行並計算一些統計數據。在多核處理器上本地運行Apache flink
當我在我的機器上運行該程序時,我期望flink使用CPU的所有內核以及線程代碼。但是,當我觀察內核時,我發現只有一個內核正在使用。我嘗試了很多東西,並在下面的代碼中留下了我的最後一次嘗試,即設置環境的並行性。我也嘗試將它設置爲單獨的流等等。
public class SemSeMi {
public static void main(String[] args) throws Exception {
System.out.println("Starting Main!");
System.out.println(org.apache.flink.core.fs.local.LocalFileSystem
.getLocalFileSystem().getWorkingDirectory());
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(8);
env.socketTextStream("localhost", 9999).flatMap(new SplitterX());
env.execute("Something");
}
public static class SplitterX implements
FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String sentence,
Collector<Tuple2<String, Integer>> out) throws Exception {
// Do Nothing!
}
}
}
我喂的數據PROGRAMM使用netcat的:
nc -lk 9999 < fileName
的問題是如何使程序在本地規模和使用所有可用的核心?
我故意清空程序來隔離問題。在我的完整版本中有很多需要處理。我按預期收到任務編號,但整個負載仍在一個核心上。要看到它,你需要通過每一百萬行左右只打印一次來制動輸出! – AHH