2009-08-02 117 views
1

嘿,大家好,我有一點麻煩調試我的代碼。請看看下面:多處理調試錯誤

import globalFunc 
from globalFunc import systemPrint 
from globalFunc import out 
from globalFunc import debug 
import math 
import time 
import multiprocessing 

""" 
Somehow this is not working well 
""" 
class urlServerM(multiprocessing.Process): 

    """ 
    This calculates how much links get put into the priority queue 
    so to reach the level that we intend, for every query resultset, 
    we will put the a certain number of links into visitNext first, 
    and even if every resultSet is full, we will be able to achieve the link 
    level that we intended. The rest is pushed into another list where 
    if the first set of lists don't have max for every time, the remaining will 
    be spared on these links 
    """ 
    def getPriorityCounter(self, level, constraint): 
       return int(math.exp((math.log(constraint)/(level - 1)))) 


    def __init__(self, level, constraint, urlQ): 
     """limit is obtained via ngCrawler.getPriorityNum""" 
     multiprocessing.Process.__init__(self) 
     self.constraint = int(constraint) 
     self.limit = self.getPriorityCounter(level, self.constraint) 
     self.visitNext = [] 
     self.visitLater = [] 
     self._count = 0 
     self.urlQ = urlQ 


    """ 
    puts the next into the Queue 
    """ 
    def putNextIntoQ(self): 
     debug('putNextIntoQ', str(self.visitNext) + str(self.visitLater)) 
     if self.visitNext != []: 
      _tmp = self.visitNext[0] 
      self.visitNext.remove(_tmp) 
      self.urlQ.put(_tmp) 

     elif self.visitLater != []: 
      _tmp = self.visitLater[0] 
      self.visitLater.remove(_tmp) 
      self.urlQ.put(_tmp) 


    def run(self): 
     while True: 
      if self.hasNext(): 
       time.sleep(0.5) 
       self.putNextIntoQ() 
       debug('process', 'put something in Q already') 
      else: 
       out('process', 'Nothing in visitNext or visitLater, sleeping') 
       time.sleep(2) 
     return 


    def hasNext(self): 
     debug('hasnext', str(self.visitNext) + str(self.visitLater)) 
     if self.visitNext != []: 
      return True 
     elif self.visitLater != []: 
      return True 
     return False 


    """ 
    This function resets the counter 
    which is used to keep track of how much is already inside the 
    visitNext vs visitLater 
    """ 
    def reset(self): 
     self._count = 0 


    def store(self, linkS): 
     """Stores a link into one of these list""" 
     if self._count < self.limit: 
      self.visitNext.append(linkS) 
      debug('put', 'something is put inside visitNext') 
     else: 
      self.visitLater.append(linkS) 
      debug('put', 'something is put inside visitLater') 
     self._count += 1 



if __name__ == "__main__": 
    # def __init__(self, level, constraint, urlQ): 

    from multiprocessing import Queue 
    q = Queue(3) 
    us = urlServerM(3, 6000, q) 

    us.start() 
    time.sleep(2) 

    # only one thread will do this 
    us.store('http://www.google.com') 
    debug('put', 'put completed') 

    time.sleep(3) 

    print q.get_nowait() 
    time.sleep(3) 

這是輸出

OUTPUT 
DEBUG hasnext: [][] 
[process] Nothing in visitNext or visitLater, sleeping 
DEBUG put: something is put inside visitNext 
DEBUG put: put completed 
DEBUG hasnext: [][] 
[process] Nothing in visitNext or visitLater, sleeping 
DEBUG hasnext: [][] 
[process] Nothing in visitNext or visitLater, sleeping 
Traceback (most recent call last): 
    File "urlServerM.py", line 112, in <module> 
    print q.get_nowait() 
    File "/usr/lib/python2.6/multiprocessing/queues.py", line 122, in get_nowait 
    return self.get(False) 
    File "/usr/lib/python2.6/multiprocessing/queues.py", line 104, in get 
    raise Empty 
Queue.Empty 
DEBUG hasnext: [][] 

顯然,我覺得這很奇怪。那麼基本上這個代碼是什麼,當在main()中測試時,它啓動這個過程,然後它將http://www.google.com存儲到類的visitNext中,然後我只想看到被推入隊列。

但是,根據輸出 我發現它非常奇怪,即使我的類已經完成將類存儲到類,hasNext不顯示任何東西。任何人都知道爲什麼?這是在連續while循環中編寫run()的最好方法嗎?這實際上是必要的嗎?我基本上試圖嘗試模塊多處理,並且我有一個工作者池(來自multiprocessing.Pool),它需要從這個類(單點入口)獲取這些URL。最好的方法是使用隊列嗎?我是否需要將這個過程作爲一個「實時」過程,因爲每個工作人員都要從隊列中請求,除非我有辦法向我的urlServer發信號通知隊列中的某些東西,否則我想不出一個麻煩的方法。

+0

看,沒有人可能會閱讀那麼多的代碼和解釋,甚至有機會回答。難道你不能把問題分解成10行代碼和5行解釋嗎?! – ThomasH 2009-08-02 17:16:24

回答

0

您正在使用多處理,因此內存不在主執行和您的urlserver之間共享。

I.e.我認爲這實際上是Noop:{us.store('http://www.google.com')},因爲它在主線程中執行時,它只會修改{us}的主線程表示。您可以通過在{q.get_nowait()}之前調用{us.hasnext()}來確認該URL在主線程的內存中。

爲了使其工作,您必須將所有要共享的列表轉換爲Queue-s或Pipe-s。或者,您可以將您的模型更改爲{threading},並且它應該可以在沒有變化的情況下工作(或多或少 - 您必須鎖定訪問列表,然後再次獲取GIL問題)。

(是的 - 請在下次更好地編輯您的問題,我知道什麼可能是您的問題,只要我看到「多處理」,但否則我不會打擾在代碼中看到......)