2016-02-29 30 views
0

我想爲4個異步方法使用4個進程。Python3池異步進程|工人

這是我的1種異步方法(x)的代碼:根據

from multiprocessing import Pool 
import time 

def x(i): 
    while(i < 100): 
      print(i) 
      i += 1 
      time.sleep(1) 

def finish(str): 
    print("done!") 

if __name__ == "__main__": 
    pool = Pool(processes=5) 
    result = pool.apply_async(x, [0], callback=finish) 

print("start") 

https://docs.python.org/2/library/multiprocessing.html#multiprocessing.JoinableQueue 在池的參數的過程是工人的數量。

我該如何使用這些工作人員?

編輯:我ASYNC類

from multiprocessing import Pool 
import time 

class ASYNC(object): 
    def __init__(self, THREADS=[]): 
     print('do') 
     pool = Pool(processes=len(THREADS)) 
     self.THREAD_POOL = {} 
     thread_index = 0 
     for thread_ in THREADS: 
      self.THREAD_POOL[thread_index] = { 
       'thread': thread_['thread'], 
       'args': thread_['args'], 
       'callback': thread_['callback'] 
      } 
      pool.apply_async(self.run, [thread_index], callback=thread_['callback']) 
      self.THREAD_POOL[thread_index]['running'] = True 
      thread_index += 1 
    def run(self, thread_index): 
     print('enter') 
     while(self.THREAD_POOL[thread_index]['running']): 
      print("loop") 
      self.THREAD_POOL[thread_index]['thread'](self.THREAD_POOL[thread_index]) 
      time.sleep(1) 
     self.THREAD_POOL[thread_index]['running'] = False 
    def wait_for_finish(self): 
     for pool in self.THREAD_POOL: 
      while(self.THREAD_POOL[pool]['running']): 
       time.sleep(1) 
def x(pool): 
    print(str(pool)) 
    pool['args'][0] += 1 


def y(str): 
    print("done") 

A = ASYNC([{'thread': x, 'args':[10], 'callback':y}]) 

print("start") 
A.wait_for_finish() 
+0

apply_async是否適用於下一個可用的進程? –

回答

0

multiprocessing.Pool被設計爲工作分配給工人池的一種方便的方式,而不必擔心哪一個工作做哪些工作。它的大小之所以能夠讓你懶散地將工作分派到隊列中,以及限制創建子進程的昂貴(相對)的開銷。

所以你的問題的答案原則上你不應該能夠訪問池中的個別工作人員。如果您希望能夠單獨解決的工人,你將需要實現自己的工作分配系統,並使用multiprocessing.Process,是這樣的:

from multiprocessing import Process 

def x(i): 
    while(i < 100): 
     print(i) 
     i += 1 

pools = [Process(target=x, args=(1,)) for _ in range(5)] 
map(lambda pool: pool.start(), pools) 
map(lambda pool: pool.join(), pools) 
print('Done!') 

現在你可以直接訪問每一個工人。如果您希望能夠在每個工作人員正在運行時動態地發送工作(不僅僅是像我在我的例子中那樣做一件事),那麼您必須自己實施,有可能使用multiprocessing.Queue。看看multiprocessing的代碼,看看如何將工作分配給其工作人員以瞭解如何執行此操作。

爲什麼你想這樣做呢?如果只是擔心工人是否有效地安排工作,那麼我的建議就是相信multiprocessing爲您解決問題,除非您有充分的證據表明您的情況並非因爲某種原因。

+0

我不認爲這就是我要找的 –

+0

好的,你能詳細解釋一下你在找什麼嗎?只是你想知道每次調用'x()'的執行結束了嗎? – daphtdazz