2012-06-25 32 views
55

我有一個單獨的大文本文件,我想在其中處理每一行(執行一​​些操作)並將它們存儲在數據庫中。由於一個簡單的程序花費的時間太長,我希望通過多個進程或線程來完成。 每個線程/進程應該讀取來自單個文件的不同數據(不同的行),並對它們的數據(行)進行一些操作並將它們放入數據庫中,以便最終處理完整個數據並我的數據庫被傾倒了我需要的數據。在python中處理多個進程中的單個文件

但我無法弄清楚如何解決這個問題。

+2

不錯的問題。我也有這個疑問。雖然我選擇將文件分成更小的文件:) –

回答

70

你所尋找的是一個生產者/消費者模式

基本線程例如

下面是一個使用基本例如(而不是多處理)

import threading 
import Queue 
import sys 

def do_work(in_queue, out_queue): 
    while True: 
     item = in_queue.get() 
     # process 
     result = item 
     out_queue.put(result) 
     in_queue.task_done() 

if __name__ == "__main__": 
    work = Queue.Queue() 
    results = Queue.Queue() 
    total = 20 

    # start for workers 
    for i in xrange(4): 
     t = threading.Thread(target=do_work, args=(work, results)) 
     t.daemon = True 
     t.start() 

    # produce data 
    for i in xrange(total): 
     work.put(i) 

    work.join() 

    # get the results 
    for i in xrange(total): 
     print results.get() 

    sys.exit() 

你止跌與線程共享文件對象。通過向queue提供數據行,您可以爲他們生產。然後,每個線程都會選取並處理它,然後將其返回隊列中。

multiprocessing module中內置了一些更高級的工具來共享數據,如列表和special kind of Queue。使用多處理vs線程有一定的權衡,它取決於你的工作是cpu綁定還是IO綁定。

基本multiprocessing.Pool例如

這裏是一個多池

from multiprocessing import Pool 

def process_line(line): 
    return "FOO: %s" % line 

if __name__ == "__main__": 
    pool = Pool(4) 
    with open('file.txt') as source_file: 
     # chunk the work into batches of 4 lines at a time 
     results = pool.map(process_line, source_file, 4) 

    print results 

A Pool是管理自己的流程便利對象的一個​​很基本的例子。由於打開的文件可以遍歷其行,因此可以將它傳遞給映射,映射將循環並將行傳遞給工作函數。 Map塊完成後返回整個結果。請注意,在一個非常簡單的例子中,map將在開始工作之前一次性使用您的文件。所以請注意它是否更大。有更高級的方法來設計製作者/消費者設置。

手冊「池」與極限和線重新排序

這是Pool.map的手動的例子,但代替的消耗整個迭代,可以設置一個隊列的大小,以便你只供給它可以儘可能快地處理它。我還添加了行號,以便您可以跟蹤它們並在以後再使用它們。

from multiprocessing import Process, Manager 
import time 
import itertools 

def do_work(in_queue, out_list): 
    while True: 
     item = in_queue.get() 
     line_no, line = item 

     # exit signal 
     if line == None: 
      return 

     # fake work 
     time.sleep(.5) 
     result = (line_no, line) 

     out_list.append(result) 


if __name__ == "__main__": 
    num_workers = 4 

    manager = Manager() 
    results = manager.list() 
    work = manager.Queue(num_workers) 

    # start for workers  
    pool = [] 
    for i in xrange(num_workers): 
     p = Process(target=do_work, args=(work, results)) 
     p.start() 
     pool.append(p) 

    # produce data 
    with open("source.txt") as f: 
     iters = itertools.chain(f, (None,)*num_workers) 
     for num_and_line in enumerate(iters): 
      work.put(num_and_line) 

    for p in pool: 
     p.join() 

    # get the results 
    # example: [(1, "foo"), (10, "bar"), (0, "start")] 
    print sorted(results) 
+0

是的,該文件較大,大約1 GB左右。我不知道你的意思是說更大,1 GB對我來說更大。 – pranavk

+0

這很好。我確定你可以採取這些例子並推斷你的需求。線程一個就好了。多處理器只需要一個類似的隊列供您使用。 – jdi

+1

這很好,但如果處理是I/O限制呢?在這種情況下,並行可能會減慢速度,而不是加快速度。在單個磁盤軌道內搜索比intertrack尋求的要快得多,並行I/O往往會引入intertrack尋求否則會是順序I/O負載。爲了從並行I/O中獲得一些好處,有時使用RAID鏡像有時會有所幫助。 – user1277476

-4

將單個大文件分解爲多個較小的文件,並將它們中的每一個在單獨的線程中處理。

+4

可以顯示一些代碼嗎? – maq

+0

這不是OP想要的!但只是一個想法...不錯。 – DRPK

5

下面是我做了一個非常愚蠢的例子:

import os.path 
import multiprocessing 

def newlinebefore(f,n): 
    f.seek(n) 
    c=f.read(1) 
    while c!='\n' and n > 0: 
     n-=1 
     f.seek(n) 
     c=f.read(1) 

    f.seek(n) 
    return n 

filename='gpdata.dat' #your filename goes here. 
fsize=os.path.getsize(filename) #size of file (in bytes) 

#break the file into 20 chunks for processing. 
nchunks=20 
initial_chunks=range(1,fsize,fsize/nchunks) 

#You could also do something like: 
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. 


with open(filename,'r') as f: 
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) 

end_byte=[i-1 for i in start_byte] [1:] + [None] 

def process_piece(filename,start,end): 
    with open(filename,'r') as f: 
     f.seek(start+1) 
     if(end is None): 
      text=f.read() 
     else: 
      nbytes=end-start+1 
      text=f.read(nbytes) 

    # process text here. createing some object to be returned 
    # You could wrap text into a StringIO object if you want to be able to 
    # read from it the way you would a file. 

    returnobj=text 
    return returnobj 

def wrapper(args): 
    return process_piece(*args) 

filename_repeated=[filename]*len(start_byte) 
args=zip(filename_repeated,start_byte,end_byte) 

pool=multiprocessing.Pool(4) 
result=pool.map(wrapper,args) 

#Now take your results and write them to the database. 
print "".join(result) #I just print it to make sure I get my file back ... 

這裏最棘手的部分是確保我們分裂的換行符的文件,這樣你就不會錯過任何線(或只讀取部分行)。然後,每個進程讀取它是文件的一部分,並返回一個可由主線程放入數據庫的對象。當然,你甚至可能需要以塊的形式完成這部分,這樣你就不必一次把所有的信息保存在內存中。 (這很容易實現 - 只需將「參數」列表拆分爲X塊並撥打pool.map(wrapper,chunk) - 請參閱here

相關問題