2017-01-15 19 views
2

我一直在一大堆文件上進行大量的文本處理,包括大型的CSV文件和大量的小型XML文件。有時候我正在做聚合計數,但很多時候我正在做NLP類型的工作,以便更深入地瞭解這些文件中除標籤或已結構化之外的內容。CSV日期分析緩慢的Dask性能?

我一直在使用多處理函數庫來執行跨多個CPU的這些計算,但是我已經愛上了Dask背後的想法,並且強烈建議在網絡和同事之間。

我問了一下這裏DASK性能類似的問題:

Slow Performance with Python Dask bag?

和MRocklin(https://stackoverflow.com/users/616616/mrocklin)讓我知道裝載大量小文件可能丟棄的性能。

但是,當我在單個大文件(200mb)上運行它時,我仍然沒有很好地執行它。這裏有一個例子:

我有一個90萬行的tweets CSV文件,我想快速加載並解析「created_at」字段。以下是我做過的三種方法和每種方法的基準。我在一臺新的i7 2016 MacBook Pro上安裝了16GB的RAM。

import pandas 
import dask.dataframe as dd 
import multiprocessing 

%%time 
# Single Threaded, no chunking 
d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", parse_dates = ["created_at"]) 
print(len(d)) 

CPU時間:用戶2分31秒,SYS:807毫秒,總:2分32秒 牆時間:2分鐘32秒

%%time 
# Multithreaded chunking 
def parse_frame_dates(frame): 
    frame["created_at"] = pandas.to_datetime(frame["created_at"]) 
    return(frame) 

d = pandas.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", chunksize = 100000) 
frames = multiprocessing.Pool().imap_unordered(get_count, d) 
td = pandas.concat(frames) 
print(len(td)) 

CPU時間:用戶5.65 S,SYS:1.47 s,計: 7.12小號 牆時間:1分鐘10秒

%%time 
# Dask Load 
d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
       parse_dates = ["created_at"], blocksize = 10000000).compute() 

CPU時間:用戶2分鐘59秒,SYS:26.2 s,計:3分鐘25秒 牆時間:3min 12S

我在許多不同的Dask比較中發現了這些類型的結果,但即使讓這個工作正確也可能指向正確的方向。

總之,如何從Dask中獲得最佳性能來完成這些任務?爲什麼它在單線程和多線程技術方面表現不如其他方式?

回答

2

我懷疑Pandas的read_csv datetime解析代碼是純python,所以不會從使用線程中受益匪淺,這是dask.dataframe默認使用的。

使用進程時可能會看到更好的性能。

我懷疑下面會工作得更快:

import dask.multiprocessing 
dask.set_options(get=dask.multiprocessing.get) # set processes as default 

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
       parse_dates = ["created_at"], blocksize = 10000000) 
len(d) 

問題與進程是進程間通信可能變得昂貴。我明確地計算了上面的len(d)而不是d.compute(),以避免必須在工作進程中使用所有的熊貓數據幀,並將它們移動到主調用進程。實際上,這是非常常見的,因爲人們很少需要完整的數據幀,但是需要對數據幀進行一些計算。

相關docpage這裏是http://dask.readthedocs.io/en/latest/scheduler-choice.html

您可能還需要一臺機器上使用distributed scheduler而不是使用多調度。這在上面引用的文檔中也有描述。

$ pip install dask distributed 

from dask.distributed import Client 
c = Client() # create processes and set as default 

d = dd.read_csv("/Users/michaelshea/Documents/Data/tweet_text.csv", 
       parse_dates = ["created_at"], blocksize = 10000000) 
len(d) 
+1

定時的方法,OP是不一樣的。傳遞''parse_dates = ...''是一個相當健壯的方法,但我必須回退到較慢的解析(在python中)。你幾乎總是想簡單地在csv,THEN,後處理中讀取.to_datetime,特別是你可能需要使用format =參數或其他選項,具體取決於日期。 ,YMMV。事實上,這種方法特別便於使用,因爲它們是獨立的,但是是順序任務。 – Jeff

+1

謝謝,馬修!這產生了巨大的差異。在我的例子中,進入多進程,從單線程的2分鐘到達斯克的30分鐘。更符合我的預期。我將深入瞭解調度程序選擇和分佈式調度程序的使用信息。我顯然沒有做完所有的功課。再次感謝! –