我在Apache-Flink api中使用readCsvFile(path)函數讀取CSV文件並將其存儲在列表變量中。它如何使用多線程工作? 例如,它是根據一些統計數據拆分文件?如果是,統計數據是什麼?或者它是逐行讀取文件,然後將這些行發送給線程來處理它們? 下面是示例代碼:Apache-Flink API如何使用底層的並行機制讀取CSV文件?
//default parallelism is 4
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
csvPath="data/weather.csv";
List<Tuple2<String, Double>> csv= env.readCsvFile(csvPath)
.types(String.class,Double.class)
.collect();
假設我們有本地磁盤上800MB的CSV文件,它是如何分配的4個線程之間的工作?
謝謝法比安。但我想知道它如何定義拆分?按文件大小?線數或其他?它是否首先閱讀整個文件,然後決定,還是在閱讀之前進行拆分? – Ehsan
對於'CsvInputFormat',文件按大小拆分。在單個線程中讀取文件來分割它是毫無意義的。由於一行可能會跨越兩個拆分,讀取線程會從找到的第一個新行開始,並完成即使跨越拆分邊界時在其拆分中開始的行。 –
好的。假設我們有200MB文件,並行度設置爲2.線程1應該從頭開始。線程2應該從文件中間開始讀取。 thread2如何找出這個位置? Thread1如何知道它已到達其部分的末尾並應該停止? – Ehsan