2014-11-03 32 views
0

新的Rx;試圖弄清楚。消費流,轉型,然後交給其他消費者(沒有州)的方法

我正在通過事件從加速度計獲取數據,將數據調整爲我自己的格式,然後通過反應式擴展向其他消費者提供數據流。

我的方法部分是在調用一個重要的構造函數。有人可以看看這個,並幫助我理解我應該做什麼不同嗎?

希望,這段代碼就足夠了:

public class accel_raw_producer : IObservable<AccelerometerFrame_raw> 
{ 
    private static Spatial spatial = null; 
    private IObservable<EventPattern<SpatialDataEventArgs>> spatialEvents; 

    public accel_raw_producer() 
    { 
     spatial = new Spatial(); 
     spatial.close(); 
     spatialEvents = System.Reactive.Linq.Observable.FromEventPattern 
     <SpatialDataEventHandler, SpatialDataEventArgs>(
     handler => handler.Invoke, 
     h => spatial.SpatialData += h, 
     h => spatial.SpatialData -= h); 
     subscription = spatialEvents.Subscribe(); 
     spatial.open(-1); 
    } 


    public IDisposable Subscribe(IObserver<AccelerometerFrame_raw> observer) 
    { 
     return 
     (from evt in spatialEvents 
     let e = evt.EventArgs 
     select new AccelerometerFrame_raw 
     (
      e.spatialData[0].Acceleration[0], 
      e.spatialData[0].Acceleration[1], 
      e.spatialData[0].Acceleration[2], 
      e.spatialData[0].AngularRate[0], 
      e.spatialData[0].AngularRate[1], 
      e.spatialData[0].AngularRate[2] 
     )).Subscribe(); 
    } 
} 

public consumerClass :IObserver<AccelerometerFrame_raw> 
{ 
    accel_raw_producer accelStream; 
    IDisposable Unsubscriber; 

    public consumerClass() 
    { 
     accelStream = new accel_raw_producer(); 
     Unsubscriber = accelStream.Subscribe(this); 
    } 

    public void OnCompleted() 
    { 
     throw new NotImplementedException(); 
    } 

    public void OnError(Exception error) 
    { 
     throw new NotImplementedException(); 
    } 

    public void OnNext(AccelerometerFrame_raw accelFrame) 
    { 
     if (null == accelFrame) return; 
     AccelX = accelFrame.Acceleration.X; 
     AccelY = accelFrame.Acceleration.Y; 
     AccelZ = accelFrame.Acceleration.Z; 
     GyroX = accelFrame.Rotation.X; 
     GyroY = accelFrame.Rotation.Y; 
     GyroZ = accelFrame.Rotation.Z; 
    }  
} 

當我設置在AccelerometerFrame_raw斷點,它就會被觸發,但值不會傳播到消費者。所以這種方法中的某些東西需要有所不同。

回答

3

我不確定這裏是否遺漏了一些細節,但是您似乎沒有使用觀察者輸入參數,而訂閱也沒有對onNext值做任何事情。就個人而言,我會考慮這個擺動周圍,只是傳遞出的IObservable,讓來電者訂閱它自己:

public IObservable<AccelerometerFrame_raw> AccelerometerFrames() 
{ 
    return 
     from evt in spatialEvents 
     let e = evt.EventArgs 
     select new AccelerometerFrame_raw 
     (
      e.spatialData[0].Acceleration[0], 
      e.spatialData[0].Acceleration[1], 
      e.spatialData[0].Acceleration[2], 
      e.spatialData[0].AngularRate[0], 
      e.spatialData[0].AngularRate[1], 
      e.spatialData[0].AngularRate[2] 
     ); 
} 
+0

當我嘗試,我得到一個編譯錯誤:無法隱式轉換類型「System.IObservable '到'System.IDisposable'。存在明確的轉換(你是否缺少一個轉換嗎?)這就是讓我嘗試了幾個例如.AsObservable()和.Subscribe()以查看它們是否工作的原因。另請注意,我在原始帖子中添加了更多代碼,以便您可以看到生產者類和消費者類之間的關係。 OnNext在消費者階層中。 – philologon 2014-11-04 02:41:00

+0

好的,我最近的評論有些問題。我沒有意識到你已經改變了方法的返回類型。現在我明白了,這讓我懷疑,如果我這樣做了,那麼我在Subscribe方法中放什麼? – philologon 2014-11-04 02:50:33

+1

什麼都沒有 - 你不要自己動手。您返回observable,以便調用者可以自己調用'Subscribe(onNext)'。 – 2014-11-04 05:06:34

相關問題