2
- 運行〜使用的SQLAlchemy與線程或進程一分貝40個巨大查詢,將相應SQLA ResultProxies在Queue.Queue(由multiprocessing.Manager正在處理)
- 在同一時間,將結果寫入與多個過程的消耗所述隊列
當前狀態爲.csv文件:
- 運行查詢並寫入數據的QueryThread和WriteThread類;由於查詢需要一段時間才能運行,因此GIL如何處理線程並沒有顯着的性能損失。另一方面,寫入文件需要花費很長時間;實際上,儘管最初的想法是運行WriteThread類的多個線程,但使用單個線程可以獲得最佳性能。
因此,使用多處理的想法;我希望能夠同時編寫輸出,而不是CPU綁定,而是I/O綁定。拋開背景,下面是問題(這本質上是一個設計問題) - multiprocessing library通過酸洗物體,然後將數據傳送到其他生成的過程來工作;但ResultProxy對象和共享隊列,我試圖在WriteWorker過程中使用不picklable,這將導致在下面留言(不逐字,但足夠接近):
pickle.PicklingError: Can't pickle object in WriteWorker.start()
所以對於你的問題有幫助的人是,關於潛在設計模式或方法的任何想法可以避免這個問題?這看起來像一個簡單的,經典的生產者 - 消費者問題,我想解決方案很簡單,我只是過度想象它
任何幫助或反饋表示讚賞!謝謝:)
編輯:這裏的一些代碼相關的片段,讓我知道,如果有任何其他的方面,我可以提供
從父類:
#init manager and queues
self.manager = multiprocessing.Manager()
self.query_queue = self.manager.Queue()
self.write_queue = self.manager.Queue()
def _get_data(self):
#spawn a pool of query processes, and pass them query queue instance
for i in xrange(self.NUM_QUERY_THREADS):
qt = QueryWorker.QueryWorker(self.query_queue, self.write_queue, self.config_values, self.args)
qt.daemon = True
# qt.setDaemon(True)
qt.start()
#populate query queue
self.parse_sql_queries()
#spawn a pool of writer processes, and pass them output queue instance
for i in range(self.NUM_WRITE_THREADS):
wt = WriteWorker.WriteWorker(self.write_queue, self.output_path, self.WRITE_BUFFER, self.output_dict)
wt.daemon = True
# wt.setDaemon(True)
wt.start()
#wait on the queues until everything has been processed
self.query_queue.join()
self.write_queue.join()
,並從QueryWorker類:
def run(self):
while True:
#grabs host from query queue
query_tupe = self.query_queue.get()
table = query_tupe[0]
query = query_tupe[1]
query_num = query_tupe[2]
if query and table:
#grab connection from pool, run the query
connection = self.engine.connect()
print 'Running query #' + str(query_num) + ': ' + table
try:
result = connection.execute(query)
except:
print 'Error while running query #' + str(query_num) + ': \n\t' + str(query) + '\nError: ' + str(sys.exc_info()[1])
#place result handle tuple into out queue
self.out_queue.put((table, result))
#signals to queue job is done
self.query_queue.task_done()
只要沒有太多結果,它就可以工作。您還可以設置一個送料器/消費者系統,在該系統中,使用遊標的進程將數據以塊的形式發送給子進程,而當子進程結束時,子進程會請求更多進程。 – Perkins
感謝您的建議,我實施了這種方法,它的運作非常順暢。 我調用fetchmany(),用這個結果填充一個chunk_queue,然後消費者進程根據隊列寫入 – ignorantslut