我有一個python腳本,它具有以下功能: i。它需要一個輸入文件的數據(通常是嵌套的JSON格式) ii。將數據逐行傳遞到另一個函數,該函數將數據操作爲期望的格式 iii。最後它將輸出寫入文件。dask包不使用所有核心?備擇方案?
這是我目前的簡單的Python線,這是否......
def manipulate(line):
# a pure python function which transforms the data
# ...
return manipulated_json
for line in f:
components.append(manipulate(ujson.loads(line)))
write_to_csv(components)`
這工作,但與蟒蛇GIL將其限制在服務器上的一個核心,這是痛苦的緩慢,尤其是大量的數據。
我通常處理的數據量大約是4 gig gzip壓縮,但偶爾我必須處理數百個gzip壓縮的演出數據。它不一定是大數據,但仍然不能在內存中進行處理,而且Python的GIL處理非常緩慢。
在尋找優化數據處理的解決方案時,我遇到了dask。儘管當時PySpark似乎對我來說是明顯的解決方案,但dask的承諾和簡單性贏得了我的讚賞,我決定嘗試一下。
經過對dask的大量研究以及如何使用它,我放了一個非常小的腳本來複制我當前的過程。該腳本是這樣的:
import dask.bag as bag
import json
bag.from_filenames('input.json.gz').map(json.loads).map(lambda x:manipulate(x)).concat().to_dataframe().to_csv('output.csv.gz')`
這工作,併產生相同的結果我原來的非DASK腳本,但它仍然僅在服務器上使用一個CPU。所以,它根本沒有幫助。實際上,速度較慢。
我在做什麼錯?我錯過了什麼嗎?我對dask還是比較陌生的,所以讓我知道我是否忽略了某些事情,或者我應該做些什麼不同的事情。
另外,有沒有替代品dask使用服務器的全部容量(即所有的CPU),我需要做什麼?
感謝,
牛逼
嗯從來沒有聽說過'dask',真的有趣,謝謝。你看過盒子標準的'multiprocessing'嗎?它很簡單(istic),但它的工作原理。 –
您可能想問[Blaze郵件列表](https://groups.google.com/a/continuum.io/forum/#!forum/blaze-dev)。 Dask相對較新且不斷變化,從我所看到的情況來看,關於它的問題始終只有20個StackOverflow問題,因此可能沒有很多人在這裏看到您的問題並且知道足夠的幫助。 – BrenBarn
FWIW,我訂閱了這個標籤,所以總有人在看它。對於這些問題,Stackoverflow是一個很好的地方。 – MRocklin