2010-01-15 71 views
4

我需要讀取一些非常大的文本文件(100+ Mb),用正則表達式處理每一行並將數據存儲到結構中。我的結構繼承自defaultdict,它有一個read(self)方法來讀取self.file_name文件。使用多重處理讀取多個文件

看看這個非常簡單的(但不是真正的)例子,我不使用正則表達式,但我分割線:


import multiprocessing 
from collections import defaultdict 

def SingleContainer(): 
    return list() 

class Container(defaultdict): 
    """ 
    this class store odd line in self["odd"] and even line in self["even"]. 
    It is stupid, but it's only an example. In the real case the class 
    has additional methods that do computation on readen data. 
    """ 
    def __init__(self,file_name): 
     if type(file_name) != str: 
      raise AttributeError, "%s is not a string" % file_name 
     defaultdict.__init__(self,SingleContainer) 
     self.file_name = file_name 
     self.readen_lines = 0 
    def read(self): 
     f = open(self.file_name) 
     print "start reading file %s" % self.file_name 
     for line in f: 
      self.readen_lines += 1 
      values = line.split() 
      key = {0: "even", 1: "odd"}[self.readen_lines %2] 
      self[key].append(values) 
     print "readen %d lines from file %s" % (self.readen_lines, self.file_name) 

def do(file_name): 
    container = Container(file_name) 
    container.read() 
    return container.items() 

if __name__ == "__main__": 
    file_names = ["r1_200909.log", "r1_200910.log"] 
    pool = multiprocessing.Pool(len(file_names)) 
    result = pool.map(do,file_names) 
    pool.close() 
    pool.join() 
    print "Finish"  

最後我需要加入在一個容器中的每個結果。保留行的順序非常重要。返回值時,我的方法太慢了。解決方案更好 我在Linux上使用python 2.6

回答

0

多處理更適合面向CPU或面向內存的進程,因爲旋轉驅動器的查找時間在文件之間切換時會導致性能下降。將您的日誌文件加載到快速閃存驅動器或某種內存磁盤(物理或虛擬),或放棄多處理。

+0

我的問題是cpu-bound而不是IO-bound。在這個例子中,我正在分割線,但在實際情況下,我正在處理一個複雜和長的正則表達式以及IO時間(seek,...)是很小,然後CPU時間 –

0

您正在創建一個包含儘可能多的工人作爲文件的池。這可能太多了。通常,我的目標是讓工作人員的數量與核心數量大致相同。

簡單的事實是,您的最後一步將是將所有結果合併在一起的單個過程。鑑於您的問題描述,沒有避免這一點。這被稱爲障礙同步:所有任務必須在任何可以繼續之前達到相同的點。

您應該多次運行此程序,或者在循環中,每次將不同的值傳遞給multiprocessing.Pool(),從1開始並轉至核心數。每次運行時間,並查看哪個工人數最好。

結果將取決於您的任務的CPU密集型(而不是磁盤密集型)。如果你的任務大約是一半CPU和一半磁盤,即使在一臺8核機器上,如果2最好,我也不會感到驚訝。

+0

是的,我已經做到了。我的選擇不是隨便的選擇。我試圖在沒有返回線的情況下測量時間,最好的選擇是當進程數等於文件數時,即使進程數大於核心數也是如此。 –

+0

然後我不明白你怎麼能做得更好。殺手是:「重要的是保留線條的順序。」即使您單獨預處理每個文件,也只能一次完成一個輸入。另一種方法是讓每個工作人員生成一個帶有後綴的文件,然後按順序讀取這些文件,從而消除合併。 –

4

你可能碰到兩個問題。

其中之一被提到:你一次讀取多個文件。這些讀取將最終交錯,導致磁盤抖動。您想要一次讀取整個文件,然後只對數據進行多線程計算。

其次,你正在打擊Python的多處理模塊的開銷。它實際上並不使用線程,而是啓動多個進程並通過管道序列化結果。對於批量數據來說這非常慢 - 事實上,它似乎比你在線程中做的工作要慢(至少在這個例子中)。這是由GIL引起的真實世界問題。

如果我修改不()返回無不是container.items()禁用額外的數據複製,這個例子比單線程快,只要這些文件已經被緩存:

兩個線程:0.36elapsed 168%的CPU

一個線程(有圖替換pool.map)0:00.52elapsed 98%的CPU

不幸的是,GIL問題是根本,不能從各地工作在Python內部。

+0

是的,這是問題:返回數據。由於GIL,我使用多處理而不是多線程。但我想用我的cpu的所有核心優化我的程序!如果我從「開始讀取文件」和「Readeng%d行」(忽略返回時間)中確定時間,多處理版本比單進程版本(我有2個內核)快2倍。現在:共享內存呢?我查看了multiprocess.Manager類,但我想共享比dict更復雜的結構。 –

+0

我沒有使用管理器,但它看起來像代理數據的操縱,所以我懷疑它更慢。您可以使用共享內存來共享簡單的內存塊,但不能使用本地Python類型。你可能想要尋找其他的優化,但沒有任何實際的代碼,我不能提出任何建議。 –

+0

也許一個解決方案可以是:在C++中重寫讀取函數並使用C++實現真正的多線程?有了這種方法,我可以繞過問題在進程(管道)之間共享數據嗎? –