2013-02-14 40 views
1

給定一個numpy的陣列,這樣,包含任意的數據:迭代一個numpy的陣列,選擇性地收集一個或兩個值,給定的判據

>>> data 
array([ 1, 172, 32, ..., 42, 189, 29], dtype=int8) # SIGNED int8 

...我需要構造一個numpy的陣列「結果'如下:

(請原諒僞代碼的實現,如果我知道該怎麼做,我不會問,如果我有一個工作的numpy實現,我會直接把我的問題改爲CodeReview。

for value in data, check: 
    if value & 0x01: 
     result.append((value >> 1 << 8) + next(value).astype(numpy.uint8)) 
     # that is: take TWO values from 'data', one signed, the next un-signed, glue them together, appending ONE int16 to result 
    else: 
     result.append(value >> 1) 
     # that is: take ONE value from 'data', appending ONE int8 to result 

我已經在「普通」的Python中實現了這個。它工作得很好,但可以使用numpy和非常高效的數組操作進行優化。我想擺脫名單和追加。可悲的是,我不知道如何完成它:

# data is a string of 'bytes' received from a device 
def unpack(data): 
    l = len(data) 
    p = 0 
    result = [] 

    while p < l: 
     i1 = (((ord(data[p]) + 128) % 256) - 128) 
     p += 1 
     if i1 & 0x01: 
      # read next 'char' as an uint8 
      # 
      # due to the nature of the protocol, 
      # we will always have sufficient data 
      # available to avoid reading past the end 
      i2 = ord(data[p]) 
      p += 1 
      result.append((i1 >> 1 << 8) + i2) 
     else: 
      result.append(i1 >> 1) 

    return result 

更新:感謝@Jaime我已經設法實現高效的解壓縮功能。這與他的非常相似,雖然速度更快。 while循環當然是關鍵部分。我在這裏發佈它,以防萬一有興趣:

def new_np_unpack(data): 
    mask = (data & 0x01).astype(numpy.bool) 

    true_positives = None 

    while True: 
     # check for 'true positives' in the tentative mask 
     # the next item must by definition be a false one 
     true_positives = numpy.nonzero(numpy.logical_and(mask, numpy.invert(numpy.concatenate(([False], mask[:-1])))))[0] 

     # loop until no more 'false positives' 
     if not numpy.any(mask[true_positives+1]): 
      break 

     mask[true_positives+1] = False 

    result = numpy.empty(data.shape, dtype='int16') 
    result[:] = data.astype('int8') >> 1 
    result[true_positives] = (result[true_positives] << 8) + data[true_positives + 1] 
    mask = numpy.ones(data.shape, dtype=bool) 
    mask[true_positives + 1] = False 
    return result[mask] 

回答

1

我得到了一些矢量化的工作。爲了便於比較,我把ord(...)出你的代碼,並饋送它像數據:

data = np.random.randint(256, size=(1000000,)).astype('uint8') 
data[-1] = 0 # to avoid errors with last element 

我的版本的功能:

def np_unpack(data) : 
    # find where condition is met 
    mask = (data & 0x01).astype(bool) 
    # Keep only 1st, 3rd, 5th... consecutive occurrences of True in mask 
    new_mask = mask[:] 
    mult = -1 
    while new_mask.sum() : 
     new_mask = np.logical_and(new_mask, 
            np.concatenate(([False], new_mask[:-1]))) 
     mask += new_mask * mult 
     mult *= -1 
    del new_mask 
    cond = np.nonzero(mask)[0] 
    result = np.empty(data.shape, dtype='int16') 
    result[:] = data.astype('int8') >> 1 
    result[cond] <<= 8 
    result[cond] += data[cond + 1] 
    mask = np.ones(data.shape, dtype=bool) 
    mask[cond + 1] = False 
    return result[mask] 

而且一些測試用1M元素的列表:

In [4]: np.all(unpack(data) == np_unpack(data)) 
Out[4]: True 

In [5]: %timeit unpack(data) 
1 loops, best of 3: 7.11 s per loop 

In [6]: %timeit np_unpack(data) 
1 loops, best of 3: 294 ms per loop 
+0

謝謝!我已驗證您的功能,並按預期工作。現在我只需要弄清楚你在做什麼。 :-) – Micke 2013-02-15 10:42:32

+1

@Micke唯一棘手的部分是'while'循環。在每次迭代之後添加一些'mask'和'new_mask'的打印,並且給它像'[1,0,1,1,0,1,1,1,0,1,1,1,1]',它應該讓事情很清楚。 – Jaime 2013-02-15 14:43:48

+0

我終於明白了。把你的代碼逐個分開,我現在明白了。事實上,我做了一些修改,將執行時間縮短了30%。再次感謝您花時間幫助! – Micke 2013-02-15 16:13:34