2011-09-27 69 views
13

我有非常大的數據集存儲在硬盤上的二進制文件。這裏是文件結構的一個例子:從二進制文件創建Numpy數組的有效方法

文件頭

149 Byte ASCII Header 

記錄開始

4 Byte Int - Record Timestamp 

樣品開始

2 Byte Int - Data Stream 1 Sample 
2 Byte Int - Data Stream 2 Sample 
2 Byte Int - Data Stream 3 Sample 
2 Byte Int - Data Stream 4 Sample 

樣品結束

每個記錄有122,880個樣本和每個文件有713個記錄。這產生了700,910,521字節的總大小。採樣率和記錄數量有時會有所不同,所以我必須編碼以檢測每個文件的每個數量。

目前我使用這個數據導入到陣列中的代碼是這樣的:

from time import clock 
from numpy import zeros , int16 , int32 , hstack , array , savez 
from struct import unpack 
from os.path import getsize 

start_time = clock() 
file_size = getsize(input_file) 

with open(input_file,'rb') as openfile: 
    input_data = openfile.read() 

header = input_data[:149] 
record_size = int(header[23:31]) 
number_of_records = (file_size - 149)/record_size 
sample_rate = ((record_size - 4)/4)/2 

time_series = zeros(0,dtype=int32) 
t_series = zeros(0,dtype=int16) 
x_series = zeros(0,dtype=int16) 
y_series = zeros(0,dtype=int16) 
z_series = zeros(0,dtype=int16) 

for record in xrange(number_of_records): 

    time_stamp = array(unpack('<l' , input_data[ 149 + (record * record_size) : 149 + (record * record_size) + 4 ]) , dtype = int32) 
    unpacked_record = unpack('<' + str(sample_rate * 4) + 'h' , input_data[ 149 + (record * record_size) + 4 : 149 + ((record + 1) * record_size) ]) 

    record_t = zeros(sample_rate , dtype=int16) 
    record_x = zeros(sample_rate , dtype=int16) 
    record_y = zeros(sample_rate , dtype=int16) 
    record_z = zeros(sample_rate , dtype=int16) 

    for sample in xrange(sample_rate): 

    record_t[sample] = unpacked_record[ (sample * 4) + 0 ] 
    record_x[sample] = unpacked_record[ (sample * 4) + 1 ] 
    record_y[sample] = unpacked_record[ (sample * 4) + 2 ] 
    record_z[sample] = unpacked_record[ (sample * 4) + 3 ] 

    time_series = hstack ((time_series , time_stamp)) 
    t_series = hstack ((t_series , record_t)) 
    x_series = hstack ((x_series , record_x)) 
    y_series = hstack ((y_series , record_y)) 
    z_series = hstack ((z_series , record_z)) 

savez(output_file, t=t_series , x=x_series ,y=y_series, z=z_series, time=time_series) 
end_time = clock() 
print 'Total Time',end_time - start_time,'seconds' 

目前這需要每個700 MB的文件約250秒,這對我來說似乎是非常高的。有沒有更有效的方法可以做到這一點?

最終解決

使用與自定義的numpy的FROMFILE方法D型運行時切斷至9秒,比上述原始代碼27倍快。最終的代碼如下。

from numpy import savez, dtype , fromfile 
from os.path import getsize 
from time import clock 

start_time = clock() 
file_size = getsize(input_file) 

openfile = open(input_file,'rb') 
header = openfile.read(149) 
record_size = int(header[23:31]) 
number_of_records = (file_size - 149)/record_size 
sample_rate = ((record_size - 4)/4)/2 

record_dtype = dtype([ ('timestamp' , '<i4') , ('samples' , '<i2' , (sample_rate , 4)) ]) 

data = fromfile(openfile , dtype = record_dtype , count = number_of_records) 
time_series = data['timestamp'] 
t_series = data['samples'][:,:,0].ravel() 
x_series = data['samples'][:,:,1].ravel() 
y_series = data['samples'][:,:,2].ravel() 
z_series = data['samples'][:,:,3].ravel() 

savez(output_file, t=t_series , x=x_series ,y=y_series, z=z_series, fid=time_series) 

end_time = clock() 

print 'It took',end_time - start_time,'seconds' 
+0

它是醫療數據? EDF?如果你不知道我在說什麼,不要介意......; o)無論如何,看看我的答案,根據這個問題,我用它來打開醫療數據二進制文件:http://stackoverflow.com/q/5804052/401828。那裏有一個有趣的討論。 – heltonbiker

+0

不是地球物理數據。在發佈之前我在研究過程中看到了您的問題。您的數據只包含簡短的整數,其中不幸的是整個流中分散了4個字節的整數時間戳。 – Stu

+0

對於它的價值,numpy結構化數組上的許多操作比常規numpy數組慢得多。導入時間可能會更快,但計算時間可能會延長10-100倍:( –

回答

13

一些提示:

是這樣的(未經測試,但你的想法):

 
import numpy as np 

file = open(input_file, 'rb') 
header = file.read(149) 

# ... parse the header as you did ... 

record_dtype = np.dtype([ 
    ('timestamp', '<i4'), 
    ('samples', '<i2', (sample_rate, 4)) 
]) 

data = np.fromfile(file, dtype=record_dtype, count=number_of_records) 
# NB: count can be omitted -- it just reads the whole file then 

time_series = data['timestamp'] 
t_series = data['samples'][:,:,0].ravel() 
x_series = data['samples'][:,:,1].ravel() 
y_series = data['samples'][:,:,2].ravel() 
z_series = data['samples'][:,:,3].ravel() 
+0

現在總時間爲9.07秒,包括savez!謝謝。我將用最終的代碼更新這個問題。 – Stu

+0

偉大的答案,優良的使用numpy內置功能! +1 – heltonbiker

+0

你怎麼知道你是否有頭,多長時間? – mmann1123

2

numpy的支持從數據映射二進制直接進入陣列等經由numpy.memmap對象。您可能能夠通過偏移映射文件並提取所需的數據。

對於字節序的正確性只使用numpy.byteswap對你們在讀什麼,你可以使用條件表達式來檢查主機系統的字節序:

if struct.pack('=f', np.pi) == struct.pack('>f', np.pi): 
    # Host is big-endian, in-place conversion 
    arrayName.byteswap(True) 
+0

我已經看過,但似乎沒有辦法指定數據的字節順序。代碼需要在兩個窗口下工作, unix所以endian-ness需要明確陳述 – Stu

+0

你可以使用[numpy.byteswap](http://docs.scipy.org/doc/numpy/reference/generated/numpy.memmap.html)來設置正確的如果需要的話,數據的字節順序,請參閱編輯 – talonmies

+0

對於函數'numpy.memmap',這看起來非常有用 – heltonbiker

2

一個明顯的低效率是在使用hstack一個循環:

time_series = hstack ((time_series , time_stamp)) 
    t_series = hstack ((t_series , record_t)) 
    x_series = hstack ((x_series , record_x)) 
    y_series = hstack ((y_series , record_y)) 
    z_series = hstack ((z_series , record_z)) 

在每次迭代,這對於分配給每一個系列,並將所有數據讀取到目前爲止到它的一個稍微大一點的數組。這涉及到批次的不必要的複製,並可能導致內存碎片錯誤。

我積累的time_stamp值在列表中,做在一端hstack,並會做同樣的record_t

如果沒有攜帶足夠的性能提升,我會註釋掉循環的主體,並開始將事情一次性帶回,以查看所花的時間。

+0

好吧,執行此操作可以將時間縮短到110秒!!謝謝,我是 – Stu

+0

也是110秒的時間,大概有40個是我不能優化的savez函數,雖然比較加載.npz只需要20秒 – Stu

+0

我必須有蜜蜂n關於savez錯誤,因爲使用自定義dtype的fromfile方法,包括savez的時間降至9秒。 – Stu

0

通過使用arraystruct.unpack,我得到了令人滿意的結果以及類似的問題(多分辨率多通道二進制數據文件)。在我的問題中,我希望每個通道都有連續的數據,但文件具有面向間隔的結構,而不是面向通道的結構。

的「祕密」是先讀取整個文件,並且只有然後分發已知大小的切片,以所希望的容器(下面的代碼,self.channel_content[channel]['recording']array類型的對象):

f = open(somefilename, 'rb')  
fullsamples = array('h') 
fullsamples.fromfile(f, os.path.getsize(wholefilename)/2 - f.tell()) 
position = 0 
for rec in xrange(int(self.header['nrecs'])): 
    for channel in self.channel_labels: 
     samples = int(self.channel_content[channel]['nsamples']) 
     self.channel_content[channel]['recording'].extend(fullsamples[position:position+samples]) 
      position += samples 

當然,我不能說這比其他答案更好或更快,但至少是你可能評估的東西。

希望它有幫助!