嘗試/除了輕鬆解決您的問題。我們可以在這裏使用oldschool/Go的思維模式,而是使用異常函數返回錯誤代碼或類似的結果。
我沒有你的文件重現,所以我會創建類似的錯誤,通過使用函數,只是檢查路徑,並拋出異常時,文件是MP3。比方說,我有文件的列表:
import random
paths = [str(i) + '.' + random.choice(('mp3', 'ogg')) for i in xrange(100)]
std_audio_paths = '/'
def load_audio(path, sr=22050):
if path.endswith('mp3'):
raise IOError('Unknown format')
return 'audio data'
rdd = sc.parallelize(paths)
audio = rdd.map(lambda path: (path,load_audio(os.path.join(std_audio_path,path))))
audio.collect()
# will break because load_audio throws exception
所以我會使用try /除了創建一個圍繞load_audio
包裝,不會引發異常,但返回錯誤:
def try_to_load_audio(path):
try:
return load_audio(path), None
except IOError as e:
return '', 'error'
audio = rdd.map(lambda path: (path,try_to_load_audio(os.path.join(std_audio_path,path))))
audio.collect()
# returns collection of pairs (path, (result, error)):
# [('0.mp3', ('', 'error')), ('1.ogg', ('audio data', None)), ...
我們的第二部分你的問題。取而代之的collect()
,做audio.cache()
,這樣你就不會加載文件兩次,然後:
failed = audio.filter(lambda f: f[1][1] == 'error').map(lambda f: f[0])
failed.collect()
# gives paths only of failed files (mp3 in my case):
# ['0.mp3', '3.mp3', ...
而且只具有加載音頻數據你做:
loaded_data = audio.filter(lambda f: f[1][1] is None).map(lambda f: f[1][0])
loaded_data.collect()
# Gives ['audio data', 'audio data',...
你是什麼意思崩潰?你是否收到特定的錯誤信息? – desertnaut
我在損壞的文件上得到一個EOFError。 –