2017-02-09 29 views
2

我在二進制文件中有數百GB的數據。我想隨機抽取數據,多次隨機讀取幾個連續記錄。從Python中並行批量讀取文件

數據存儲在許多文件中。主文件不以任何特定順序存儲數據,因此每個文件都有一個排序索引文件。我目前的代碼是這樣的,除了有很多文件:

index = open("foo.index", 'rb') 
data = open("foo", 'rb') 
index_offset_format = 'Q' 
index_offset_size = struct.calcsize(index_offset_format) 
record_set = [] 
for _ in range(n_batches): 
    # Read `batch_size` offsets from the index - these are consecutive, 
    # so they can be read in one operation 
    index_offset_start = random.randint(0, N_RECORDS - batch_size) 
    index.seek(index_offset_start) 
    data_offsets = struct.iter_unpack(
     index_offset_format, 
     index.read(index_offset_size * batch_size)) 

    # Read actual records from data file. These are not consecutive 
    records = [] 
    for offset in data_offsets: 
     data.seek(offset) 
     records.append(data.read(RECORD_SIZE)) 
    record_set.append(records) 

然後其他事情都做了記錄。從性能分析來看,我發現該程序的IO限制很大,大部分時間都用於index.readdata.read。我懷疑這是因爲read被阻塞:在請求下一個隨機數據塊之前,解釋器等待操作系統從磁盤讀取數據,因此操作系統沒有機會優化磁盤訪問模式。所以:是否有一些文件API可以傳遞一批指令?例如:

def read_many(file, offsets, lengths): 
    ''' 
    @param file: the file to read from 
    @param offsets: the offsets to seek to 
    @param lengths: the lengths of data to read 
    @return an iterable over the file contents at the requested offsets 
    ''' 

或者,打開多個文件對象並使用多線程請求多次讀取是否足夠?或者GIL會阻止它變得有用嗎?

+1

相關:https://stackoverflow.com/questions/29270818/why-is-a-python-i-o-bound-task-not-blocked-by-the-gil。 – ekhumoro

+0

文件的最小值,最大值和平均值是多少? – Apalala

回答

3

因爲進程是IO綁定的,所以讀取的邊界由操作系統的磁盤操作調度器和磁盤的緩存設置。

實際,每個內核的並行化可以很容易地與multiprocessing.Pool.imap_unordered()有:

def pmap(fun, tasks): 
    from multiprocessing import Pool 
    with Pool() as pool: 
     yield from pool.imap_unordered(fun, tasks) 

for record_set in pmap(process_one_file, filenames): 
    print(record_set) 

有幾個文件在同一時間打開,並且可能是read()由每個內核執行,應該允許磁盤調度圖列出一個比文件名列表中的序列更好的時間表。

imap_unordered()的美妙之處在於,它可以將任務的後處理與另一個任務完成的前後處理(順序在不同運行中的順序可能不同)解耦。

正如在評論中提到的,GIL僅在執行Python代碼時才涉及,而I/O上的程序阻塞則不是這種情況。