
I am fairly new to multiprocessing, and I wrote the script below, but the worker methods are never called and I can't figure out what I'm missing: multiprocessing in Python is not calling the worker functions.

What I want to do is:

  1. Call two different methods asynchronously.
  2. Have one method called before the other.

    # import all necessary modules
    import Queue
    import logging
    import multiprocessing
    import time, sys
    import signal

    debug = True


    def init_worker():
        signal.signal(signal.SIGINT, signal.SIG_IGN)


    research_name_id = {}
    ids = [55, 125, 428, 429, 430, 895, 572, 126, 833, 502, 404]
    # declare all the static variables
    num_threads = 2  # number of parallel threads

    minDelay = 3  # minimum delay
    maxDelay = 7  # maximum delay

    # declare an empty queue which will hold the publication ids
    queue = Queue.Queue(0)

    proxies = []
    #print (proxies)


    def split(a, n):
        """Function to split data evenly among threads"""
        k, m = len(a) / n, len(a) % n
        return (a[i * k + min(i, m):(i + 1) * k + min(i + 1, m)]
                for i in xrange(n))


    def run_worker(
            i,
            data,
            queue,
            research_name_id,
            proxies,
            debug,
            minDelay,
            maxDelay):
        """Function to pull out all publication links from nist
        data - research ids pulled using a different script
        queue - add the publication urls to the list
        research_name_id - dictionary with research id as key and name as value
        proxies - scraped proxies
        """
        print 'getLinks', i
        for d in data:
            print d
            queue.put(d)


    def fun_worker(i, queue, proxies, debug, minDelay, maxDelay):
        print 'publicationData', i
        try:
            print queue.pop()
        except:
            pass


    def main():
        print "Initializing workers"
        pool = multiprocessing.Pool(num_threads, init_worker)
        distributed_ids = list(split(list(ids), num_threads))
        for i in range(num_threads):
            data_thread = distributed_ids[i]
            print data_thread
            pool.apply_async(run_worker, args=(i + 1,
                                               data_thread,
                                               queue,
                                               research_name_id,
                                               proxies,
                                               debug,
                                               minDelay,
                                               maxDelay,
                                               ))

            pool.apply_async(fun_worker,
                             args=(i + 1,
                                   queue,
                                   proxies,
                                   debug,
                                   minDelay,
                                   maxDelay,
                                   ))

        try:
            print "Waiting 10 seconds"
            time.sleep(10)

        except KeyboardInterrupt:
            print "Caught KeyboardInterrupt, terminating workers"
            pool.terminate()
            pool.join()

        else:
            print "Quitting normally"
            pool.close()
            pool.join()


    if __name__ == "__main__":
        main()

The only output I get is

    Initializing workers
    [55, 125, 428, 429, 430, 895]
    [572, 126, 833, 502, 404]
    Waiting 10 seconds
    Quitting normally

Answer


There are a couple of issues:

  1. You are not using multiprocessing.Queue.
  2. If you want to share a queue with subprocesses via apply_async etc., you need to use a manager (see example); a minimal sketch follows this list.
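
For point 2, here is a minimal sketch (the worker and the ids are placeholders, not the poster's actual code) of sharing a queue with apply_async workers through a Manager proxy:

    # Minimal sketch: a Manager-backed queue can be pickled and shared with
    # pool workers, unlike the module-level Queue.Queue in the question.
    import multiprocessing

    def run_worker(i, data, queue):
        # hypothetical worker: push each id onto the shared queue
        for d in data:
            queue.put(d)

    if __name__ == "__main__":
        manager = multiprocessing.Manager()
        queue = manager.Queue()   # proxy object, safe to pass to apply_async
        pool = multiprocessing.Pool(2)
        pool.apply_async(run_worker, args=(1, [55, 125, 428], queue))
        pool.close()
        pool.join()
        while not queue.empty():
            print queue.get()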

However, you should take a step back and ask yourself what you are trying to do. Is apply_async really the way to go? You have a list of items that you want to map over, applying some computationally intensive, long-running transformation (because if they just block on I/O, you might as well use threads). It seems to me that imap_unordered is actually what you want:

pool = multiprocessing.Pool(num_threads, init_worker) 
links = pool.imap_unordered(run_worker1, ids) 
output = pool.imap_unordered(fun_worker1, links) 

run_worker1 and fun_worker1 need to be modified to take a single argument. If you need to share other data, then you should pass it in the initializer instead of passing it to the child processes over and over again.
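
A rough sketch of what that could look like, assuming the shared data (proxies, delays, and so on) is set up once per child in the pool initializer; the worker bodies here are placeholders, not the real scraping code:

    import multiprocessing

    _shared = {}

    def init_worker(proxies, min_delay, max_delay):
        # runs once in every child process; stash the shared data globally
        _shared['proxies'] = proxies
        _shared['minDelay'] = min_delay
        _shared['maxDelay'] = max_delay

    def run_worker1(research_id):
        # placeholder single-argument version: return the links for one id
        return 'links-for-%s' % research_id

    def fun_worker1(links):
        # placeholder single-argument version: process one set of links
        return 'processed-%s' % links

    if __name__ == "__main__":
        ids = [55, 125, 428, 429, 430]
        pool = multiprocessing.Pool(2, init_worker, ([], 3, 7))
        links = pool.imap_unordered(run_worker1, ids)
        output = pool.imap_unordered(fun_worker1, links)
        for result in output:
            print result
        pool.close()
        pool.join()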


Thanks for your comment. I also want to launch multiple processes. Is apply_async the right way to do that? I will read up more on imap_unordered – nEO
