2011-09-26 62 views
18

與我發表的另一篇文章類似,這回答了該帖子並創建了一個新問題。創建數據庫連接並維護多個進程(多處理)

回顧:我需要更新空間數據庫中的每個記錄,其中有一組覆蓋多邊形數據集的點的數據集。對於每個點要素,我想分配一個鍵以將其與它所在的面要素關聯起來。因此,如果我的觀點'紐約市'位於美國多邊形和美國多邊形'GID = 1'內,我將爲我的觀點紐約市分配'gid_fkey = 1'。

好吧,這已經實現了使用多處理。我注意到使用這個速度增加了150%,所以它工作。但我認爲有一些不必要的開銷,因爲每個記錄需要一個數據庫連接。

所以這裏是代碼:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 

    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task() 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self):   
     pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     pyConn.set_isolation_level(0) 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 

if __name__ == '__main__': 
    tasks = multiprocessing.JoinableQueue() 
    results = multiprocessing.Queue() 

    num_consumers = multiprocessing.cpu_count() * 2 
    consumers = [Consumer(tasks, results) for i in xrange(num_consumers)] 
    for w in consumers: 
     w.start() 

    pyConnX = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
    pyConnX.set_isolation_level(0) 
    pyCursorX = pyConnX.cursor() 

    pyCursorX.execute('SELECT count(*) FROM cities WHERE gid_fkey IS NULL')  
    temp = pyCursorX.fetchall()  
    num_job = temp[0] 
    num_jobs = num_job[0] 

    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')  
    cityIdListTuple = pyCursorX.fetchall()  

    cityIdListList = [] 

    for x in cityIdListTuple: 
     cityIdList.append(x[0]) 


    for i in xrange(num_jobs): 
     tasks.put(Task(cityIdList[i - 1])) 

    for i in xrange(num_consumers): 
     tasks.put(None) 

    while num_jobs: 
     result = results.get() 
     print result 
     num_jobs -= 1 

它看起來是每個連接0.3和1.5秒之間,因爲我有「時間」模塊測量。

有沒有辦法讓每個進程都建立一個數據庫連接,然後只用city_id info作爲一個變量,我可以將這個變量提供給這個open中的遊標查詢?這樣我說四個進程與每個數據庫連接,然後放下我city_id以某種方式處理。

回答

31

嘗試隔離在消費者構造你的連接的創建,然後把它交給執行任務:

import multiprocessing, time, psycopg2 

class Consumer(multiprocessing.Process): 

    def __init__(self, task_queue, result_queue): 
     multiprocessing.Process.__init__(self) 
     self.task_queue = task_queue 
     self.result_queue = result_queue 
     self.pyConn = psycopg2.connect("dbname='geobase_1' host = 'localhost'") 
     self.pyConn.set_isolation_level(0) 


    def run(self): 
     proc_name = self.name 
     while True: 
      next_task = self.task_queue.get() 
      if next_task is None: 
       print 'Tasks Complete' 
       self.task_queue.task_done() 
       break    
      answer = next_task(connection=self.pyConn) 
      self.task_queue.task_done() 
      self.result_queue.put(answer) 
     return 


class Task(object): 
    def __init__(self, a): 
     self.a = a 

    def __call__(self, connection=None):   
     pyConn = connection 
     pyCursor1 = pyConn.cursor() 

     procQuery = 'UPDATE city SET gid_fkey = gid FROM country WHERE ST_within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (self.a, self.a) 

     pyCursor1.execute(procQuery) 
     print 'What is self?' 
     print self.a 

     return self.a 

    def __str__(self): 
     return 'ARC' 
    def run(self): 
     print 'IN' 
+1

伴侶得到成功的治療。沒有榮譽給你批准的機會,但代碼是絕對的魔法。擺脫固定的數據庫連接可以輕鬆地將速度提高50%。在某些情況下可能接近100%。再次感謝。 –

+0

@EnE_:我很高興它幫助你:)。你應該接受答案,你有權這樣做,因爲你是問題的主人。 –

+0

好吧,我不得不承認,我認爲我應該按下向上的箭頭而不是剔。 '批准的滴答聲'是一個不幸的自我譴責的轉向= D –