2017-01-11 62 views
0

我在Apache-Flink api中使用readCsvFile(path)函數讀取CSV文件並將其存儲在列表變量中。它如何使用多線程工作? 例如,它是根據一些統計數據拆分文件?如果是,統計數據是什麼?或者它是逐行讀取文件,然後將這些行發送給線程來處理它們? 下面是示例代碼:Apache-Flink API如何使用底層的並行機制讀取CSV文件?

//default parallelism is 4 
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); 
csvPath="data/weather.csv"; 
List<Tuple2<String, Double>> csv= env.readCsvFile(csvPath) 
         .types(String.class,Double.class) 
         .collect(); 

假設我們有本地磁盤上800MB的CSV文件,它是如何分配的4個線程之間的工作?

回答

1

readCsvFile() API方法內部創建一個數據源,其中CsvInputFormat基於Flink的FileInputFormat。這個InputFormat生成一個所謂的InputSplits列表。 InputSplit定義了文件的哪個範圍應該被掃描。分割然後分配給數據源任務。

因此,每個並行任務都會掃描文件的某個區域並解析其內容。這與MapReduce/Hadoop完成的過程非常相似。

+0

謝謝法比安。但我想知道它如何定義拆分?按文件大小?線數或其他?它是否首先閱讀整個文件,然後決定,還是在閱讀之前進行拆分? – Ehsan

+0

對於'CsvInputFormat',文件按大小拆分。在單個線程中讀取文件來分割它是毫無意義的。由於一行可能會跨越兩個拆分,讀取線程會從找到的第一個新行開始,並完成即使跨越拆分邊界時在其拆分中開始的行。 –

+0

好的。假設我們有200MB文件,並行度設置爲2.線程1應該從頭開始。線程2應該從文件中間開始讀取。 thread2如何找出這個位置? Thread1如何知道它已到達其部分的末尾並應該停止? – Ehsan

1

這是一樣的How does Hadoop process records split across block boundaries?

我提取弗林克釋放-1.1.3 DelimitedInputFormat文件中的一些代碼。

// else .. 
    int toRead; 
    if (this.splitLength > 0) { 
     // if we have more data, read that 
     toRead = this.splitLength > this.readBuffer.length ? this.readBuffer.length : (int) this.splitLength; 
    } 
    else { 
     // if we have exhausted our split, we need to complete the current record, or read one 
     // more across the next split. 
     // the reason is that the next split will skip over the beginning until it finds the first 
     // delimiter, discarding it as an incomplete chunk of data that belongs to the last record in the 
     // previous split. 
     toRead = this.readBuffer.length; 
     this.overLimit = true; 
    } 

很明顯,如果沒有在一個分裂讀行分隔符,它會得到另一個分裂找。(我還沒有找到相應的代碼,我會盡力。)

加:下面的圖片是我如何找到代碼,從readCsvFile()到DelimitedInputFormat。

enter image description here