
Dask and fbprophet

I am trying to use the dask and fbprophet libraries together, and I am either doing something wrong or having unexpected performance problems.

import dask.dataframe as dd 
import datetime as dt 
import multiprocessing as mp 
import numpy as np 
import pandas as pd 
pd.options.mode.chained_assignment = None 
from fbprophet import Prophet 
import time 
ncpu = mp.cpu_count() 

def parallel_pd(fun, vec, pool=ncpu-1): 
    with mp.Pool(pool) as p: 
        res = p.map(fun, vec) 
    return res 

def forecast1dd(ts): 
    time.sleep(0.1) 
    return ts["y"].max() 

def forecast1mp(key): 
    ts = df[df["key"]==key] 
    time.sleep(0.1) 
    return ts["y"].max() 

def forecast2dd(ts): 
    future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1), 
                periods=7, freq="D")}) 
    key = ts.name 
    model = Prophet(yearly_seasonality=True) 
    model.fit(ts) 
    forecast = model.predict(future) 
    future["yhat"] = forecast["yhat"] 
    future["key"] = key 
    return future.as_matrix() 

def forecast2mp(key): 
    ts = df[df["key"]==key] 
    future = pd.DataFrame({"ds":pd.date_range(start=ts["ds"].max()+ dt.timedelta(days=1), 
                periods=7, freq="D")}) 
    model = Prophet(yearly_seasonality=True) 
    model.fit(ts) 
    forecast = model.predict(future) 
    future["yhat"] = forecast["yhat"] 
    future["key"] = key 
    return future.as_matrix() 

On one side I have custom functions that run in about 0.1 s each: forecast1dd and forecast1mp simulate my real function and are used with the following dataframe:

N = 2*365 
key_n = 5000 
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"), 
        "y":np.random.normal(100,20,N), 
        "key":np.repeat(str(k),N)}) for k in range(key_n)]) 
keys = df.key.unique() 
df = df.sample(frac=1).reset_index(drop=True) 
ddf = dd.from_pandas(df, npartitions=ncpu*2) 

I get (respectively):

%%time 
grp = ddf.groupby("key").apply(forecast1dd, meta=pd.Series(name="s")) 
df1dd = grp.to_frame().compute() 
CPU times: user 7.7 s, sys: 400 ms, total: 8.1 s 
Wall time: 1min 8s 

%%time 
res = parallel_pd(forecast1mp,keys) 
CPU times: user 820 ms, sys: 360 ms, total: 1.18 s 
Wall time: 10min 36s 

In the first case the cores are not used at 100%, but the performance is in line with my real use case. Using a line profiler, it is easy to check that the culprit for the slow performance in the second case is ts = df[df["key"]==key], which rescans the whole dataframe once per key, and things get worse with more keys.
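
One way to sidestep that repeated scan is to split the dataframe into per-key groups once and hand the pool the sub-frames instead of the keys. A minimal sketch of the idea (the forecast1grp helper is hypothetical, not part of the code above):

# Split once: a single pass over df instead of one full scan per key. 
groups = [g for _, g in df.groupby("key")] 

def forecast1grp(ts): 
    # Same simulated 0.1 s of work as forecast1mp, on a pre-filtered frame. 
    time.sleep(0.1) 
    return ts["y"].max() 

res = parallel_pd(forecast1grp, groups) 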

So up to this point I am happy with dask. But things change whenever I try to use fbprophet. Here I use fewer keys, yet unlike the previous case dask's performance is always worse than multiprocessing's.

N = 2*365 
key_n = 200 
df = pd.concat([pd.DataFrame({"ds":pd.date_range(start="2015-01-01",periods=N, freq="D"), 
        "y":np.random.normal(100,20,N), 
        "key":np.repeat(str(k),N)}) for k in range(key_n)]) 

keys = df.key.unique() 
df = df.sample(frac=1).reset_index(drop=True) 
ddf = dd.from_pandas(df, npartitions=ncpu*2) 

%%time 
grp = ddf.groupby("key").apply(forecast2dd, 
    meta=pd.Series(name="s")).to_frame().compute() 
df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values]) 
CPU times: user 3min 42s, sys: 15 s, total: 3min 57s 
Wall time: 3min 30s 

%%time 
res = parallel_pd(forecast2mp,keys) 
df2mp = pd.concat([pd.DataFrame(a) for a in res]) 
CPU times: user 76 ms, sys: 160 ms, total: 236 ms 
Wall time: 39.4 s 

Now my questions are:

  • How can I improve the performance of prophet with dask?
  • What should I do to use the cores at 100%?

Answer


I suspect that Prophet is holding the GIL, so when computing ddf.groupby("key").apply(forecast2dd, meta=pd.Series(name="s")), only one thread can run Python code at a time. Using multiprocessing sidesteps this, at the cost of having to copy your data ncpu times. This should have a similar runtime to your parallel_pd function.

%%time 
import dask 
import dask.multiprocessing  # makes dask.multiprocessing.get available 

with dask.set_options(get=dask.multiprocessing.get): 
    grp = ddf.groupby("key").apply(forecast2dd, 
        meta=pd.Series(name="s")).to_frame().compute() 

df2dd = pd.concat([pd.DataFrame(a) for a in grp.s.values]) 

CPU times: user 2.47 s, sys: 251 ms, total: 2.72 s 
Wall time: 1min 27s 
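
As a side note on the API, dask.set_options(get=...) is the older way to pick a scheduler; on newer dask releases the same choice goes through the config system. A rough equivalent, untested against the versions used above:

import dask 

# Newer spelling of "use the multiprocessing scheduler": 
with dask.config.set(scheduler="processes"): 
    grp = ddf.groupby("key").apply(forecast2dd, 
        meta=pd.Series(name="s")).to_frame().compute() 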

You could try asking the Prophet developers whether they need to hold the GIL. I suspect that the problem is in PyStan, and that they probably don't need the GIL while the actual Stan solvers are running. There is a GitHub issue here.


Side note:

%%time 

def forecast1dd_chunk(ts): 
    time.sleep(0.1) 
    return ts.max() 

def forecast1dd_agg(ts): 
    return ts.max() 

f1dd = dd.Aggregation("forecast1dd", forecast1dd_chunk, forecast1dd_agg) 

grp = ddf.groupby("key")[['y']].agg(f1dd) 
x = grp.compute() 

CPU times: user 59.5 ms, sys: 5.13 ms, total: 64.7 ms 
Wall time: 355 ms 

Though this doesn't match your actual problem: since your sample forecast1dd is an aggregation, it can run much faster using a dd.Aggregation. Your actual problem is not an aggregation, though.
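
For the non-aggregation case, one option in the same spirit is to pre-split the groups and wrap each Prophet fit in dask.delayed, so every task already owns its sub-frame and nothing rescans the full dataframe. A sketch under those assumptions (forecast2dl is a hypothetical variant of forecast2dd that takes the key explicitly; the scheduler keyword is the newer dask spelling):

import dask 

def forecast2dl(key, ts): 
    # Like forecast2dd, but the group key is passed in explicitly instead 
    # of being read from ts.name, so plain pandas groups can be used. 
    future = pd.DataFrame({"ds": pd.date_range(start=ts["ds"].max() + dt.timedelta(days=1), 
                periods=7, freq="D")}) 
    model = Prophet(yearly_seasonality=True) 
    model.fit(ts) 
    forecast = model.predict(future) 
    future["yhat"] = forecast["yhat"] 
    future["key"] = key 
    return future.values  # .values rather than the deprecated as_matrix() 

# One delayed task per key, each with its pre-filtered sub-frame. 
tasks = [dask.delayed(forecast2dl)(k, g) for k, g in df.groupby("key")] 
res = dask.compute(*tasks, scheduler="processes") 
df2dl = pd.concat([pd.DataFrame(a) for a in res]) 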


Hi Tom, I tried your approach, as well as 'from dask.distributed import Client' followed by 'client = Client()', and the performance is almost the same. The problem is that whenever I use 'key_n = 5000' with 'forecast2dd' I get the following error: 'OSError: [Errno 24] Too many open files: '/dev/null'' – user32185


I solved the 'OSError: [Errno 24] Too many open files' by running 'ulimit -Sn 10000' from the terminal. – user32185
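
For reference, the same limit can also be raised from inside Python through the resource module (Unix only); a minimal sketch, assuming the hard limit already allows the new value:

import resource 

# Raise the soft limit on open file descriptors, the in-process analogue 
# of `ulimit -Sn` in the shell; fails if the hard limit is below 10000. 
soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE) 
resource.setrlimit(resource.RLIMIT_NOFILE, (10000, hard)) 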