2014-01-09 49 views
0

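How to read a file and load it into a list with multiprocessing

The data file I need to read is very large, and loading it into a list takes a long time. How can I use multiprocessing for this? In other words, I want to parallelise reading the file and loading it into a list. Could you please help?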

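Basically, I have a data table that I need to load into a list, as shown below. Reading the file takes almost no time, but loading it into the list (myList) takes about 1 minute. So, is it possible to parallelise this: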

def load_file(self, fileName):
    # Requires "import time" at module level
    time_start = time.time()
    myList = []
    # mySet = set()
    lines = self.read_file(fileName)
    # time_end = time.time()
    # print(fileName, ": loaded ", round(time_end-time_start, 4), " seconds")
    for line in lines:
        # Split each line on whitespace into a list of fields
        content = line.split()
        myList.append(content)
    time_end = time.time()
    print(fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4), " seconds")
    return myList

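def read_file(self, fileName):
    filePath = self.data_directory + '\\' + fileName
    try:
        with open(filePath, 'r') as f:
            # The with block closes the file, so no explicit f.close() is needed
            return f.readlines()
    except IOError:
        # open() raises IOError, not ValueError, when the file is missing
        print(filePath + ' does not exist')
        # Return an empty list so that the caller's loop still works
        return []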

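An ad hoc approach could be (assuming the file has 2M lines, so len(lines) = 2M) to load the first 1M into myList1 and the second 1M into myList2 in parallel, and then merge them: myList = myList1 + myList2. But this does not sound like best practice.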

+1

Are you sure you really need to load it into a list first? What are you trying to achieve? – bpgergo

+2

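You need to provide more details about what you are trying to accomplish. In general, you cannot read a file from disk any faster by splitting the read across multiple processes. –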

+1

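What are you trying to accomplish? What exactly have you tried? What problem did you run into? If you are running out of memory, multiprocessing will not help. If I/O is the bottleneck, multiprocessing will not help either. In fact, reading a file in parallel is rarely a good idea. – pentadecagon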

Answers

0

I ran some tests, which was fun, haha. I don't think this is very efficient :) Maybe there is another, more efficient way?

import time 
import multiprocessing 

## Generate sample big file (~158 MB, 2M lines)
import random
chunks = "Lorem ipsum dolor sit amet consectetur adipisicing elit sed do eiusmod tempor incididunt ut labore et dolore magna aliqua".split()
with open(r"D:\testbig.txt", "w", encoding="utf-8") as f:
    for i in range(2000000):
        # Each line gets between 5 and 19 random words
        for nch in range(random.randrange(5,20)):
            f.write(random.choice(chunks))
            f.write(" ")
        f.write("\n")

# Proposed direct way 
fileName = "foo" 
time_start = time.time() 
myList = [] 
# mySet = set() 
with open(r"D:\testbig.txt", "r", encoding="utf-8") as f: 
    lines = f.readlines() 
time_end = time.time() 
print(fileName, ": loaded ", round(time_end-time_start, 4)," seconds") 
for line in lines: 
    content = line.split() 
    myList.append(content) 
time_end = time.time() 
print(fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds") 
del myList 

# Results: 
## foo : loaded 0.9204 seconds 
## foo : 2000000 rows loaded in 6.9107 seconds 
## Press any key to continue . . . 

# Workers method: 
MAXPROCESS = 7 
CHUNKLEN = 25600000 

# The worker 
def splitter(lines):
    myList = []
    for line in lines:
        content = line.split()
        myList.append(content)
    return myList

# The code has to be fully loaded, therefore in a function
def main():

    fileName = "foo"
    time_start = time.time()
    # Declare a pool of workers
    pool = multiprocessing.Pool(MAXPROCESS)
    results = []
    with open(r"D:\testbig.txt", "r", encoding="utf-8") as f:
        while True:
            # Read an amount of lines (about CHUNKLEN bytes)
            lines = f.readlines(CHUNKLEN)
            # End of file breaks the loop
            if len(lines) == 0: break
            # Queue data to be processed
            results.append(pool.apply_async(splitter, (lines,)))
    time_end = time.time()
    print(fileName, ": loaded ", round(time_end-time_start, 4)," seconds")
    # Wait for queue to be processed
    pool.close()
    pool.join()
    # Put list pieces together
    myList = []
    for result in results:
        myList += result.get()

    time_end = time.time()
    print(fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds")

# Guard the entry point: on Windows, worker processes re-import this module
# and must not call main() again
if __name__ == "__main__":
    main()

# Results: 

# MAXPROCESS = 4 
# CHUNKLEN = 8192 
## foo : loaded 5.0075 seconds 
## foo : 2000000 rows loaded in 11.0446 seconds 
## Press any key to continue . . . 

# MAXPROCESS = 7 
# CHUNKLEN = 25600 
## foo : loaded 6.0839 seconds 
## foo : 2000000 rows loaded in 9.1102 seconds 
## Press any key to continue . . . 

# MAXPROCESS = 7 
# CHUNKLEN = 25600000 
## foo : loaded 3.1199 seconds 
## foo : 2000000 rows loaded in 11.7622 seconds 
## Press any key to continue . . . 
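
For comparison, here is a single-process variant that was not part of the timings above. A list comprehension over the file object avoids both the intermediate readlines() list and the per-line myList.append call. This is only a sketch: the helper name load_rows and the small temporary file are assumptions made for illustration, and no speed-up is claimed for the 2M-line test file.

```python
import os
import tempfile


def load_rows(path, encoding="utf-8"):
    """Return every line of the file split into a list of fields."""
    with open(path, "r", encoding=encoding) as f:
        # Iterating over f reads line by line; no readlines() copy is built
        return [line.split() for line in f]


# Tiny stand-in for the real data file (hypothetical content)
with tempfile.NamedTemporaryFile("w", suffix=".txt", delete=False,
                                 encoding="utf-8") as tmp:
    tmp.write("Lorem ipsum dolor\nsit amet\n")
    sample_path = tmp.name

rows = load_rows(sample_path)
os.remove(sample_path)
print(len(rows), "rows loaded")
```

Timing this against the append loop on the same file would show whether it helps in a given environment.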
1

Basically, calling file.readlines() on a large file is never a good idea. I don't know what this line does

self.read_file(fileName) 

but I am afraid it calls file.readlines().

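Normally, you do not want the millions of lines of a large file in a list. That will eat up your memory.

If you want to filter or transform the lines of a large file and then write the resulting lines to another file, use iterators instead of loading the lines into a list.

I suggest trying to organise your solution along the following lines. This approach can easily handle files that are several gigabytes in size.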

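def split_lines(file):
    # Lazily yield each line as a list of whitespace-separated fields
    with open(file) as f:
        for line in f:
            yield line.split()

def process_splitted_lines(file):
    for splitted_line in split_lines(file):
        # Placeholder: do some other thing with splitted_line (passed through here)
        something = splitted_line
        yield something

def write_result_lines(file, resultfile):
    # resultfile is an assumed parameter naming the output file
    with open(resultfile, "w") as out:
        for something in process_splitted_lines(file):
            # Placeholder: do some other thing with something (joined with spaces here)
            line = " ".join(something)
            out.write(line + "\n")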
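
As one concrete, hedged instance of the generator pipeline above, the sketch below drops rows with fewer than a minimum number of fields and writes the rest to a second file, holding only one line in memory at a time. The names keep_long_rows and write_rows, the min_fields threshold, and the temporary files are all assumptions for illustration, not part of the original answer.

```python
import os
import tempfile


def split_lines(path):
    """Lazily yield each line of the file as a list of fields."""
    with open(path) as f:
        for line in f:
            yield line.split()


def keep_long_rows(rows, min_fields):
    """Hypothetical filter step: pass on rows with at least min_fields fields."""
    for row in rows:
        if len(row) >= min_fields:
            yield row


def write_rows(rows, result_path):
    """Write each row as one space-separated line; return the row count."""
    count = 0
    with open(result_path, "w") as out:
        for row in rows:
            out.write(" ".join(row) + "\n")
            count += 1
    return count


# Small temporary input standing in for a multi-gigabyte file
tmp_dir = tempfile.mkdtemp()
src = os.path.join(tmp_dir, "input.txt")
dst = os.path.join(tmp_dir, "result.txt")
with open(src, "w") as f:
    f.write("a b c\nd\ne f\n")

# Nothing is read until write_rows starts pulling rows through the chain
written = write_rows(keep_long_rows(split_lines(src), min_fields=2), dst)
with open(dst) as f:
    result_text = f.read()
print(written, "rows written")
```

Because each stage is a generator, memory use stays roughly constant regardless of file size, at the cost of not having the whole table available for random access.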