2013-02-05 31 views

在我們的ActivePivot解決方案中,我們編寫了一個後處理器(post-processor),根據股票價格(和波動率參數)計算股票期權的價格。後處理器在被評估時(目前)會連接到Google Finance服務,即時檢索股票價格。因此,每次用戶在ActivePivot上進行查詢時,聚合值都會使用最新價格實時計算。

使用外部數據更新ActivePivot連續查詢?

但我們還想在ActivePivot中利用連續查詢,將聚合值的實時變化推送給用戶(而不是讓用戶週期性地點擊ActivePivot Live的刷新按鈕)。我們知道這通常是通過編寫一個連續查詢處理器(continuous handler)來實現的,它可以將價格變化事件傳播到ActivePivot,並讓ActivePivot計算這些事件對已訂閱查詢的影響。但Google Finance不提供推送API,而如果我們通過定期輪詢數百種股票來頻繁請求該服務,我們將會被封鎖。






package com.quartetfs.pivot.sandbox.postprocessor.impl; 

import java.util.Timer; 
import java.util.TimerTask; 

import com.quartetfs.biz.pivot.IActivePivotSession; 
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousQueryEngine; 
import com.quartetfs.biz.pivot.query.aggregates.impl.AStream; 
import com.quartetfs.fwk.QuartetExtendedPluginValue; 
import com.quartetfs.fwk.types.impl.ExtendedPluginInjector; 

* Stream sending an event at a regular rate. 
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IStream", key = TickingStream.PLUGIN_KEY) 
public class TickingStream extends AStream<Void> { 

    private static final long serialVersionUID = 1L; 

    public static final String PLUGIN_KEY = "TICKING"; 

    /** The default ticking period, in ms. **/ 
    protected static final long DEFAULT_PERIOD = 1000; 

    /** The ticking period, in ms. **/ 
    protected long period = DEFAULT_PERIOD; 

    /** The task responsible for sending the ticking events. */ 
    protected final TimerTask sendEventTask; 

    /** The timer that schedules the {@link #sendEventTask}. */ 
    protected Timer tickingTimer; 

    * Create a ticking stream. 
    * @param engine 
    * @param session 
    public TickingStream(IAggregatesContinuousQueryEngine engine, 
      IActivePivotSession session) { 
     super(engine, session); 

     // Create the task that will send the events. 
     sendEventTask = new TimerTask() { 

      public void run() { 
     // Schedule this task with the default period: 

    * Schedule the {@link #sendEventTask} with the {@link #period} period. 
    * Removes also all previous scheduling of this task. 
    protected void rescheduleTask() { 
     if (tickingTimer != null) { 
     tickingTimer = new Timer(); 
     tickingTimer.schedule(sendEventTask, 0, period); 

    * Change the ticking period of this stream. This will reschedule the task 
    * according to this new period. This setter will be called via 
    * {@link ExtendedPluginInjector extended plugin injection} 
    * @param period the period to set. Must be strictly positive. 
    * @throws IllegalArgumentException if period is smaller or equal to 0. 
    public void setPeriod(long period) { 
     if (period <= 0) { 
      throw new IllegalArgumentException("Non-positive period."); 
     this.period = period; 

    /** {@inheritDoc} */ 
    public Class<Void> getEventType() { 
     return Void.class; 

    /** {@inheritDoc} */ 
    public String getType() { 
     return PLUGIN_KEY; 


package com.quartetfs.pivot.sandbox.postprocessor.impl; 

import java.util.Collections; 

import com.quartetfs.biz.pivot.IActivePivot; 
import com.quartetfs.biz.pivot.ILocation; 
import com.quartetfs.biz.pivot.query.aggregates.IImpact; 
import com.quartetfs.biz.pivot.query.aggregates.impl.AAggregatesContinuousHandler; 
import com.quartetfs.biz.pivot.query.aggregates.impl.Impact; 
import com.quartetfs.fwk.QuartetExtendedPluginValue; 

* The handler associated with a {@link TickingStream}. 
* This handler asks for a full refresh of the locations queried on 
* post-processors with this handler each time it receives a tick. 
* <p> 
* This is the handler to use for post processors that have unpredictable data 
* sources which prevent the creation of a stream and a handler that can decide 
* which subset of the currently queried locations should be updated in a 
* continuous query. 
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousHandler", key = TickingStream.PLUGIN_KEY) 
public class TickingHandler extends AAggregatesContinuousHandler<Void> { 

    private static final long serialVersionUID = 1L; 

    * @param pivot 
    public TickingHandler(IActivePivot pivot) { 

    * {@inheritDoc} 
    * <p> 
    * The impact on a queried location is the whole location since there is no 
    * way for us to know which part of the location should be updated or not. 
    public IImpact computeImpact(ILocation location, Void event) { 
     return new Impact(location, Collections.singleton(location), Collections.singleton(location)); 

    /** {@inheritDoc} */ 
    public String getStreamKey() { 
     // This handler is made to be used with the TickingStream. 
     return TickingStream.PLUGIN_KEY; 

    /** {@inheritDoc} */ 
    public String getType() { 
     return TickingStream.PLUGIN_KEY; 


<!-- Measure whose post-processor is refreshed by the continuous handler registered under the "TICKING" key. -->
<!-- NOTE(review): closing tags were missing from this listing; confirm whether the entry must be nested in a properties element. -->
<measure name="..." folder="..." aggregationFunctions="...">
      <postProcessor pluginKey="yourPPpluginKey">
        <entry key="continuousQueryHandlerKeys" value="TICKING" />
      </postProcessor>
</measure>

從某種意義上說,您仍然會輪詢Google Finance服務,因爲您的後處理器將每秒被調用一次(默認週期)。但是,只有當用戶對使用該後處理器的度量進行連續查詢時纔會發生這種情況,並且後處理器只會在用戶查詢的位置上被調用,因此您應該只需要Google Finance信息中的一小部分。此外,對您的後處理器的調用將由多個用戶共享,因此您向Google Finance發出的查詢數量會降到最少。