2016-10-04 35 views
0

我有一個通過串口與計算機通信的設備。發送一個「START」命令後,設備迴應一個確認,並開始監控一些外部活動。然後它根據外部活動異步傳輸串行端口上的一些消息。當設備收到「停止」命令時,它會迴應確認,然後停止發送更多消息(表示外部活動)。Rx.Net:處理訂閱時執行異步副作用

我已經實現了帶有冷觀察功能的啓動/停止命令,這些命令執行副作用(串口發送命令),並且如果在串口上收到一個ACKckowledge,則會發出一個單一的Unit.Default。我想構建一個IObservable,它發出與外部活動相關的消息,並在訂閱時執行「START」副作用,並且在訂閱處理時執行「STOP」副作用。 「START」很簡單,我只需要做一個「SelectMany」,但我不知道如何執行「STOP」。

class MonitoringDevice 
{ 
    private SerialPort _sp; 
    private IObservable<byte> _receivedBytes; 

    public IObservable<ExternalActivity> ActivityStream { get; } 

    public MonitoringDevice() 
    { 
     _sp = new SerialPort("COM1"); 
     _receivedBytes = Observable 
         .FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
          h => 
          { 
           _sp.DiscardInBuffer(); 
           _sp.DataReceived += h; 
          }, 
          h => 
          { 
           _sp.DataReceived -= h; 
          }) 
         .SelectMany(x => 
         { 

          byte[] buffer = new byte[1024]; 
          var ret = new List<byte>(); 
          int bytesRead = 0; 
          do 
          { 
           bytesRead = _sp.Read(buffer, 0, buffer.Length); 
           ret.AddRange(buffer.Take(bytesRead)); 
          } while ((bytesRead >= buffer.Length)); 
          return ret; 

         }) 
         .Publish() 
         .RefCount(); 


     ActivityStream = StartMonitoringAsync() 
         .SelectMany(_receivedBytes.ToActivity()); 
         // we need to execute StopMonitoringAsync 
         // when a subscription to ActivityStream is disposed 

     _sp.Open(); 
    } 



    private IObservable<Unit> StartMonitoringAsync() 
    { 
     return Observable 
       .Create<Unit>(
       obs => 
       { 
        _sp.Write("START"); 
        return _receivedBytes 
          .ToAcknowledge() 
          .FirstAsync() 
          .Timeout(TimeSpan.FromMilliseconds(1000)) 
          .Subscribe(obs); 
       }); 
    } 


    private IObservable<Unit> StopMonitoringAsync() 
    { 
     return Observable 
       .Create<Unit>(
       obs => 
       { 
        _sp.Write("STOP"); 
        return _receivedBytes 
          .ToAcknowledge() 
          .FirstAsync() 
          .Timeout(TimeSpan.FromMilliseconds(1000)) 
          .Subscribe(obs); 
       }); 
    } 


} 

ExternalActivity只是一個POCO。

ToAcknowledge是一種擴展方法,返回IObservable,當設備傳輸確認時發出Unit.Default。 - 這是按預期工作的;

ToActivity是一種擴展方法,返回IObservable,它解析輸入的串行數據併發出ExternalActivity對象。 - 這是按預期工作的;


編輯:增加了對ToAcknowledgeToActivity擴展方法實現。

public static IObservable<Unit> ToAcknowledge(this IObservable<byte> source) 
    { 
     return source.Buffer(3, 1) 
       .Where(bfr => bfr.SequenceEqual(new byte[] { 65, 67, 75 })) // ACK 
       .Select(x => Unit.Default); 

    } 

    public static IObservable<ExternalActivity> ToActivity(this IObservable<byte> source) 
    { 
     return source 
       .Publish(ps => ps.Buffer(ps.Where(x => x == 1),    // SOH 
              bo => ps.Where(x => x == 4))) // EOT 
       .Select(bfr => bfr.Take(bfr.Count - 1).Skip(1)) 
       .Where(bfr => bfr.Count() == 12) 
       .Select(bfr => 
       { 
        var timestamp = BitConverter.ToInt64(bfr.Take(8).ToArray(), 0); 
        var id = BitConverter.ToInt32(bfr.ToArray(), 8); 
        return new ExternalActivity(timestamp, id); 
       });   
    } 
+0

請問您可以顯示'ToAcknowledge'和'ToActivity'的代碼嗎?沒有他們,我無法給你一個答案。 – Enigmativity

+0

@Enigmativity - 看我編輯的問題。 – francezu13k50

+1

嗯,我不認爲在這種情況下,停止和處置應該被合併到相同的事情。我認爲你想把Stop模型化爲一個命令。並因此認爲它可以成功,失敗,超時等。處置就是純粹的,斷開和摧毀訂閱。 –

回答

0

您可以通過修改StartAsync是這樣的:

private IObservable<Unit> StartAsync(Action unsubscribe) 
{ 
    return 
     Observable 
      .Create<Unit>(o => 
      { 
       var subscription = 
        Observable 
         .Timer(TimeSpan.FromSeconds(1)) 
         .Select(_=> Unit.Default) 
         .Subscribe(o); 
       return new CompositeDisposable(
        subscription, 
        Disposable.Create(unsubscribe)); 
      });; 
} 

然後你就可以在你喜歡的任何Action unsubscribe注入。與此代碼

嘗試測試:

var subscription = 
    StartAsync(() => Console.WriteLine("Done")) 
    .Subscribe(); 

Thread.Sleep(3000); 

subscription.Dispose(); 

你會看到「完成」後3秒寫入控制檯。

+0

謝謝。訂閱處理所採取的行動是異步行爲。它是作爲一個冷觀察實現的,它執行副作用(通過串行端口發送命令)並在響應到達時發出單個值。在我的代碼示例中,它是用'Observable.Timer'模擬的。我需要等待主要訂閱處置中的這個可觀察的內容。 – francezu13k50

+0

@ francezu13k50 - 你不能在'Action'中放置一個observable嗎?但是,依靠計時器來完成異步操作並不是一個好主意。 Rx非常擅長處理這些情況。你能顯示你真實的代碼嗎? – Enigmativity

+0

我不知道如何在'Action'中放置一個observable。 「Observable.Timer」僅用於模擬器件的異步響應,實際上是一個「STOP」命令;我並不是依靠計時器來完成操作,它實際上是確認操作成功的設備。 – francezu13k50