2017-10-09 36 views
0

我對PySpark頗爲陌生,我試圖加載和預處理一些數據。但是,一些數據似乎已損壞,我需要處理這些錯誤,並找出哪些文件已損壞。PySpark - 加載和保存文件路徑,一些損壞

def load_audio(path, sr = 22050): 
    return librosa.load(path,sr=sr) 

rdd = sc.parallelize(paths) #Path to all files 
audio = rdd.map(lambda path: (path,load_audio(os.path.join(std_audio_path,path)))) # Loads all the audio files 

只要load_audio嘗試加載一個損壞的文件,程序崩潰。我意識到嘗試和趕上可以工作,但我不明白它在這種情況下如何解決我的問題。

我理想的是加載所有未損壞的文件。我想要加載到它自己的rdd /列表中時損壞的文件的路徑/引發錯誤。

我該如何做到這一點? :)

+0

你是什麼意思崩潰?你是否收到特定的錯誤信息? – desertnaut

+0

我在損壞的文件上得到一個EOFError。 –

回答

2

嘗試/除了輕鬆解決您的問題。我們可以在這裏使用oldschool/Go的思維模式,而是使用異常函數返回錯誤代碼或類似的結果。

我沒有你的文件重現,所以我會創建類似的錯誤,通過使用函數,只是檢查路徑,並拋出異常時,文件是MP3。比方說,我有文件的列表:

import random 
paths = [str(i) + '.' + random.choice(('mp3', 'ogg')) for i in xrange(100)] 
std_audio_paths = '/' 

def load_audio(path, sr=22050): 
    if path.endswith('mp3'): 
     raise IOError('Unknown format') 
    return 'audio data' 




rdd = sc.parallelize(paths) 
audio = rdd.map(lambda path: (path,load_audio(os.path.join(std_audio_path,path)))) 
audio.collect() 
# will break because load_audio throws exception 

所以我會使用try /除了創建一個圍繞load_audio包裝,不會引發異常,但返回錯誤:

def try_to_load_audio(path): 
    try: 
     return load_audio(path), None 
    except IOError as e: 
     return '', 'error' 

audio = rdd.map(lambda path: (path,try_to_load_audio(os.path.join(std_audio_path,path)))) 
audio.collect() 
# returns collection of pairs (path, (result, error)): 
# [('0.mp3', ('', 'error')), ('1.ogg', ('audio data', None)), ... 

我們的第二部分你的問題。取而代之的collect(),做audio.cache(),這樣你就不會加載文件兩次,然後:

failed = audio.filter(lambda f: f[1][1] == 'error').map(lambda f: f[0]) 
failed.collect() 
# gives paths only of failed files (mp3 in my case): 
# ['0.mp3', '3.mp3', ... 

而且只具有加載音頻數據你做:

loaded_data = audio.filter(lambda f: f[1][1] is None).map(lambda f: f[1][0]) 
loaded_data.collect() 
# Gives ['audio data', 'audio data',... 
+0

感謝您的意見,但這並不是我所要求的。我有很多數據,加載音頻需要很長時間。我想要一個rdd /加載的音頻列表,而另一個只有文件路徑已損壞且無法加載。 –

+0

@CarlRynegardh對不起,錯過了你的問題的第二部分。更新了我的答案,並解釋瞭如何使用過濾器來創建具有錯誤路徑的RDD。 – Bunyk