2013-03-27 33 views

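subprocess.Popen in a thread

I have a number of files (more than 4,000) that I want to load into PostgreSQL concurrently. I have split them into 4 separate file lists, and I want one thread per list to iterate over it and load the data.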

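The problem is that I call the load program with os.system, but that prevents the other threads from running at the same time. If I use subprocess.Popen instead, the loads do run concurrently, but each thread believes it has finished executing and so moves on to the next part of the script.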

Am I going about this the right way? Or is there a better way to call a subprocess from within a thread?

def thread1Load(self, thread1fileList): 
    connectionstring = settings.connectionstring 
    postgreshost = settings.postgreshost 
    postgresdatabase = settings.postgresdatabase 
    postgresport = settings.postgresport 
    postgresusername = settings.postgresusername 
    postgrespassword = settings.postgrespassword 

    tablename = None 
    encoding = None 
    connection = psycopg2.connect(connectionstring) 

    for filename in thread1fileList: 
     load_cmd = #load command 
     run = subprocess.Popen(load_cmd, shell=True) 
    print "finished loading thread 1" 


def thread2Load(self, thread2fileList): 
    connectionstring = settings.connectionstring 
    postgreshost = settings.postgreshost 
    postgresdatabase = settings.postgresdatabase 
    postgresport = settings.postgresport 
    postgresusername = settings.postgresusername 
    postgrespassword = settings.postgrespassword 

    tablename = None 

    connection = psycopg2.connect(connectionstring) 
    for filename in thread2fileList: 
     load_cmd = #load command    
     run = subprocess.Popen(load_cmd, shell=True) 
    print "finished loading thread 2" 


def thread3Load(self, thread3fileList): 
    connectionstring = settings.connectionstring 
    postgreshost = settings.postgreshost 
    postgresdatabase = settings.postgresdatabase 
    postgresport = settings.postgresport 
    postgresusername = settings.postgresusername 
    postgrespassword = settings.postgrespassword 

    tablename = None 
    connection = psycopg2.connect(connectionstring) 

    for shapefilename in thread3fileList: 
     load_cmd = #load command 
     run = subprocess.Popen(load_cmd, shell=True) 
    print "finished loading thread 3" 

def thread4Load(self, thread4fileList): 
    connectionstring = settings.connectionstring 
    postgreshost = settings.postgreshost 
    postgresdatabase = settings.postgresdatabase 
    postgresport = settings.postgresport 
    postgresusername = settings.postgresusername 
    postgrespassword = settings.postgrespassword 

    tablename = None 

    connection = psycopg2.connect(connectionstring) 

    for filename in thread4fileList: 
     load_cmd = #load command 
     run = subprocess.Popen(load_cmd, shell=True) 

    print "finished loading thread 4" 


def finishUp(self): 
    print 'finishing up' 


def main():
    load = Loader()

    thread1 = threading.Thread(target=(load.thread1Load), args=(thread1fileList,))
    thread2 = threading.Thread(target=(load.thread2Load), args=(thread2fileList,))
    thread3 = threading.Thread(target=(load.thread3Load), args=(thread3fileList,))
    thread4 = threading.Thread(target=(load.thread4Load), args=(thread4fileList,))
    threads = [thread1, thread2, thread3, thread4]
    for thread in threads:
        thread.start()
        thread.join()


    load.finishUp(connectionstring)

if __name__ == '__main__':
    main()

Answer

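  • Don't repeat yourself. A single threadLoad method is enough. That way, if you need to change something in the method, you don't have to make the same change in 4 different places.
  • Use run.communicate() to block until the subprocess has finished.
  • This starts one thread, blocks until that thread has finished, then starts the next thread, and so on: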

    for thread in threads: 
        thread.start() 
        thread.join() 
    

    Instead, start all the threads first, then join all of them:

    for thread in threads: 
        thread.start() 
    for thread in threads: 
        thread.join() 
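
The second point can be checked in isolation. The following is a minimal sketch rather than the loader itself: it assumes a stand-in child process (the Python interpreter sleeping for half a second) in place of the real load command, and it passes the command as a list instead of using shell=True. It suggests why the Popen-only version prints "finished loading" too early: Popen returns while the child is still running, whereas communicate() returns only after the child has exited.

```python
import subprocess
import sys
import time

# Stand-in for the real load command (an assumption for illustration):
# a child Python process that just sleeps for half a second.
cmd = [sys.executable, "-c", "import time; time.sleep(0.5)"]

start = time.time()
proc = subprocess.Popen(cmd)

# Popen returns as soon as the child has been started, so poll() still
# reports "no exit code yet" (None) at this point.
still_running = proc.poll() is None

# communicate() blocks the calling thread until the child exits.
# With no pipes attached, proc.wait() would behave the same way here.
proc.communicate()
total_seconds = time.time() - start

print("still running right after Popen: {0}".format(still_running))
print("exit code after communicate(): {0}".format(proc.returncode))
```

Because communicate() blocks only the thread that calls it, other threads remain free to run their own subprocesses in the meantime.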
    

import subprocess
import threading

import psycopg2

import settings  # the asker's own configuration module


class Loader(object):
    def threadLoad(self, threadfileList):
        connectionstring = settings.connectionstring
        ...
        connection = psycopg2.connect(connectionstring)

        for filename in threadfileList:
            load_cmd = # load command
            run = subprocess.Popen(load_cmd, shell=True)
            # block until subprocess is done
            run.communicate()
        name = threading.current_thread().name
        print("finished loading {n}".format(n=name))

    def finishUp(self):
        print('finishing up')


def main():
    load = Loader()
    # One thread per file list, all running the same method
    threads = [threading.Thread(target=load.threadLoad, args=(fileList,))
               for fileList in (thread1fileList, thread2fileList,
                                thread3fileList, thread4fileList)]
    # Start every thread first so that they run concurrently ...
    for thread in threads:
        thread.start()
    # ... then wait for all of them to finish
    for thread in threads:
        thread.join()

    # finishUp takes no arguments besides self
    load.finishUp()

if __name__ == '__main__':
    main()
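
Since the load command is elided above, the listing cannot be run as posted. The following is a self-contained sketch of the same pattern that can be run on its own. Everything specific in it is an assumption made for illustration: the file names are made up, the "load" is a child Python process that merely sleeps, and the helper name load_files is hypothetical. It is not the asker's loader.

```python
import subprocess
import sys
import threading
import time


def load_files(file_list, loaded):
    """Run one stand-in load command per file, one after another."""
    for filename in file_list:
        # Hypothetical stand-in for the real loader command, which is
        # elided in the original post: a child process that sleeps briefly.
        cmd = [sys.executable, "-c", "import time; time.sleep(0.2)"]
        proc = subprocess.Popen(cmd)
        proc.communicate()  # block this thread until the child exits
        if proc.returncode == 0:
            loaded.append(filename)  # list.append is atomic in CPython
    print("finished loading {0}".format(threading.current_thread().name))


def main():
    # Made-up file lists; the original splits 4000+ files four ways.
    file_lists = [["a1", "a2"], ["b1", "b2"], ["c1", "c2"], ["d1", "d2"]]
    loaded = []
    threads = [threading.Thread(target=load_files, args=(file_list, loaded))
               for file_list in file_lists]

    start = time.time()
    for thread in threads:   # start everything first ...
        thread.start()
    for thread in threads:   # ... then wait for all of it
        thread.join()
    elapsed = time.time() - start

    print("loaded {0} files in {1:.2f}s".format(len(loaded), elapsed))
    return loaded, elapsed


if __name__ == "__main__":
    main()
```

Each thread still processes its own list sequentially, so at most four child processes run at any one time. Moving the join() call back into the start loop makes the four lists run one after another instead.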

Thank you very much, this is a really clean and nice approach – tjmgis 2013-04-02 15:00:24