2013-08-21 148 views
3

我有一個包含大量數據的文件。每一行都是一條記錄。我正在嘗試對整個文件進行一些ETL工作。現在我正在使用標準輸入逐行讀取數據。這很酷的事情是你的腳本可以非常靈活地與其他腳本和shell命令集成。我將結果寫入標準輸出。例如。Python Threading stdin/stdout

$ cat input_file 
line1 
line2 
line3 
line4 
... 

我當前的Python代碼看起來是這樣的 - parse.py

import sys 
for line in sys.stdin: 
    result = ETL(line) # ETL is some self defined function which takes a while to execute. 
    print result 

下面的代碼是它是如何工作現在:

cat input_file | python parse.py > output_file 

我已經看過了線程模塊的Python,我想知道如果我使用該模塊,性能會大大提高。

問題1:我該如何規劃每個線程的配額,爲什麼?

... 
counter = 0 
buffer = [] 
for line in sys.stdin: 
    buffer.append(line) 
    if counter % 5 == 0: # maybe assign 5 rows to each thread? if not, is there a rule of thumb to determine 
     counter = 0 
     thread = parser(buffer) 
     buffer = [] 
     thread.start() 

問題2:多線程可能在同一時間打印出結果返回到標準輸出,如何組織他們,避免下面的情況呢?

import threading 
import time 

class parser(threading.Thread): 
    def __init__ (self, data_input): 
     threading.Thread.__init__(self) 
     self.data_input = data_input 

    def run(self): 
     for elem in self.data_input: 
      time.sleep(3) 
      print elem + 'Finished' 

work = ['a', 'b', 'c', 'd', 'e', 'f'] 

thread1 = parser(['a', 'b']) 
thread2 = parser(['c', 'd']) 
thread3 = parser(['e', 'f']) 

thread1.start() 
thread2.start() 
thread3.start() 

的輸出是真難看,其中一個行都包含來自兩個線程的輸出。

aFinished 
cFinishedeFinished 

bFinished 
fFinished 
dFinished 
+0

你能鏈接「Python的線程模塊」嗎? 無論如何,線程訪問一個文件,恕我直言,是不是一件好事。您需要定義什麼內核可以通過鎖和信號量以及作品訪問什麼以及何時訪問。 由於大部分工作是I/O工作,而不是CPU工作,可能你不會看到性能提升。 –

回答

4

先問你第二個問題,這是mutexes的用途。您可以通過使用一個鎖的解析器之間進行協調得到你想要的乾淨的輸出,並確保只有一個線程在給定的時間段進入到輸出流:

class parser(threading.Thread): 
    output_lock = threading.Lock() 

    def __init__ (self, data_input): 
     threading.Thread.__init__(self) 
     self.data_input = data_input 

    def run(self): 
     for elem in self.data_input: 
      time.sleep(3) 
      with self.output_lock: 
       print elem + 'Finished' 

至於你的第一個問題,請注意,多線程可能不會爲您的特定工作負載提供好處。這在很大程度上取決於您對每個輸入行(您的ETL函數)所做的工作主要是CPU限制還是IO限制。如果前者(我懷疑可能),線程將無濟於事,因爲global interpreter lock。在這種情況下,您可能希望使用multiprocessing模塊在多個進程而不是多個線程之間分配工作。

但您可以通過更容易實現的工作流程獲得相同的結果:將輸入文件拆分爲n個部分(使用例如split命令);分別在每個子文件上調用extract-and-transform腳本;然後連接結果輸出文件。

一個挑剔:「使用標準輸入逐行讀取數據,因爲它不會將整個文件加載到內存中」涉及到一個誤解。您可以從內部的Python,例如,通過在結構像一個文件對象替換sys.stdin逐行讀取一個文件行:

for line in sys.stdin: 

也見文件對象的readline()方法,並注意read()可以作爲參數要讀取的最大字節數。

+0

很多偉大的東西在你的文章,阿爾卑斯山。我對你的評論CPU限制/ IO限制非常感興趣。我想知道您是否有辦法確定CPU/IO佔用了多少時間和資源?順便說一句,他們的理由是我使用stdIO是因爲你可以將你的腳本與Shell命令集成在一起,這使得它非常靈活和方便。感謝關於'記憶中間理解'的更正。 –

0

線程是否會有幫助,您高度依賴於您的情況。特別是,如果您的功能涉及大量磁盤訪問,那麼線程可能會使您的速度得到顯着改善。

迴應你的第一個問題,我總是發現它只是依賴。確定理想的線程數時,有很多因素在起作用,其中許多因素都依賴於程序。例如,如果您正在進行大量磁盤訪問(這非常緩慢),那麼您將需要更多線程利用停機時間來等待磁盤訪問。但是,如果程序是CPU綁定的,那麼大量的線程可能不是非常有用。因此,儘管可能分析所有因素以獲得理想數量的線程,但通常要做出初步猜測並且從那裏進行調整要快得多。

更具體地說,雖然爲每個線程分配一定數量的線可能並不是分散工作的最佳途徑。例如,考慮一條線需要特別長的時間來處理。如果一條線程可以在這一條線上工作,而其他線程可以在此期間再多做幾條線,那將是最好的。處理這個問題的最好方法是使用Queue。如果將每行都插入到隊列中,則每個線程都可以從隊列中拉出一條線,處理它並重復,直到隊列爲空。通過這種方式,工作得到分配,從而不會有任何線程無需工作(當然,直到最後)。

現在,第二個問題。你絕對正確的做法是一次寫入多個線程的stdout並不是一個理想的解決方案。理想情況下,你會安排一些事情,以便寫入標準輸出只發生在一個地方。一個很好的方法是使用隊列。如果每個線程都將其輸出寫入共享隊列,則可以產生一個額外線程,其唯一任務是將項目從該隊列中取出並將其輸出到stdout。通過將打印限制爲只有一個線程,可以避免多個線程嘗試一次打印時固有的問題。