2012-04-27 75 views
0

我注意到這種行爲在python池分配。即使我在池中有20個進程,但是當我爲8個進程執行map_async時,我不會執行所有進程,而只會執行4個進程。當那4個完成時,它再發送兩個,然後當這兩個完成時發送一個。多處理 - 池分配

當我拋出20多個數據時,它會全部運行20次,直到隊列中開始少於20個時,重複上述行爲。

我認爲這是有意完成的,但看起來很奇怪。我的目標是儘快處理請求,顯然這種行爲不適合。

使用Python 2.6 billiard爲maxtasksperchild支持

任何想法我怎麼能提高呢?

代碼:

mypool = pool.Pool(processes=settings['num-processes'], initializer=StartChild, maxtasksperchild=10) 

while True: 
    lines = DbData.GetAll() 
    if len(lines) > 0: 
     print 'Starting to process: ', len(lines), ' urls' 
     Res = mypool.map_async(RunChild, lines) 
     Returns = Res.get(None) 
     print 'Pool returns: ', idx, Returns 
    else: 
     time.sleep(0.5) 

回答

2

一種方式我處理在Python多是:

我有,我想使用的功能function()數據。
首先我創建了一個多子類:

import multiprocessing 

class ProcessThread(multiprocessing.Process): 
    def __init__(self, id_t, inputqueue, idqueue, function, resultqueue): 
     self.id_t = id_t 
     self.inputlist = inputqueue 
     self.idqueue = idqueue 
     self.function = function 
     self.resultqueue = resultqueue 

     multiprocessing.Process.__init__(self) 

    def run(self): 
     s = "process number: " + str(self.id_t) + " starting" 
     print s 
     result = [] 

     while self.inputqueue.qsize() > 0 
      try: 
       inp = self.inputqueue.get() 
      except Exception: 
       pass 
      result = self.function(inp) 
      while 1: 
       try: 
        self.resultqueue.put([self.id,]) 
       except Exception: 
        pass 
       else: 
        break 
      self.idqueue.put(id) 
      return 

和主要功能:

inputqueue = multiprocessing.Queue() 
resultqueue = multiprocessing.Queue() 
idqueue = multiprocessing.Queue() 

def function(data): 
    print data # or what you want 

for datum in data: 
    inputqueue.put(datum) 

for i in xrange(nbprocess): 
    ProcessThread(i, inputqueue, idqueue, function, resultqueue).start() 

最後得到的結果:

results = [] 
while idqueue.qsize() < nbprocess: 
    pass 
while resultqueue.qsize() > 0: 
    results.append(resultqueue.get()) 

通過這種方式,你可以完全控制哪些附加與過程和其他的東西。 使用多處理inputqueue是一種有效的技術,只有當每個數據的計算相當慢(< 1,2秒)時,由於不同進程併發訪問隊列(爲什麼我使用異常)。如果你的函數計算速度非常快,考慮在開始時只分解一次數據,並把每個進程的數據集塊放在一起。

+0

謝謝,這有助於改進我的腳本。我擺脫了默認池處理,並根據您的示例實施了我自己的處理。 – SorinV 2012-04-30 03:29:16