2011-09-20 28 views

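Parallel database updates with Python (PostGIS/PostgreSQL)

I need to update every record in a spatial database that has a dataset of points overlaying a dataset of polygons. For each point feature I want to assign a key that relates it to the polygon feature it lies in. So if my point 'New York City' lies within the polygon 'USA', and the USA polygon has 'GID = 1', I will assign 'gid_fkey = 1' to my point New York City.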

To do this, I created the following query:

procQuery = 'UPDATE city SET gid_fkey = country.gid FROM country WHERE ST_Within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) AND city_id = %s' % (cityID, cityID)

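Currently I get the cityID values from another query that selects only the cities whose gid_fkey is NULL. Essentially, I just need to loop over these and run the query shown above. Since each update depends only on static information in another table, all of them could in principle run at the same time. I have implemented the following threaded version, but I can't seem to port it to multiprocessing: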
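The one-by-one loop described above can be sketched as a plain sequential baseline, which is also what any parallel version should be timed against. This is a minimal sketch, assuming a DB-API connection such as one returned by `psycopg2.connect` and the `city`/`country` columns used in the query; the names `assign_countries_sequentially`, `SELECT_PENDING_SQL` and `UPDATE_CITY_SQL` are hypothetical. It also lets the driver bind the ID as a query parameter instead of formatting it into the string with `%`.

```python
# Hypothetical SQL, assuming the city/country columns from the question.
SELECT_PENDING_SQL = "SELECT city_id FROM city WHERE gid_fkey IS NULL"

UPDATE_CITY_SQL = (
    "UPDATE city SET gid_fkey = country.gid "
    "FROM country "
    "WHERE ST_Within(city.the_geom, country.the_geom) "
    "AND city.city_id = %s"
)


def assign_countries_sequentially(conn):
    """Update every unassigned city, one UPDATE statement per city.

    `conn` is any DB-API connection (e.g. from psycopg2.connect).
    Returns the number of city IDs processed.
    """
    cur = conn.cursor()
    cur.execute(SELECT_PENDING_SQL)
    city_ids = [row[0] for row in cur.fetchall()]
    for city_id in city_ids:
        # The driver binds city_id; no string formatting of SQL.
        cur.execute(UPDATE_CITY_SQL, (city_id,))
    conn.commit()
    return len(city_ids)
```

Usage would be along the lines of `assign_countries_sequentially(psycopg2.connect("dbname='geobase_1' host='localhost'"))`.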

import queue
import threading

import psycopg2

task_queue = queue.Queue()
pyConn = psycopg2.connect("dbname='geobase_1' host='localhost'")
pyConn.set_isolation_level(0)  # 0 = autocommit
pyCursor1 = pyConn.cursor()

# Only fetch the cities that still need a country key
getGID = 'SELECT city_id FROM city WHERE gid_fkey IS NULL'
pyCursor1.execute(getGID)
gidList = pyCursor1.fetchall()

procQuery = ('UPDATE city SET gid_fkey = country.gid FROM country '
             'WHERE ST_Within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) '
             'AND city_id = %s')

class threadClass(threading.Thread):

    def __init__(self, work_queue):
        threading.Thread.__init__(self)
        self.queue = work_queue

    def run(self):

        while True:
            gid = self.queue.get()
            cityID = gid[0]

            # Cursors must not be shared between threads, so create one per query
            pyCursor2 = pyConn.cursor()
            pyCursor2.execute(procQuery, (cityID, cityID))

            print(cityID)
            print('Done')
            # Mark the item as finished; without this, task_queue.join() never returns
            self.queue.task_done()

def main():

    for i in range(4):
        t = threadClass(task_queue)
        t.daemon = True
        t.start()

    # Enqueue each city ID exactly once
    for gid in gidList:
        task_queue.put(gid)

    task_queue.join()

main()

I'm not even sure multithreading is optimal, but it is definitely faster than going through the records one by one.
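If you stay with threads, one likely bottleneck in the code above is that every thread shares the single `pyConn`. psycopg2 allows that, but a connection can only run one statement at a time, so the updates are effectively serialized. A minimal sketch, assuming each thread may open its own connection: `threading.local()` gives every worker thread a private connection, so PostgreSQL can run several updates concurrently. It swaps the hand-rolled `Queue` for `concurrent.futures.ThreadPoolExecutor`; `run_threaded` is a hypothetical name and `connect` is assumed to be a zero-argument callable that returns a DB-API connection.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_threaded(city_ids, connect, update_sql, num_threads=4):
    """Run `update_sql` once per city ID; each worker thread uses its own connection.

    `update_sql` is assumed to contain a single %s placeholder for the city ID.
    Returns the processed IDs in input order.
    """
    local = threading.local()
    opened = []
    opened_lock = threading.Lock()

    def get_conn():
        # Open one connection per worker thread on first use.
        conn = getattr(local, "conn", None)
        if conn is None:
            conn = local.conn = connect()
            with opened_lock:
                opened.append(conn)
        return conn

    def update_one(city_id):
        conn = get_conn()
        cur = conn.cursor()  # cursor used only by this thread
        cur.execute(update_sql, (city_id,))
        conn.commit()
        return city_id

    try:
        with ThreadPoolExecutor(max_workers=num_threads) as pool:
            # list() consumes the results and re-raises any worker exception.
            return list(pool.map(update_one, city_ids))
    finally:
        for conn in opened:
            conn.close()
```

With psycopg2 this might be called as `run_threaded(ids, lambda: psycopg2.connect("dbname='geobase_1' host='localhost'"), sql)`, where `sql` is an UPDATE with one `%s` for the city ID. Since most of the time is spent waiting on the server rather than running Python code, separate connections per thread may already recover much of the parallelism.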

The machine I'll be using has four cores (quad-core) and a minimal Linux OS with no GUI, just PostgreSQL, PostGIS and Python, if that makes a difference.

What do I need to change to enable this painfully simple multiprocessing task?

Answer


OK, here is an answer to my own post. Well done, me =D

On my system, moving from single-core threading to quad-core multiprocessing gave a speed-up of about 150%.

import multiprocessing

import psycopg2

DSN = "dbname='geobase_1' host='localhost'"

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Poison pill: no more work for this process
                print('Tasks Complete')
                self.task_queue.task_done()
                break
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)
        return


class Task(object):
    def __init__(self, a):
        self.a = a  # city_id to update

    def __call__(self):
        # Runs inside a worker process, so it opens its own connection
        pyConn = psycopg2.connect(DSN)
        pyConn.set_isolation_level(0)  # autocommit
        try:
            pyCursor1 = pyConn.cursor()
            procQuery = ('UPDATE city SET gid_fkey = country.gid FROM country '
                         'WHERE ST_Within((SELECT the_geom FROM city WHERE city_id = %s), country.the_geom) '
                         'AND city_id = %s')
            pyCursor1.execute(procQuery, (self.a, self.a))
        finally:
            pyConn.close()
        return self.a

    def __str__(self):
        return 'ARC'


if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Two worker processes per CPU core
    num_consumers = multiprocessing.cpu_count() * 2
    consumers = [Consumer(tasks, results) for i in range(num_consumers)]
    for w in consumers:
        w.start()

    pyConnX = psycopg2.connect(DSN)
    pyConnX.set_isolation_level(0)
    pyCursorX = pyConnX.cursor()

    # Fetch the IDs of all cities that still need a country key
    pyCursorX.execute('SELECT city_id FROM city WHERE gid_fkey IS NULL')
    cityIdList = [row[0] for row in pyCursorX.fetchall()]
    pyConnX.close()

    num_jobs = len(cityIdList)
    for city_id in cityIdList:
        tasks.put(Task(city_id))

    # One poison pill per consumer so that every process exits
    for i in range(num_consumers):
        tasks.put(None)

    while num_jobs:
        result = results.get()
        print(result)
        num_jobs -= 1
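Each `Task` above opens and closes a database connection for a single city, so connection setup is paid once per row. One way to cut that overhead while keeping the same `Consumer` class is to hand each task a batch of IDs. A minimal sketch, assuming the update SQL has one `%s` placeholder for the city ID; `chunked` and `BatchTask` are hypothetical names, and `connect` must be picklable (a module-level function such as `psycopg2.connect` is) because tasks travel through a `multiprocessing` queue.

```python
def chunked(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    return [items[i:i + size] for i in range(0, len(items), size)]


class BatchTask(object):
    """Like Task, but updates a whole batch of city IDs over one connection."""

    def __init__(self, city_ids, connect, dsn, update_sql):
        self.city_ids = list(city_ids)
        self.connect = connect        # e.g. psycopg2.connect (must be picklable)
        self.dsn = dsn
        self.update_sql = update_sql  # one %s placeholder for the city ID

    def __call__(self):
        conn = self.connect(self.dsn)
        try:
            cur = conn.cursor()
            for city_id in self.city_ids:
                cur.execute(self.update_sql, (city_id,))
            conn.commit()
        finally:
            conn.close()
        return len(self.city_ids)
```

In the main block, the per-city loop would become `for batch in chunked(cityIdList, 500): tasks.put(BatchTask(batch, psycopg2.connect, DSN, sql))`, with `num_jobs` set to the number of batches rather than the number of cities; the batch size of 500 is an arbitrary starting point to tune.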

I now have another question, which I've posted here:

Create DB connection and maintain on multiple processes (multiprocessing)

Hopefully we can get rid of some of the overhead and speed this baby up even more.


Hi @ene, if this solved your problem, it's good practice to mark it as the answer :) – eberbis


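Yes, that's strange. When I posted this question I was only a guest user or something like that, so I can't mark my own question as answered. You can see that the thumbnail image didn't update with me either. Suggestions on how to fix this are welcome –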


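Oh yes... the problem is that you posted your question as a different (unregistered) user (http://stackoverflow.com/users/954992/ene) and you are now replying as the registered one (http://stackoverflow.com/users/965035/ene). As you can see, their IDs are different. This may help: http://meta.stackexchange.com/questions/74024/registration-with-my-unregistered-account – eberbis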


In plain SQL one could do this:

UPDATE city ci 
SET gid_fkey = co.gid 
FROM country co 
WHERE ST_within(ci.the_geom , co.the_geom) 
AND ci.city_id = _some_parameter_ 
     ; 

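There could be a problem if a city falls within more than one country (resulting in multiple updates to the same target row), but that is probably not the case in your data.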
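Taking this one step further, the per-city parameter can be dropped so that a single statement updates every unassigned city and PostgreSQL does the looping itself, with no Python-side parallelism at all. A minimal sketch, assuming the same `city`/`country` columns; to sidestep the multiple-country issue it uses a correlated subquery that picks the country with the lowest `gid`, which is an arbitrary tie-break chosen for illustration. `ASSIGN_ALL_SQL` and `assign_all_cities` are hypothetical names.

```python
# Hypothetical set-based statement: one UPDATE for all unassigned cities.
# If a city falls inside several countries, the lowest gid wins (arbitrary choice);
# cities inside no country keep gid_fkey = NULL.
ASSIGN_ALL_SQL = """
UPDATE city ci
SET gid_fkey = (
    SELECT co.gid
    FROM country co
    WHERE ST_Within(ci.the_geom, co.the_geom)
    ORDER BY co.gid
    LIMIT 1
)
WHERE ci.gid_fkey IS NULL
"""


def assign_all_cities(conn):
    """Run the set-based UPDATE on a DB-API connection; return the affected row count."""
    cur = conn.cursor()
    cur.execute(ASSIGN_ALL_SQL)
    conn.commit()
    return cur.rowcount
```

A GiST index on `country.the_geom` lets PostGIS use its bounding-box index check inside `ST_Within`, which is usually where most of the time goes in a query like this.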
