當後處理器的數據源是太不可預知的(像你的情況沒有推送能力就像一個遠程Web服務),以最好的方式是創建一個流和這個處理器後處理器告訴ActivePivot該後處理器的整體結果每改變N秒。
您可以創建一個TickingStream,其中以固定的週期這樣的發送(空)事件:
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import java.util.Timer;
import java.util.TimerTask;
import com.quartetfs.biz.pivot.IActivePivotSession;
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousQueryEngine;
import com.quartetfs.biz.pivot.query.aggregates.impl.AStream;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
import com.quartetfs.fwk.types.impl.ExtendedPluginInjector;
/**
* Stream sending an event at a regular rate.
*
*/
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IStream", key = TickingStream.PLUGIN_KEY)
public class TickingStream extends AStream<Void> {
private static final long serialVersionUID = 1L;
public static final String PLUGIN_KEY = "TICKING";
/** The default ticking period, in ms. **/
protected static final long DEFAULT_PERIOD = 1000;
/** The ticking period, in ms. **/
protected long period = DEFAULT_PERIOD;
/** The task responsible for sending the ticking events. */
protected final TimerTask sendEventTask;
/** The timer that schedules the {@link #sendEventTask}. */
protected Timer tickingTimer;
/**
* Create a ticking stream.
*
* @param engine
* @param session
*/
public TickingStream(IAggregatesContinuousQueryEngine engine,
IActivePivotSession session) {
super(engine, session);
// Create the task that will send the events.
sendEventTask = new TimerTask() {
@Override
public void run() {
sendEvent(null);
}
};
// Schedule this task with the default period:
rescheduleTask();
}
/**
* Schedule the {@link #sendEventTask} with the {@link #period} period.
* Removes also all previous scheduling of this task.
*/
protected void rescheduleTask() {
if (tickingTimer != null) {
tickingTimer.cancel();
}
tickingTimer = new Timer();
tickingTimer.schedule(sendEventTask, 0, period);
}
/**
* Change the ticking period of this stream. This will reschedule the task
* according to this new period. This setter will be called via
* {@link ExtendedPluginInjector extended plugin injection}
*
* @param period the period to set. Must be strictly positive.
*
* @throws IllegalArgumentException if period is smaller or equal to 0.
*/
public void setPeriod(long period) {
if (period <= 0) {
throw new IllegalArgumentException("Non-positive period.");
}
this.period = period;
rescheduleTask();
}
/** {@inheritDoc} */
@Override
public Class<Void> getEventType() {
return Void.class;
}
/** {@inheritDoc} */
@Override
public String getType() {
return PLUGIN_KEY;
}
}
而且處理程序,請刷新與具有後處理器的所有連續查詢的一部分這個處理器在每個刻度:
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import java.util.Collections;
import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.query.aggregates.IImpact;
import com.quartetfs.biz.pivot.query.aggregates.impl.AAggregatesContinuousHandler;
import com.quartetfs.biz.pivot.query.aggregates.impl.Impact;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
/**
* The handler associated with a {@link TickingStream}.
*
* This handler asks for a full refresh of the locations queried on
* post-processors with this handler each time it receives a tick.
* <p>
* This is the handler to use for post processors that have unpredictable data
* sources which prevent the creation of a stream and a handler that can decide
* which subset of the currently queried locations should be updated in a
* continuous query.
*
*/
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousHandler", key = TickingStream.PLUGIN_KEY)
public class TickingHandler extends AAggregatesContinuousHandler<Void> {
private static final long serialVersionUID = 1L;
/**
* @param pivot
*/
public TickingHandler(IActivePivot pivot) {
super(pivot);
}
/**
* {@inheritDoc}
* <p>
* The impact on a queried location is the whole location since there is no
* way for us to know which part of the location should be updated or not.
*/
@Override
public IImpact computeImpact(ILocation location, Void event) {
return new Impact(location, Collections.singleton(location), Collections.singleton(location));
}
/** {@inheritDoc} */
@Override
public String getStreamKey() {
// This handler is made to be used with the TickingStream.
return TickingStream.PLUGIN_KEY;
}
/** {@inheritDoc} */
@Override
public String getType() {
return TickingStream.PLUGIN_KEY;
}
}
而且您配置後處理器使用該處理程序是這樣的:
<measure name="..." folder="..." aggregationFunctions="...">
<postProcessor pluginKey="yourPPpluginKey">
<properties>
<entry key="continuousQueryHandlerKeys" value="TICKING" />
</properties>
</postProcessor>
</measure>
從某種意義上說,您將會投票Google Finance服務,因爲您的後處理器將每秒被調用一次(默認期)。但是,只有當用戶使用後處理器進行連續查詢時纔會發生這種情況,並且只會在用戶查詢的位置上調用後處理器,因此希望您只需要一小部分Google財經信息。此外,對您的後處理器的調用將由多個用戶共享,以便您對Google財經發出最少量的查詢。