2013-02-05 31 views
2

在我們的ActivePivot解決方案中,我們編寫了一個後期處理器,根據股票價格(和波動率參數)計算股票期權的價格。當評估後處理器連接(現在)到Google Finance服務以即時檢索股票價格。因此,每次用戶在ActivePivot上進行查詢時,都會使用最新價格實時計算總量。使用外部數據更新ActivePivot連續查詢?

但我們還想在ActivePivot中利用連續查詢,並將實時更改推送給用戶的聚合(而不是週期性地點擊ActivePivot Live的刷新按鈕)。我們知道它通常是通過編寫一個連續的處理程序來實現的,它可以將價格變化事件傳播到ActivePivot,並讓ActivePivot計算對訂閱查詢的影響。但谷歌金融不提供推送API,如果我們通過定期輪詢數百種股票來敲定服務,我們將被禁止。

您在ActivePivot中推薦什麼機制來解決此問題?

回答

2

當後處理器的數據源是太不可預知的(像你的情況沒有推送能力就像一個遠程Web服務),以最好的方式是創建一個和這個處理器後處理器告訴ActivePivot該後處理器的整體結果每改變N秒。

您可以創建一個TickingStream,其中以固定的週期這樣的發送(空)事件:

package com.quartetfs.pivot.sandbox.postprocessor.impl; 

import java.util.Timer; 
import java.util.TimerTask; 

import com.quartetfs.biz.pivot.IActivePivotSession; 
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousQueryEngine; 
import com.quartetfs.biz.pivot.query.aggregates.impl.AStream; 
import com.quartetfs.fwk.QuartetExtendedPluginValue; 
import com.quartetfs.fwk.types.impl.ExtendedPluginInjector; 

/** 
* Stream sending an event at a regular rate. 
* 
*/ 
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IStream", key = TickingStream.PLUGIN_KEY) 
public class TickingStream extends AStream<Void> { 

    private static final long serialVersionUID = 1L; 

    public static final String PLUGIN_KEY = "TICKING"; 

    /** The default ticking period, in ms. **/ 
    protected static final long DEFAULT_PERIOD = 1000; 

    /** The ticking period, in ms. **/ 
    protected long period = DEFAULT_PERIOD; 

    /** The task responsible for sending the ticking events. */ 
    protected final TimerTask sendEventTask; 

    /** The timer that schedules the {@link #sendEventTask}. */ 
    protected Timer tickingTimer; 

    /** 
    * Create a ticking stream. 
    * 
    * @param engine 
    * @param session 
    */ 
    public TickingStream(IAggregatesContinuousQueryEngine engine, 
      IActivePivotSession session) { 
     super(engine, session); 

     // Create the task that will send the events. 
     sendEventTask = new TimerTask() { 

      @Override 
      public void run() { 
       sendEvent(null); 
      } 
     }; 
     // Schedule this task with the default period: 
     rescheduleTask(); 
    } 

    /** 
    * Schedule the {@link #sendEventTask} with the {@link #period} period. 
    * Removes also all previous scheduling of this task. 
    */ 
    protected void rescheduleTask() { 
     if (tickingTimer != null) { 
      tickingTimer.cancel(); 
     } 
     tickingTimer = new Timer(); 
     tickingTimer.schedule(sendEventTask, 0, period); 
    } 

    /** 
    * Change the ticking period of this stream. This will reschedule the task 
    * according to this new period. This setter will be called via 
    * {@link ExtendedPluginInjector extended plugin injection} 
    * 
    * @param period the period to set. Must be strictly positive. 
    * 
    * @throws IllegalArgumentException if period is smaller or equal to 0. 
    */ 
    public void setPeriod(long period) { 
     if (period <= 0) { 
      throw new IllegalArgumentException("Non-positive period."); 
     } 
     this.period = period; 
     rescheduleTask(); 
    } 

    /** {@inheritDoc} */ 
    @Override 
    public Class<Void> getEventType() { 
     return Void.class; 
    } 

    /** {@inheritDoc} */ 
    @Override 
    public String getType() { 
     return PLUGIN_KEY; 
    } 
} 

而且處理程序,請刷新與具有後處理器的所有連續查詢的一部分這個處理器在每個刻度:

package com.quartetfs.pivot.sandbox.postprocessor.impl; 

import java.util.Collections; 

import com.quartetfs.biz.pivot.IActivePivot; 
import com.quartetfs.biz.pivot.ILocation; 
import com.quartetfs.biz.pivot.query.aggregates.IImpact; 
import com.quartetfs.biz.pivot.query.aggregates.impl.AAggregatesContinuousHandler; 
import com.quartetfs.biz.pivot.query.aggregates.impl.Impact; 
import com.quartetfs.fwk.QuartetExtendedPluginValue; 

/** 
* The handler associated with a {@link TickingStream}. 
* 
* This handler asks for a full refresh of the locations queried on 
* post-processors with this handler each time it receives a tick. 
* <p> 
* This is the handler to use for post processors that have unpredictable data 
* sources which prevent the creation of a stream and a handler that can decide 
* which subset of the currently queried locations should be updated in a 
* continuous query. 
* 
*/ 
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.query.aggregates.IAggregatesContinuousHandler", key = TickingStream.PLUGIN_KEY) 
public class TickingHandler extends AAggregatesContinuousHandler<Void> { 

    private static final long serialVersionUID = 1L; 

    /** 
    * @param pivot 
    */ 
    public TickingHandler(IActivePivot pivot) { 
     super(pivot); 
    } 

    /** 
    * {@inheritDoc} 
    * <p> 
    * The impact on a queried location is the whole location since there is no 
    * way for us to know which part of the location should be updated or not. 
    */ 
    @Override 
    public IImpact computeImpact(ILocation location, Void event) { 
     return new Impact(location, Collections.singleton(location), Collections.singleton(location)); 
    } 

    /** {@inheritDoc} */ 
    @Override 
    public String getStreamKey() { 
     // This handler is made to be used with the TickingStream. 
     return TickingStream.PLUGIN_KEY; 
    } 

    /** {@inheritDoc} */ 
    @Override 
    public String getType() { 
     return TickingStream.PLUGIN_KEY; 
    } 
} 

而且您配置後處理器使用該處理程序是這樣的:

<measure name="..." folder="..." aggregationFunctions="..."> 
      <postProcessor pluginKey="yourPPpluginKey"> 
       <properties> 
        <entry key="continuousQueryHandlerKeys" value="TICKING" /> 
       </properties> 
      </postProcessor> 
     </measure> 

從某種意義上說,您將會投票Google Finance服務,因爲您的後處理器將每秒被調用一次(默認期)。但是,只有當用戶使用後處理器進行連續查詢時纔會發生這種情況,並且只會在用戶查詢的位置上調用後處理器,因此希望您只需要一小部分Google財經信息。此外,對您的後處理器的調用將由多個用戶共享,以便您對Google財經發出最少量的查詢。