2014-09-04 63 views
0

我有一個大文件作爲我的Python代碼的輸入,它會產生相應的輸出文件。但是,這需要太多時間,我想加快速度。如何平行我的Python代碼

現在,我把大文件分成1000個小文件。我想要一個能夠啓動1000個線程的小腳本,每個線程使用我的原始python代碼並擁有自己的輸出文件。

任何人都可以給我一個示例/示例代碼?

+0

它不會加速它(很多,如果有的話)...你應該只將它拆分成許多部分,因爲有可用的內核......並使用多處理庫......在python中使用線程的唯一原因是當你有一個圖形用戶界面時,你不想阻塞...否則你應該使用多處理,如果你需要並行數據處理 – 2014-09-04 17:40:44

+0

你的工作實際上是由CPU(處理)還是由I/O(讀寫文件)支配?在決定如何並行化之前,您需要通過配置文件來確定_first_。 – abarnert 2014-09-04 17:41:40

+0

它是通過I/O,每條線耗費4ms CPU,我假設I/O應該更高。 – Jin 2014-09-04 17:44:01

回答

1
  • 如果沒有1000級的處理器呢,劈1000有沒有興趣......在相反,大的開銷...
  • 多線程是管理I/O阻塞更加有效,不併行處理工作。
  • 如果你的問題是我在同一個設備/ O,使更多的會增加其負荷,增加開銷(頭移動,緩存垃圾...)

什麼您在搜索更加多: https://docs.python.org/2/library/multiprocessing.html

+0

我明白了。非常感謝 – Jin 2014-09-04 17:46:05

1

如果您決定使用multiprocessing,那麼您將以非常類似的方式完成此操作。 你可以嘗試這樣的事情:

import Queue 
from threading import Thread 

file_list = ['filea', 'fileb'] 

def do_stuff(q): 
    while True: 
     try: 
      file_name = q.get(False) 
     except Queue.Empty: 
      # Handle empty queue here 
      break 
     # do what ever you need here 
     print file_name 
     q.task_done() 

q = Queue.Queue(maxsize=0) 
num_threads = 2 

for x in file_list: 
    q.put(x) 

for i in range(num_threads): 
    worker = Thread(target=do_stuff, args=(q,)) 
    worker.setDaemon(True) 
    worker.start() 

q.join() 
+0

爲什麼在'multiprocessing'庫有一個內置的時候自己建立一個池(它還增加了你沒有構建的所有類型的特性,比如返回值,正確的信號完成和等待等),' concurrent.futures'(或'futures' backport)有一個更容易使用的執行器? – abarnert 2014-09-04 17:48:03

+0

@abarnert同意,但這僅僅是一個例子,顯示一個想法。 – Vor 2014-09-04 17:48:39

+0

好的,但是爲什麼要在幾行代碼中以艱難的方式構建一個例子,讓事情脫節,什麼時候可以用簡單的方式在幾行代碼中編寫例子並覆蓋所有內容? – abarnert 2014-09-04 17:49:19

5

首先,使用1000線幾乎肯定會慢下來,不加快速度。即使您的代碼完全受I/O限制,1000仍在推動許多平臺調度程序的限制,並且您將花費更多時間進行上下文切換,而不是進行實際工作。接下來,您需要知道您的代碼是否受CPU限制(即對內存中的信息進行實際處理)或I/O限制(即等待磁盤讀取和寫入等操作)。


如果你的代碼是CPU綁定的,你可以保持CPU的繁忙相當一致的,想要每個核心正是1個線程。這樣,通過最少量的上下文切換(和緩存抖動,假設大部分工作在不可變或非共享值上完成),您可以獲得最大的並行度。另外(除非那些工作是在專門設計的C擴展中完成的,比如numpy),你希望這些線程在不同的進程中,因爲每個進程每次只有一個線程可以一次運行Python解釋器,這要歸功於全球口譯員鎖定。

所以,你想要的東西幾乎肯定是一個進程池。最簡單的方法是使用concurrent.futures.ProcessPoolExecutor,可能帶有max_workers參數(也許從16開始,然後嘗試上下調整以查看是否有幫助)。


如果,另一方面,你的代碼主要是I/O限制,那麼幾十個線程是合理的,特別是如果延遲是不可預測的,但在同一進程沒有1000和線程會工作正常,因爲一個線程可以運行Python解釋器,而其他線程都在等待操作系統完成磁盤操作。

所以,在這種情況下,你想要一個concurrent.futures.ThreadPoolExecutor


如果你不知道,不知道怎麼找出來,用線程池構建它,然後再使用ActivityMonitor或任何Windows現在調用它的進程管理器或您的300個選擇喜愛在Linux上觀看它運行;如果最終得到100%的核心和其他25%以下的核心,那麼你太過於CPU而不能使用線程。幸運的是,切換到進程池是一個微不足道的變化 - 用ProcessPoolExecutor代替ThreadPoolExecutor,並刪除max_workers參數,以便Python選擇最佳的默認值,現在就完成了。


無論哪種情況,文檔中的示例都足夠好,因此沒有理由要求其他示例代碼。