2012-06-06 40 views
1

所以我運行下面的代碼,當我運行它之後使用queue.qsize()時,隊列中仍然有450,000個條目,這意味着文本文件的大部分行都沒有被讀取。任何想法發生了什麼?Readline and threading

from Queue import Queue 
from threading import Thread 

lines = 660918 #int(str.split(os.popen('wc -l HGDP_FinalReport_Forward.txt').read())[0]) -1 
queue = Queue() 
File = 'HGDP_FinalReport_Forward.txt' 
num_threads =10 
short_file = open(File) 

class worker(Thread): 
    def __init__(self,queue): 
     Thread.__init__(self) 
     self.queue = queue 
    def run(self): 
     while True:   
      try:   
       self.queue.get()  
       i = short_file.readline() 
       self.queue.task_done() #signal to the queue that the task is done 
      except:    
       break 

## This is where I should make the call to the threads 

def main(): 
    for i in range(num_threads): 
     worker(queue).start() 
    queue.join() 


    for i in range(lines): # put the range of the number of lines in the .txt file 
     queue.put(i) 

main() 

回答

1

很難知道你想在這裏做什麼,但如果每條線都可以獨立處理,multiprocessing一個更簡單的選擇,這將照顧所有同步的爲您服務。另外一個好處是你不必提前知道行數。

基本上,

import multiprocessing 
pool = multiprocessing.Pool(10) 

def process(line): 
    return len(line) #or whatever 

with open(path) as lines: 
    results = pool.map(process, lines) 

或者,如果你只是試圖從線獲得某種集合結果,你可以使用reduce降低內存使用情況。

import operator 
with open(path) as lines: 
    result = reduce(operator.add, pool.map(process, lines)) 
+0

雅,這只是我的程序的一個虛擬版本。我不想發佈所有內容並且解決問題。 – user1423020

+0

這兩個都不需要你打開整個正在使用的文件?如果是這種情況,它不適用於我,因爲該文件太大。我一直在記憶錯誤,這就是我使用readline的原因。 – user1423020

+0

不,pool.map是懶惰的,所以它會根據需要從文件生成器中獲取行。您還可以指定塊大小(一次的行數)作爲第三個參數。但是,它會返回一個列表,所以如果結果太大而不適合內存,那麼您也可以使用multiprocessing.Pool.imap,它將返回一個迭代器。一般情況下,如果你想使用第二種解決方案(減少),你會想與imap配對。 The docs:http://docs.python.org/library/multiprocessing.html – gilesc

1

所以我想這樣做,但我得到一個有點困惑,因爲我需要通過每次一行,而不是代碼似乎怎麼做

import multiprocessing as mp 

File = 'HGDP_FinalReport_Forward.txt' 
#short_file = open(File) 
test = [] 

def pro(temp_line): 
    temp_line = temp_line.strip().split() 
    return len(temp_line) 

if __name__ == "__main__": 
    with open("HGDP_FinalReport_Forward.txt") as lines: 
     pool = mp.Pool(processes = 10) 
     t = pool.map(pro,lines)