2010-11-24 36 views
0

我必須根據其他字符串的一些計算生成一組字符串。這需要相當長的一段時間,而且我正在開發一個多處理器/多核服務器,所以我想我可以將這些任務分解成多個塊並將它們傳遞給不同的進程。Python&Multiprocessing,將一代代分解成子流程

首先,我將第一個字符串列表分解成10000個塊,將它發送給創建一個新集的進程,然後嘗試獲取一個鎖並將它們報告回主進程。但是,我的主進程集是空的!

下面是一些代碼:

def build_feature_labels(self,strings,return_obj,l): 
    feature_labels = set() 
    for s in strings: 
     feature_labels = feature_labels.union(s.get_feature_labels()) 
    print "method: ", len(feature_labels) 
    l.acquire() 
    return_obj.return_feature_labels(feature_labels) 
    l.release() 
    print "Thread Done" 

def return_feature_labels(self,labs): 
    self.feature_labels = self.feature_labels.union(labs) 
    print "length self", len(self.feature_labels) 
    print "length labs", len(labs) 


current_pos = 0 
lock = multiprocessing.Lock() 

while current_pos < len(orig_strings): 
    while len(multiprocessing.active_children()) > threads: 
     print "WHILE: cpu count", str(multiprocessing.cpu_count()) 
      T.sleep(30) 

    print "number of processes", str(len(multiprocessing.active_children())) 
    proc = multiprocessing.Process(target=self.build_feature_labels,args=(orig_strings[current_pos:current_pos+self.MAX_ITEMS],self,lock)) 
    proc.start() 
    current_pos = current_pos + self.MAX_ITEMS 

    while len(multiprocessing.active_children()) > 0: 
     T.sleep(3) 


    print len(self.feature_labels) 

奇怪的是a)在self.feature_labels的主進程是空的,但是當它從每個子進程調用它的項目。我想我在這裏採取了錯誤的方法(這是我用Java做的!)。有更好的方法嗎?

在此先感謝。

回答

1

使用multiprocessing.Pipe, or Queue(或其他此類對象)在進程之間傳遞數據。使用管道在兩個進程之間傳遞數據,並使用隊列來允許多個生產者和消費者。

隨着官方文檔,有很好的例子可以在Doug Hellman's multiprocessing tutorial找到。特別是,它有一個如何使用multiprocessing.Pool來實現mapreduce類型操作的例子。它可能很適合你的目的。

+0

帶有多處理隊列,這是否意味着我跟蹤所有當前隊列(在Python列表中?),在每個子進程內執行諸如queue.put(the_set_I_created)之類的操作,然後在主進程中去通過隊列列表並調用q.get來獲取集合? – Stuart 2010-11-24 20:55:50

0

爲什麼它不工作:多處理使用進程,並且進程內存不共享。多處理可以爲IPC設置共享內存或管道,但必須明確完成。這是各種建議如何將數據發回給主人。