2015-12-03 69 views
6

我有一個python腳本,它具有以下功能: i。它需要一個輸入文件的數據(通常是嵌套的JSON格式) ii。將數據逐行傳遞到另一個函數,該函數將數據操作爲期望的格式 iii。最後它將輸出寫入文件。dask包不使用所有核心?備擇方案?

這是我目前的簡單的Python線,這是否......

def manipulate(line): 
    # a pure python function which transforms the data 
    # ... 
    return manipulated_json 

for line in f: 
    components.append(manipulate(ujson.loads(line))) 
    write_to_csv(components)` 

這工作,但與蟒蛇GIL將其限制在服務器上的一個核心,這是痛苦的緩慢,尤其是大量的數據。

我通常處理的數據量大約是4 gig gzip壓縮,但偶爾我必須處理數百個gzip壓縮的演出數據。它不一定是大數據,但仍然不能在內存中進行處理,而且Python的GIL處理非常緩慢。

在尋找優化數據處理的解決方案時,我遇到了dask。儘管當時PySpark似乎對我來說是明顯的解決方案,但dask的承諾和簡單性贏得了我的讚賞,我決定嘗試一下。

經過對dask的大量研究以及如何使用它,我放了一個非常小的腳本來複制我當前的過程。該腳本是這樣的:

import dask.bag as bag 
import json 
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')` 

這工作,併產生相同的結果我原來的非DASK腳本,但它仍然僅在服務器上使用一個CPU。所以,它根本沒有幫助。實際上,速度較慢。

我在做什麼錯?我錯過了什麼嗎?我對dask還是比較陌生的,所以讓我知道我是否忽略了某些事情,或者我應該做些什麼不同的事情。

另外,有沒有替代品dask使用服務器的全部容量(即所有的CPU),我需要做什麼?

感謝,

牛逼

+0

嗯從來沒有聽說過'dask',真的有趣,謝謝。你看過盒子標準的'multiprocessing'嗎?它很簡單(istic),但它的工作原理。 –

+0

您可能想問[Blaze郵件列表](https://groups.google.com/a/continuum.io/forum/#!forum/blaze-dev)。 Dask相對較新且不斷變化,從我所看到的情況來看,關於它的問題始終只有20個StackOverflow問題,因此可能沒有很多人在這裏看到您的問題並且知道足夠的幫助。 – BrenBarn

+0

FWIW,我訂閱了這個標籤,所以總有人在看它。對於這些問題,Stackoverflow是一個很好的地方。 – MRocklin

回答

2

這裏的問題是dask.dataframe.to_csv,這迫使你的單核心模式。

我推薦使用dask.bag來做你的閱讀和操作,然後轉儲到一堆CSV文件並行。轉儲到許多CSV文件比轉儲到單個CSV文件更容易協調。

import dask.bag as bag 
import json 
b = bag.from_filenames('input.json.gz').map(json.loads).map(manipulate).concat() 
b.map(lambda t: ','.join(map(str, t)).to_textfiles('out.*.csv').compute() 

也有可能是一個問題,試圖並行讀取一個GZIP文件,但上面的應該讓你開始。

+0

Thank @MRocklin!它沒有工作。大聲笑......但後來我將輸入文件分成多個塊,它工作。似乎它只使用與輸入文件數量一樣多的CPU。任何使這個功能動態化的計劃,所以你可以傳入一個輸入文件和包,將它分開並在底層並行處理它? – tamjd1

+0

dask.bag現在這樣做,只是不完美。一個可能的問題是GZIP對隨機訪問的支持不足。 – MRocklin

0

看起來袋子只和他們擁有的分區數量一樣平行。

對於我來說,運行

mybag=bag.from_filenames(filename, chunkbytes=1e7) 
mybag.npartitions 

產生

這解決了這個問題,並提出了處理完全並行。

相關問題