我查找一個隊列,該隊列在一定的時間(即10秒)內存儲最多N
個元素,或者應該處理最早的值(如果已滿)。帶生存時間的消息隊列
我在Apache Collections(CircularFifoQueue
JavaDoc)中發現了一個類似的隊列,它忽略了生存時間方面。一個完整的Fletched消息代理看起來像是我的小項目的開銷。
你介意給我一個提示,我應該如何實現這一點?當我輪詢元素時,我應該過濾掉舊的值嗎?
我查找一個隊列,該隊列在一定的時間(即10秒)內存儲最多N
個元素,或者應該處理最早的值(如果已滿)。帶生存時間的消息隊列
我在Apache Collections(CircularFifoQueue
JavaDoc)中發現了一個類似的隊列,它忽略了生存時間方面。一個完整的Fletched消息代理看起來像是我的小項目的開銷。
你介意給我一個提示,我應該如何實現這一點?當我輪詢元素時,我應該過濾掉舊的值嗎?
我習慣了以下的隊列實現。代碼很大程度上基於Apaches CircularFifoQueue,並且只能進行弱測試。而且執行的是非線程安全和不可序列化。
如果您發現錯誤,請留下評論。
import java.util.AbstractCollection;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
/**
* TimedQueue is a first-in first-out queue with a fixed size that
* replaces its oldest element if full.
* <p>
* The removal order of a {@link TimedQueue} is based on the
* insertion order; elements are removed in the same order in which they
* were added. The iteration order is the same as the removal order.
* <p>
* The {@link #add(Object)}, {@link #remove()}, {@link #peek()}, {@link #poll},
* {@link #offer(Object)} operations all perform in constant time.
* All other operations perform in linear time or worse.
* <p>
* This queue prevents null objects from being added and it is not thread-safe and not serializable.
*
* The majority of this source code was copied from Apaches {@link CircularFifoQueue}: http://commons.apache.org/proper/commons-collections/apidocs/org/apache/commons/collections4/queue/CircularFifoQueue.html
*
* @version 1.0
*/
public class TimedQueue<E> extends AbstractCollection<E>
implements Queue<E> {
/** Underlying storage array. */
private Item<E>[] elements;
/** Array index of first (oldest) queue element. */
private int start = 0;
/**
* Index mod maxElements of the array position following the last queue
* element. Queue elements start at elements[start] and "wrap around"
* elements[maxElements-1], ending at elements[decrement(end)].
* For example, elements = {c,a,b}, start=1, end=1 corresponds to
* the queue [a,b,c].
*/
private transient int end = 0;
/** Flag to indicate if the queue is currently full. */
private transient boolean full = false;
/** Capacity of the queue. */
private final int maxElements;
private TimeUnit unit;
private int delay;
/**
* Constructor that creates a queue with the default size of 32.
*/
public TimedQueue() {
this(32);
}
/**
* Constructor that creates a queue with the specified size.
*
* @param size the size of the queue (cannot be changed)
* @throws IllegalArgumentException if the size is < 1
*/
public TimedQueue(final int size) {
this(size, 3, TimeUnit.SECONDS);
}
@SuppressWarnings("unchecked")
public TimedQueue(final int size, int delay, TimeUnit unit) {
if (size <= 0) {
throw new IllegalArgumentException("The size must be greater than 0");
}
elements = new Item[size];
maxElements = elements.length;
this.unit = unit;
this.delay = delay;
}
/**
* Constructor that creates a queue from the specified collection.
* The collection size also sets the queue size.
*
* @param coll the collection to copy into the queue, may not be null
* @throws NullPointerException if the collection is null
*/
public TimedQueue(final Collection<? extends E> coll) {
this(coll.size());
addAll(coll);
}
/**
* Returns the number of elements stored in the queue.
*
* @return this queue's size
*/
@Override
public int size() {
int size = 0;
for(int i = 0; i < elements.length; i++) {
if(validElement(i) != null) {
size++;
}
}
return size;
}
/**
* Returns true if this queue is empty; false otherwise.
*
* @return true if this queue is empty
*/
@Override
public boolean isEmpty() {
return size() == 0;
}
private boolean isAtFullCapacity() {
return size() == maxElements;
}
/**
* Clears this queue.
*/
@Override
public void clear() {
full = false;
start = 0;
end = 0;
Arrays.fill(elements, null);
}
/**
* Adds the given element to this queue. If the queue is full, the least recently added
* element is discarded so that a new element can be inserted.
*
* @param element the element to add
* @return true, always
* @throws NullPointerException if the given element is null
*/
@Override
public boolean add(final E element) {
if (null == element) {
throw new NullPointerException("Attempted to add null object to queue");
}
if (isAtFullCapacity()) {
remove();
}
elements[end++] = new Item<E>(element);
if (end >= maxElements) {
end = 0;
}
if (end == start) {
full = true;
}
return true;
}
/**
* Returns the element at the specified position in this queue.
*
* @param index the position of the element in the queue
* @return the element at position {@code index}
* @throws NoSuchElementException if the requested position is outside the range [0, size)
*/
public E get(final int index) {
final int sz = size();
if (sz == 0) {
throw new NoSuchElementException(
String.format("The specified index (%1$d) is outside the available range because the queue is empty.", Integer.valueOf(index)));
}
if (index < 0 || index >= sz) {
throw new NoSuchElementException(
String.format("The specified index (%1$d) is outside the available range [0, %2$d]",
Integer.valueOf(index), Integer.valueOf(sz-1)));
}
final int idx = (start + index) % maxElements;
return validElement(idx);
}
private E validElement(int idx) {
if(elements[idx] == null){
return null;
}
long diff = System.currentTimeMillis() - elements[idx].getCreationTime();
if(diff < unit.toMillis(delay)) {
return (E) elements[idx].getValue();
} else {
elements[idx] = null;
return null;
}
}
//-----------------------------------------------------------------------
/**
* Adds the given element to this queue. If the queue is full, the least recently added
* element is discarded so that a new element can be inserted.
*
* @param element the element to add
* @return true, always
* @throws NullPointerException if the given element is null
*/
public boolean offer(E element) {
return add(element);
}
public E poll() {
if (isEmpty()) {
return null;
}
return remove();
}
public E element() {
if (isEmpty()) {
throw new NoSuchElementException("queue is empty");
}
return peek();
}
public E peek() {
if (isEmpty()) {
return null;
}
return (E) elements[start].getValue();
}
public E remove() {
if (isEmpty()) {
throw new NoSuchElementException("queue is empty");
}
final E element = validElement(start);
if (null != element) {
elements[start++] = null;
if (start >= maxElements) {
start = 0;
}
full = false;
}
return element;
}
/**
* Increments the internal index.
*
* @param index the index to increment
* @return the updated index
*/
private int increment(int index) {
index++;
if (index >= maxElements) {
index = 0;
}
return index;
}
/**
* Decrements the internal index.
*
* @param index the index to decrement
* @return the updated index
*/
private int decrement(int index) {
index--;
if (index < 0) {
index = maxElements - 1;
}
return index;
}
/**
* Returns an iterator over this queue's elements.
*
* @return an iterator over this queue's elements
*/
@Override
public Iterator<E> iterator() {
return new Iterator<E>() {
private int index = start;
private int lastReturnedIndex = -1;
private boolean isFirst = full;
public boolean hasNext() {
return (isFirst || index != end) && size() > 0;
}
public E next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
isFirst = false;
lastReturnedIndex = index;
index = increment(index);
if(validElement(lastReturnedIndex) == null) {
return next();
} else {
return validElement(lastReturnedIndex);
}
}
public void remove() {
if (lastReturnedIndex == -1) {
throw new IllegalStateException();
}
// First element can be removed quickly
if (lastReturnedIndex == start) {
TimedQueue.this.remove();
lastReturnedIndex = -1;
return;
}
int pos = lastReturnedIndex + 1;
if (start < lastReturnedIndex && pos < end) {
// shift in one part
System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos);
} else {
// Other elements require us to shift the subsequent elements
while (pos != end) {
if (pos >= maxElements) {
elements[pos - 1] = elements[0];
pos = 0;
} else {
elements[decrement(pos)] = elements[pos];
pos = increment(pos);
}
}
}
lastReturnedIndex = -1;
end = decrement(end);
elements[end] = null;
full = false;
index = decrement(index);
}
};
}
private static final class Item<E> {
private long creationTime;
private E in;
public Item(E in) {
this.in = in;
creationTime = System.currentTimeMillis();
}
public E getValue() {
return in;
}
public long getCreationTime() {
return creationTime;
}
}
}
有一個叫LinkedHashMap的類,它有一個特殊的方法來刪除陳舊的數據。從文檔:
保護布爾removeEldestEntry(Map.Entry的長子)
返回true,如果此映射移除其最舊的條目。
只要有任何事情被添加到列表(隊列),方法removeEldestEntry
被調用。如果它返回true
那麼最老的條目將被刪除以爲新條目騰出空間,否則不會刪除任何內容。您可以添加您自己的實現,檢查最長條目上的時間戳,並且如果它早於過期閾值(例如10秒),則返回true
。因此,您的實現可能是這個樣子:
protected boolean removeEldestEntry(Map.Entry eldest) {
long currTimeMillis = System.currentTimeMillis();
long entryTimeMillis = eldest.getValue().getTimeCreation();
return (currTimeMillis - entryTimeMillis) > (1000*10*60);
}
我覺得java.util.LinkedHashMap是你的解決方案。它有一個removeEldest()
方法,只要在地圖中放入一個條目就會調用它。您可以覆蓋它以指示是否應刪除最長的條目。
的JavaDoc中給出了一個很好的例子:
private static final int MAX_ENTRIES = 100;
protected boolean removeEldestEntry(Map.Entry eldest) {
return size() > MAX_ENTRIES;
}
這將刪除舊的條目,如果地圖上有超過100個條目。
10秒後主動移除物品將需要單獨的線程來檢查年齡並移除舊物品。我猜這不是你想要的,根據你的描述來判斷。
這個答案應該是一個評論。 –
這看起來不錯。但是,我想刪除超過10秒的項目。是否已有這種「基於時間的隊列」的實現? – Markus
我會檢查並保持張貼! – Markus
這個答案太棒了! –
只有在添加新條目時才調用removeEldestEntry方法。舊的值仍保留在地圖中。 – Markus