我有一個單獨的大文本文件,我想在其中處理每一行(執行一些操作)並將它們存儲在數據庫中。由於一個簡單的程序花費的時間太長,我希望通過多個進程或線程來完成。 每個線程/進程應該讀取來自單個文件的不同數據(不同的行),並對它們的數據(行)進行一些操作並將它們放入數據庫中,以便最終處理完整個數據並我的數據庫被傾倒了我需要的數據。在python中處理多個進程中的單個文件
但我無法弄清楚如何解決這個問題。
我有一個單獨的大文本文件,我想在其中處理每一行(執行一些操作)並將它們存儲在數據庫中。由於一個簡單的程序花費的時間太長,我希望通過多個進程或線程來完成。 每個線程/進程應該讀取來自單個文件的不同數據(不同的行),並對它們的數據(行)進行一些操作並將它們放入數據庫中,以便最終處理完整個數據並我的數據庫被傾倒了我需要的數據。在python中處理多個進程中的單個文件
但我無法弄清楚如何解決這個問題。
你所尋找的是一個生產者/消費者模式
基本線程例如
下面是一個使用基本例如(而不是多處理)
import threading
import Queue
import sys
def do_work(in_queue, out_queue):
while True:
item = in_queue.get()
# process
result = item
out_queue.put(result)
in_queue.task_done()
if __name__ == "__main__":
work = Queue.Queue()
results = Queue.Queue()
total = 20
# start for workers
for i in xrange(4):
t = threading.Thread(target=do_work, args=(work, results))
t.daemon = True
t.start()
# produce data
for i in xrange(total):
work.put(i)
work.join()
# get the results
for i in xrange(total):
print results.get()
sys.exit()
你止跌與線程共享文件對象。通過向queue提供數據行,您可以爲他們生產。然後,每個線程都會選取並處理它,然後將其返回隊列中。
multiprocessing module中內置了一些更高級的工具來共享數據,如列表和special kind of Queue。使用多處理vs線程有一定的權衡,它取決於你的工作是cpu綁定還是IO綁定。
基本multiprocessing.Pool例如
這裏是一個多池
from multiprocessing import Pool
def process_line(line):
return "FOO: %s" % line
if __name__ == "__main__":
pool = Pool(4)
with open('file.txt') as source_file:
# chunk the work into batches of 4 lines at a time
results = pool.map(process_line, source_file, 4)
print results
A Pool是管理自己的流程便利對象的一個很基本的例子。由於打開的文件可以遍歷其行,因此可以將它傳遞給映射,映射將循環並將行傳遞給工作函數。 Map塊完成後返回整個結果。請注意,在一個非常簡單的例子中,map
將在開始工作之前一次性使用您的文件。所以請注意它是否更大。有更高級的方法來設計製作者/消費者設置。
手冊「池」與極限和線重新排序
這是Pool.map的手動的例子,但代替的消耗整個迭代,可以設置一個隊列的大小,以便你只供給它可以儘可能快地處理它。我還添加了行號,以便您可以跟蹤它們並在以後再使用它們。
from multiprocessing import Process, Manager
import time
import itertools
def do_work(in_queue, out_list):
while True:
item = in_queue.get()
line_no, line = item
# exit signal
if line == None:
return
# fake work
time.sleep(.5)
result = (line_no, line)
out_list.append(result)
if __name__ == "__main__":
num_workers = 4
manager = Manager()
results = manager.list()
work = manager.Queue(num_workers)
# start for workers
pool = []
for i in xrange(num_workers):
p = Process(target=do_work, args=(work, results))
p.start()
pool.append(p)
# produce data
with open("source.txt") as f:
iters = itertools.chain(f, (None,)*num_workers)
for num_and_line in enumerate(iters):
work.put(num_and_line)
for p in pool:
p.join()
# get the results
# example: [(1, "foo"), (10, "bar"), (0, "start")]
print sorted(results)
是的,該文件較大,大約1 GB左右。我不知道你的意思是說更大,1 GB對我來說更大。 – pranavk
這很好。我確定你可以採取這些例子並推斷你的需求。線程一個就好了。多處理器只需要一個類似的隊列供您使用。 – jdi
這很好,但如果處理是I/O限制呢?在這種情況下,並行可能會減慢速度,而不是加快速度。在單個磁盤軌道內搜索比intertrack尋求的要快得多,並行I/O往往會引入intertrack尋求否則會是順序I/O負載。爲了從並行I/O中獲得一些好處,有時使用RAID鏡像有時會有所幫助。 – user1277476
下面是我做了一個非常愚蠢的例子:
import os.path
import multiprocessing
def newlinebefore(f,n):
f.seek(n)
c=f.read(1)
while c!='\n' and n > 0:
n-=1
f.seek(n)
c=f.read(1)
f.seek(n)
return n
filename='gpdata.dat' #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)
#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)
#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.
with open(filename,'r') as f:
start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))
end_byte=[i-1 for i in start_byte] [1:] + [None]
def process_piece(filename,start,end):
with open(filename,'r') as f:
f.seek(start+1)
if(end is None):
text=f.read()
else:
nbytes=end-start+1
text=f.read(nbytes)
# process text here. createing some object to be returned
# You could wrap text into a StringIO object if you want to be able to
# read from it the way you would a file.
returnobj=text
return returnobj
def wrapper(args):
return process_piece(*args)
filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)
pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)
#Now take your results and write them to the database.
print "".join(result) #I just print it to make sure I get my file back ...
這裏最棘手的部分是確保我們分裂的換行符的文件,這樣你就不會錯過任何線(或只讀取部分行)。然後,每個進程讀取它是文件的一部分,並返回一個可由主線程放入數據庫的對象。當然,你甚至可能需要以塊的形式完成這部分,這樣你就不必一次把所有的信息保存在內存中。 (這很容易實現 - 只需將「參數」列表拆分爲X塊並撥打pool.map(wrapper,chunk)
- 請參閱here)
不錯的問題。我也有這個疑問。雖然我選擇將文件分成更小的文件:) –