0
我想實現一個StreamingMedian
對象通過get_median()
連續調用,以維持平均實現MinHeap時堅持的狀態。爲此,我還通過heapq
模塊實施了MinHeap
和MaxHeap
類。與heapq
雖然我遇到了一個非常奇怪的錯誤。出於某種原因,當我運行下面的命令:
print("Before streaming medians", MinHeap(), sep="\t") # is empty
b = StreamingMedian()
b.add_val(5)
b.add_val(100)
assert b.get_median() == 52.5
print("After streaming medians, for MaxHeap", MaxHeap(), sep='\t') # is empty
print("After streaming medians, for MinHeap", MinHeap(), sep='\t') # should be empty
print("After streaming medians, for MinHeap with input",
MinHeap([]), sep="\t") # is empty
我得到以下的輸出:
Before streaming medians []
After streaming medians, for MaxHeap []
After streaming medians, for MinHeap [100]
After streaming medians, for MinHeap with input []
類的實現可以在下面找到。我在Python 3.5.2 :: Anaconda自定義(64位)上運行它。
import heapq
class MinHeap(object):
def __init__(self, l=[]):
self.heap = l
heapq.heapify(l)
def peek(self):
return self.heap[0]
def pop(self):
return heapq.heappop(self.heap)
def push(self, x):
heapq.heappush(self.heap, x)
def pushpop(self, x):
return heapq.heappushpop(self.heap, x)
def replace(self, x):
return heapq.heapreplace(self.heap, x)
def __len__(self):
return len(self.heap)
def __str__(self):
return str(self.heap)
class MaxHeap(MinHeap):
def _invert_sign(self, l):
return [-1 * a for a in l]
def __init__(self, l=[]):
super().__init__(self._invert_sign(l))
def push(self, x):
super().push(-1 * x)
def pushpop(self, x):
return super().pushpop(-1 * x)
def replace(self, x):
return super().replace(-1 * x)
def pop(self):
return -1 * super().pop()
def peek(self):
return -1 * super().peek()
def __str__(self):
return str(self._invert_sign(self.heap))
class StreamingMedian():
def __init__(self):
self.min_heap = MinHeap()
self.max_heap = MaxHeap()
def get_median(self):
min_heap_has_x_more = len(self.min_heap) - len(self.max_heap)
if min_heap_has_x_more > 0:
return self.min_heap.peek()
elif min_heap_has_x_more < 0:
return self.max_heap.peek()
else:
return (self.min_heap.peek() + self.max_heap.peek())/2
def add_val(self, x):
if len(self.min_heap) + len(self.max_heap) == 0:
self.max_heap.push(x)
else:
med = self.get_median()
if x > med:
self.min_heap.push(x)
self._ensure_balance()
elif x < med:
self.max_heap.push(x)
self._ensure_balance()
else:
self.max_heap.push(x)
self._ensure_balance()
def _ensure_balance(self):
size_diff = len(self.min_heap) - len(self.max_heap)
if abs(size_diff) > 1:
if size_diff > 0: # min_heap has 2 more elements
self.max_heap.push(self.min_heap.pop())
else: # max_heap has 2 more elements
self.min_heap.push(self.max_heap.pop())
assert abs(len(self.min_heap) - len(self.max_heap)) < 2
print("Before streaming medians", MinHeap(), sep="\t")
b = StreamingMedian()
b.add_val(5)
b.add_val(100)
assert b.get_median() == 52.5
print("After streaming medians, for MaxHeap", MaxHeap(), sep='\t') # is empty
print("After streaming medians, for MinHeap", MinHeap(), sep='\t') # should be empty
print("After streaming medians, for MinHeap with input", MinHeap([]), sep="\t") # is empty