
I have large csv files, each more than 10 MB in size, and about 50+ such files. These inputs have more than 25 columns and more than 50K rows. Reading them fails with:

pandas.io.common.CParserError: Error tokenizing data. C error: Buffer overflow caught - possible malformed input file

All of them have the same header, and I am trying to merge them into a single csv with the header mentioned only once.

Option One. Code: works for small-size csvs with 25 columns, but only when the file sizes are in KBs.

import pandas as pd 
import glob 

interesting_files = glob.glob("*.csv") 
df_list = [] 
for filename in sorted(interesting_files): 
    df_list.append(pd.read_csv(filename)) 

full_df = pd.concat(df_list) 

full_df.to_csv('output.csv') 

But the above code does not work for the larger files and gives the following error.

Error:

Traceback (most recent call last): 
    File "merge_large.py", line 6, in <module> 
    all_files = glob.glob("*.csv", encoding='utf8', engine='python')  
TypeError: glob() got an unexpected keyword argument 'encoding' 
[email protected]:~/Desktop/Twitter_Lat_lon/nasik_rain/rain_2$ python merge_large.py 
Traceback (most recent call last): 
    File "merge_large.py", line 10, in <module> 
    df = pd.read_csv(file_,index_col=None, header=0) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 562, in parser_f 
    return _read(filepath_or_buffer, kwds) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 325, in _read 
    return parser.read() 
    File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 815, in read 
    ret = self._engine.read(nrows) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/io/parsers.py", line 1314, in read 
    data = self._reader.read(nrows) 
    File "pandas/parser.pyx", line 805, in pandas.parser.TextReader.read (pandas/parser.c:8748) 
    File "pandas/parser.pyx", line 827, in pandas.parser.TextReader._read_low_memory (pandas/parser.c:9003) 
    File "pandas/parser.pyx", line 881, in pandas.parser.TextReader._read_rows (pandas/parser.c:9731) 
    File "pandas/parser.pyx", line 868, in pandas.parser.TextReader._tokenize_rows (pandas/parser.c:9602) 
    File "pandas/parser.pyx", line 1865, in pandas.parser.raise_parser_error (pandas/parser.c:23325) 
pandas.io.common.CParserError: Error tokenizing data. C error: Buffer overflow caught - possible malformed input file. 

Code: 25+ columns, but file sizes above 10 MB.

Option Two:

Option Four:

import pandas as pd 
import glob 

interesting_files = glob.glob("*.csv") 
df_list = [] 
for filename in sorted(interesting_files): 
    df_list.append(pd.read_csv(filename)) 

full_df = pd.concat(df_list) 

full_df.to_csv('output.csv') 

Error:

Traceback (most recent call last): 
    File "merge_large.py", line 6, in <module> 
    allFiles = glob.glob("*.csv", sep=None) 
TypeError: glob() got an unexpected keyword argument 'sep' 
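
(A side note: both TypeErrors above come from passing encoding, engine, and sep to glob.glob(), which only takes a pathname pattern; those keyword arguments belong to pd.read_csv(). A minimal sketch of where they go; sep=None forces the python engine to sniff the delimiter, which the answer below also relies on to get around the buffer-overflow tokenizing error:)

import glob 
import pandas as pd 

# glob only builds the file list; parsing options go to read_csv 
interesting_files = sorted(glob.glob("*.csv")) 

df_list = [] 
for filename in interesting_files: 
    # sep=None + engine='python' sniffs the delimiter instead of using the C tokenizer 
    df_list.append(pd.read_csv(filename, sep=None, engine='python', encoding='utf8')) 

full_df = pd.concat(df_list) 
full_df.to_csv('output.csv', index=False) 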

I have searched a lot, but I could not find a solution for concatenating large csv files with the same headers into a single file.
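
Another memory-light option, as a minimal sketch assuming every file really does start with an identical header line, is to skip pandas and stream the rows, writing the header only once:

import glob 

interesting_files = sorted(glob.glob("*.csv")) 

with open('output.csv', 'w') as out: 
    for i, filename in enumerate(interesting_files): 
        with open(filename) as f: 
            header = f.readline() 
            if i == 0: 
                out.write(header)  # keep the header from the first file only 
            for line in f:         # stream the remaining rows one at a time 
                out.write(line) 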

Edit:

Code:

import dask.dataframe as dd 

ddf = dd.read_csv('*.csv') 

ddf.to_csv('master.csv',index=False) 

Error:

Traceback (most recent call last): 
    File "merge_csv_dask.py", line 5, in <module> 
    ddf.to_csv('master.csv',index=False) 
    File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/core.py", line 792, in to_csv 
    return to_csv(self, filename, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/io.py", line 762, in to_csv 
    compute(*values) 
    File "/usr/local/lib/python2.7/dist-packages/dask/base.py", line 179, in compute 
    results = get(dsk, keys, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/dask/threaded.py", line 58, in get 
    **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 481, in get_async 
    raise(remote_exception(res, tb)) 
dask.async.ValueError: could not convert string to float: {u'type': u'Point', u'coordinates': [4.34279, 50.8443]} 

Traceback 
--------- 
    File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 263, in execute_task 
    result = _execute_task(task, data) 
    File "/usr/local/lib/python2.7/dist-packages/dask/async.py", line 245, in _execute_task 
    return func(*args2) 
    File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 49, in bytes_read_csv 
    coerce_dtypes(df, dtypes) 
    File "/usr/local/lib/python2.7/dist-packages/dask/dataframe/csv.py", line 73, in coerce_dtypes 
    df[c] = df[c].astype(dtypes[c]) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/generic.py", line 2950, in astype 
    raise_on_error=raise_on_error, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2938, in astype 
    return self.apply('astype', dtype=dtype, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 2890, in apply 
    applied = getattr(b, f)(**kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 434, in astype 
    values=values, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/internals.py", line 477, in _astype 
    values = com._astype_nansafe(values.ravel(), dtype, copy=True) 
    File "/usr/local/lib/python2.7/dist-packages/pandas/core/common.py", line 1920, in _astype_nansafe 
    return arr.astype(dtype 
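
(For context: this ValueError comes from dask inferring each column's dtype from a small sample and then failing when a later block contains a GeoJSON-like string in a column it guessed was numeric. A hedged workaround, separate from the answer below, is to switch off the numeric coercion by reading columns as object; the 'coordinates' column name here is only an assumption:)

import dask.dataframe as dd 

# treat every column as plain object/string so no block-level float coercion is attempted 
ddf = dd.read_csv('*.csv', dtype=object) 

# or, if only one column is mixed, name it explicitly (column name is a guess): 
# ddf = dd.read_csv('*.csv', dtype={'coordinates': object}) 

ddf.to_csv('master.csv', index=False) 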

Answer:


If I understand your problem, you have large csv files with the same structure that you want to merge into one big CSV file.

My suggestion is to use dask from Continuum Analytics to handle this job. You can merge the files, and you can also perform out-of-core computations and data analysis, just like pandas.

### make sure you include the [complete] tag 
pip install dask[complete] 

Solution using your sample data from DropBox

First, using your sample data, check the versions of dask and pandas. For me, dask = 0.11.0 and pandas = 0.18.1.

import dask 
import pandas as pd 
print (dask.__version__) 
print (pd.__version__) 

Here is the code that reads in all the csvs. Using your DropBox sample data, I got no errors.

import dask.dataframe as dd 
from dask.delayed import delayed 
import dask.bag as db 
import glob 

filenames = glob.glob('/Users/linwood/Downloads/stack_bundle/rio*.csv') 

''' 
The key to getting around the CParse error was using sep=None 
Came from this post 
http://stackoverflow.com/questions/37505577/cparsererror-error-tokenizing-data 
''' 

# custom reader function; sep=None lets pandas sniff the delimiter 
def reader(filename): 
    return pd.read_csv(filename, sep=None) 

# build list of delayed pandas csv reads; then read in as dask dataframe 

dfs = [delayed(reader)(fn) for fn in filenames] 
df = dd.from_delayed(dfs) 



The rest of this is extra stuff.

# print the count of values in each column; perfect data would have the same count 
# you have dirty data as the counts will show 

print (df.count().compute()) 

The next step is to run some pandas-like analytics. Below is some code that first "cleans" the data in the 'tweetFavoriteCt' column. Not all of the data is an integer, so I replace strings with "0" and convert everything else to an integer. Once I have the integer conversion, I show a simple analytic where I filter the whole dataframe to include only the rows where favoriteCt is greater than 3.

# function to convert numbers to integer and replace string with 0; sample analytics in dask dataframe 
# you can come up with your own..this is just for an example 
def conversion(value): 
    try: 
        return int(value) 
    except: 
        return int(0) 

# apply the function to the column, create a new column of cleaned data 
clean = df['tweetFavoriteCt'].apply(lambda x: (conversion(x)),meta=('stuff',str)) 

# set new column equal to our cleaning code above; your data is dirty :-(
df['cleanedFavoriteCt'] = clean 

The final bits of code show dask analytics, how to load this merged file into pandas, and how to write the merged file to disk. Be warned: if you have tons of CSVs, the .compute() call below will load this merged csv into memory.

# retrieve the 50 tweets with the highest favorite count 
print(df.nlargest(50,['cleanedFavoriteCt']).compute()) 

# only show me the tweets that have been favorited at least 3 times 
# TweetID 763525237166268416, is VERRRRY popular....7000+ favorites 
print((df[df.cleanedFavoriteCt.apply(lambda x: x>3,meta=('stuff',str))]).compute()) 

''' 
This is the final step. The .compute() code below turns the 
dask dataframe into a single pandas dataframe with all your 
files merged. If you don't need to write the merged file to 
disk, I'd skip this step and do all the analysis in 
dask. Get a subset of the data you want and save that. 
''' 
df = df.reset_index().compute() 
df.to_csv('./test.csv') 

Now, if you want to switch to pandas for the merged csv file:

import pandas as pd 
dff = pd.read_csv('./test.csv') 

Let me know if this works for you.

You can stop here.

Archived: previous solution; kept as an example of using dask to merge CSVs.

The first step is to make sure dask is installed. There are install instructions for dask in the documentation page, but this should work:
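
### make sure you include the [complete] tag 
pip install dask[complete] 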

With dask installed it is easy to read in the files.

Some housekeeping first. Assume we have a directory of csvs where the filenames are my18.csv, my19.csv, my20.csv, and so on. Standardized names and a single directory location are key. This works if you put your csv files in one directory and serialize the names in some way.

Steps:

  1. Import dask and read in all the CSV files using a wildcard. This merges all the csvs into a single dask.dataframe object. You can do pandas-like operations immediately after this step if you want.
import dask.dataframe as dd 
ddf = dd.read_csv('./daskTest/my*.csv') 
ddf.describe().compute() 
  2. Write the merged dataframe to disk in the same directory as the original files, naming it master.csv.

     ddf.to_csv('./daskTest/master.csv',index=False) 
    
  3. Optionally, read master.csv, which is now much bigger, back in as a dask.dataframe object for computations. This can also be done right after step one above; dask can perform pandas-like operations on the staged files. This is a way to do "big data" in Python.

     # reads in the merged file as one BIG out-of-core dataframe; can perform functions just like pandas 
     newddf = dd.read_csv('./daskTest/master.csv') 
    
    #check the length; this is now length of all merged files. in this example, 50,000 rows times 11 = 550000 rows. 
    len(newddf) 
    
    # perform pandas-like summary stats on entire dataframe 
    newddf.describe().compute() 
    

Hopefully this helps answer your question. In three steps you read in all the files, merge them into a single dataframe, and write that massive dataframe to disk with only one header and all of your rows.


Thank you so much for the detailed explanation. That is a lot of information. Let me try this code and I will let you know if I have any doubts. Thanks again :) –


@SitzBlogz, just checking whether it worked. – Linwoodc3


Apologies for the late reply. I ran into errors and have included the code and the errors in the edit section. Could you please check. –