
Error while exporting a dask dataframe to csv

My dask dataframe has about 120mm rows and 4 columns:

df_final.dtypes 

cust_id  int64 
score   float64 
total_qty  float64 
update_score float64 
dtype: object 

I am running this operation from a Jupyter notebook connected to a Linux machine:

%time df_final.to_csv('/path/claritin-files-*.csv') 

and it throws this error:

--------------------------------------------------------------------------- 
ValueError        Traceback (most recent call last) 
<ipython-input-24-46468ae45023> in <module>() 
----> 1 get_ipython().magic(u"time df_final.to_csv('path/claritin-files-*.csv')") 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in magic(self, arg_s) 
    2334   magic_name, _, magic_arg_s = arg_s.partition(' ') 
    2335   magic_name = magic_name.lstrip(prefilter.ESC_MAGIC) 
-> 2336   return self.run_line_magic(magic_name, magic_arg_s) 
    2337 
    2338  #------------------------------------------------------------------------- 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/interactiveshell.pyc in run_line_magic(self, magic_name, line) 
    2255     kwargs['local_ns'] = sys._getframe(stack_depth).f_locals 
    2256    with self.builtin_trap: 
-> 2257     result = fn(*args,**kwargs) 
    2258    return result 
    2259 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns) 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magic.pyc in <lambda>(f, *a, **k) 
    191  # but it's overkill for just that one bit of state. 
    192  def magic_deco(arg): 
--> 193   call = lambda f, *a, **k: f(*a, **k) 
    194 
    195   if callable(arg): 

/home/mspra/anaconda2/lib/python2.7/site-packages/IPython/core/magics/execution.pyc in time(self, line, cell, local_ns) 
    1161   if mode=='eval': 
    1162    st = clock2() 
-> 1163    out = eval(code, glob, local_ns) 
    1164    end = clock2() 
    1165   else: 

<timed eval> in <module>() 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/core.pyc in to_csv(self, filename, **kwargs) 
    936   """ See dd.to_csv docstring for more information """ 
    937   from .io import to_csv 
--> 938   return to_csv(self, filename, **kwargs) 
    939 
    940  def to_delayed(self): 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.pyc in to_csv(df, filename, name_function, compression, compute, get, **kwargs) 
    411  if compute: 
    412   from dask import compute 
--> 413   compute(*values, get=get) 
    414  else: 
    415   return values 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/base.pyc in compute(*args, **kwargs) 
    177   dsk = merge(var.dask for var in variables) 
    178  keys = [var._keys() for var in variables] 
--> 179  results = get(dsk, keys, **kwargs) 
    180 
    181  results_iter = iter(results) 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/threaded.pyc in get(dsk, result, cache, num_workers, **kwargs) 
    74  results = get_async(pool.apply_async, len(pool._pool), dsk, result, 
    75       cache=cache, get_id=_thread_get_id, 
---> 76       **kwargs) 
    77 
    78  # Cleanup pools associated to dead threads 

/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.pyc in get_async(apply_async, num_workers, dsk, result, cache, get_id, raise_on_exception, rerun_exceptions_locally, callbacks, dumps, loads, **kwargs) 
    491      _execute_task(task, data) # Re-execute locally 
    492     else: 
--> 493      raise(remote_exception(res, tb)) 
    494    state['cache'][key] = res 
    495    finish_task(dsk, key, state, results, keyorder.get) 

ValueError: invalid literal for long() with base 10: 'total_qty' 

Traceback 
--------- 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 268, in execute_task 
    result = _execute_task(task, data) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/async.py", line 249, in _execute_task 
    return func(*args2) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 55, in pandas_read_text 
    coerce_dtypes(df, dtypes) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/dask/dataframe/io/csv.py", line 83, in coerce_dtypes 
    df[c] = df[c].astype(dtypes[c]) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/generic.py", line 3054, in astype 
    raise_on_error=raise_on_error, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3189, in astype 
    return self.apply('astype', dtype=dtype, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 3056, in apply 
    applied = getattr(b, f)(**kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 461, in astype 
    values=values, **kwargs) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/core/internals.py", line 504, in _astype 
    values = _astype_nansafe(values.ravel(), dtype, copy=True) 
    File "/home/mspra/anaconda2/lib/python2.7/site-packages/pandas/types/cast.py", line 534, in _astype_nansafe 
    return lib.astype_intsafe(arr.ravel(), dtype).reshape(arr.shape) 
    File "pandas/lib.pyx", line 980, in pandas.lib.astype_intsafe (pandas/lib.c:17409) 
    File "pandas/src/util.pxd", line 93, in util.set_value_at_unsafe (pandas/lib.c:72777) 

I have a couple of questions:

1) First of all, this export worked fine on Friday: it spat out 100 csv files (since the frame has 100 partitions), which I later aggregated. So what went wrong today? Does anything in the error log explain it?

2) Maybe this question is for the creators of this package: what is the most time-efficient way of getting a csv extract out of a dask dataframe of this size? The last time it worked, it took about 1.5 to 2 hours.

I am not using dask distributed; this runs on a single core of a Linux cluster.

Answer


This error most likely has little to do with to_csv itself and a lot to do with something else in your computation. The call to df.to_csv is just the first time you force the computation to roll through all of the data.

Given the error, I actually suspect that read_csv is what is failing. Dask.dataframe read the first few hundred kilobytes of your first file to guess the datatypes, but it seems to have guessed wrong. You may want to try specifying the dtypes explicitly in the read_csv call.
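
A minimal sketch of that fix, assuming the frame was loaded with dd.read_csv (the input path here is hypothetical; the dtype mapping mirrors the df_final.dtypes output in the question):

import dask.dataframe as dd

# Passing dtype explicitly bypasses dask's sample-based inference,
# which only reads the first bytes of the first file and can guess wrong.
df_final = dd.read_csv(
    '/path/input-files-*.csv',   # hypothetical input path
    dtype={
        'cust_id': 'int64',
        'score': 'float64',
        'total_qty': 'float64',
        'update_score': 'float64',
    },
)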

On your second question about writing CSVs quickly, my first answer would be "use Parquet or HDF5 instead". They are faster and more accurate in almost every way.
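
A sketch of the Parquet route (this assumes a dask build with DataFrame.to_parquet and a Parquet engine such as fastparquet installed; the output path is hypothetical):

# Write one binary, typed Parquet dataset instead of 100 csv files.
df_final.to_parquet('/path/claritin-files.parquet')

# Reading it back restores the schema, so no dtype inference is needed.
df_final = dd.read_parquet('/path/claritin-files.parquet')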


Thanks!! Yeah, I suspected that earlier, since I am reading the dataframe from csv files. Not sure why it did not read them correctly. About your suggestion for the second question: do you mean both reading and writing in parquet format? (I am familiar with parquet.) –


A common cause is that an integer column has some missing values, so pandas decides partway through that it needs to use floats. I don't understand your comment about parquet. – MRocklin
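
A tiny illustration of that failure mode, in plain pandas with made-up data:

import io
import pandas as pd

# The head of the file (the part dask samples) looks all-integer ...
head = "cust_id,total_qty\n1,10\n2,20\n"
# ... but a later block has a missing value, so pandas stores the whole
# column as float64 (NaN is a float) and a cast back to int64 fails.
tail = "3,\n4,40\n"

df = pd.read_csv(io.StringIO(head + tail))
print(df.dtypes)                 # total_qty: float64, not int64
df['total_qty'].astype('int64')  # raises ValueError: non-finite values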


What I meant was: when you say use Parquet or HDF5, do you mean reading the files into a dask dataframe and then writing out in parquet format instead of csv? Also, would the csv export be faster if I used dask distributed across a set of machines? (My dataframe is 130 mm x 4 columns.) –