2012-11-27 94 views
2

目標:Python的多處理與unpicklable對象

  • 運行〜使用的SQLAlchemy與線程或進程一分貝40個巨大查詢,將相應SQLA ResultProxies在Queue.Queue(由multiprocessing.Manager正在處理)
  • 在同一時間,將結果寫入與多個過程的消耗所述隊列

當前狀態爲.csv文件:

  • 運行查詢並寫入數據的QueryThread和WriteThread類;由於查詢需要一段時間才能運行,因此GIL如何處理線程並沒有顯着的性能損失。另一方面,寫入文件需要花費很長時間;實際上,儘管最初的想法是運行WriteThread類的多個線程,但使用單個線程可以獲得最佳性能。

因此,使用多處理的想法;我希望能夠同時編寫輸出,而不是CPU綁定,而是I/O綁定。拋開背景,下面是問題(這本質上是一個設計問題) - multiprocessing library通過酸洗物體,然後將數據傳送到其他生成的過程來工作;但ResultProxy對象和共享隊列,我試圖在WriteWorker過程中使用不picklable,這將導致在下面留言(不逐字,但足夠接近):

pickle.PicklingError: Can't pickle object in WriteWorker.start() 

所以對於你的問題有幫助的人是,關於潛在設計模式或方法的任何想法可以避免這個問題?這看起來像一個簡單的,經典的生產者 - 消費者問題,我想解決方案很簡單,我只是過度想象它

任何幫助或反饋表示讚賞!謝謝:)

編輯:這裏的一些代碼相關的片段,讓我知道,如果有任何其他的方面,我可以提供

從父類

#init manager and queues 
self.manager = multiprocessing.Manager() 
self.query_queue = self.manager.Queue() 
self.write_queue = self.manager.Queue() 


def _get_data(self): 
    #spawn a pool of query processes, and pass them query queue instance 
    for i in xrange(self.NUM_QUERY_THREADS): 
     qt = QueryWorker.QueryWorker(self.query_queue, self.write_queue, self.config_values, self.args) 
     qt.daemon = True 
     # qt.setDaemon(True) 
     qt.start() 

    #populate query queue 
    self.parse_sql_queries() 

    #spawn a pool of writer processes, and pass them output queue instance 
    for i in range(self.NUM_WRITE_THREADS): 
     wt = WriteWorker.WriteWorker(self.write_queue, self.output_path, self.WRITE_BUFFER, self.output_dict) 
     wt.daemon = True 
     # wt.setDaemon(True) 
     wt.start() 

    #wait on the queues until everything has been processed 
    self.query_queue.join() 
    self.write_queue.join() 

,並從QueryWorker類:

def run(self): 
    while True: 
     #grabs host from query queue 
     query_tupe = self.query_queue.get() 
     table = query_tupe[0] 
     query = query_tupe[1] 
     query_num = query_tupe[2] 
     if query and table: 
      #grab connection from pool, run the query 
      connection = self.engine.connect() 
      print 'Running query #' + str(query_num) + ': ' + table 
      try: 
       result = connection.execute(query) 
      except: 
       print 'Error while running query #' + str(query_num) + ': \n\t' + str(query) + '\nError: ' + str(sys.exc_info()[1]) 

      #place result handle tuple into out queue 
      self.out_queue.put((table, result)) 

     #signals to queue job is done 
     self.query_queue.task_done() 

回答

1

簡單的答案是避免直接使用ResultsProxy。而是使用cursor.fetchall()或cursor.fetchmany(number_to_fetch)從ResultsProxy獲取數據,然後將數據傳遞到多處理隊列中。

+0

只要沒有太多結果,它就可以工作。您還可以設置一個送料器/消費者系統,在該系統中,使用遊標的進程將數據以塊的形式發送給子進程,而當子進程結束時,子進程會請求更多進程。 – Perkins

+0

感謝您的建議,我實施了這種方法,它的運作非常順暢。 我調用fetchmany(),用這個結果填充一個chunk_queue,然後消費者進程根據隊列寫入 – ignorantslut