我有一個通過串口與計算機通信的設備。發送一個「START」命令後,設備迴應一個確認,並開始監控一些外部活動。然後它根據外部活動異步傳輸串行端口上的一些消息。當設備收到「停止」命令時,它會迴應確認,然後停止發送更多消息(表示外部活動)。Rx.Net:處理訂閱時執行異步副作用
我已經實現了帶有冷觀察功能的啓動/停止命令,這些命令執行副作用(串口發送命令),並且如果在串口上收到一個ACKckowledge,則會發出一個單一的Unit.Default
。我想構建一個IObservable
,它發出與外部活動相關的消息,並在訂閱時執行「START」副作用,並且在訂閱處理時執行「STOP」副作用。 「START」很簡單,我只需要做一個「SelectMany」,但我不知道如何執行「STOP」。
class MonitoringDevice
{
private SerialPort _sp;
private IObservable<byte> _receivedBytes;
public IObservable<ExternalActivity> ActivityStream { get; }
public MonitoringDevice()
{
_sp = new SerialPort("COM1");
_receivedBytes = Observable
.FromEventPattern<SerialDataReceivedEventHandler, SerialDataReceivedEventArgs>(
h =>
{
_sp.DiscardInBuffer();
_sp.DataReceived += h;
},
h =>
{
_sp.DataReceived -= h;
})
.SelectMany(x =>
{
byte[] buffer = new byte[1024];
var ret = new List<byte>();
int bytesRead = 0;
do
{
bytesRead = _sp.Read(buffer, 0, buffer.Length);
ret.AddRange(buffer.Take(bytesRead));
} while ((bytesRead >= buffer.Length));
return ret;
})
.Publish()
.RefCount();
ActivityStream = StartMonitoringAsync()
.SelectMany(_receivedBytes.ToActivity());
// we need to execute StopMonitoringAsync
// when a subscription to ActivityStream is disposed
_sp.Open();
}
private IObservable<Unit> StartMonitoringAsync()
{
return Observable
.Create<Unit>(
obs =>
{
_sp.Write("START");
return _receivedBytes
.ToAcknowledge()
.FirstAsync()
.Timeout(TimeSpan.FromMilliseconds(1000))
.Subscribe(obs);
});
}
private IObservable<Unit> StopMonitoringAsync()
{
return Observable
.Create<Unit>(
obs =>
{
_sp.Write("STOP");
return _receivedBytes
.ToAcknowledge()
.FirstAsync()
.Timeout(TimeSpan.FromMilliseconds(1000))
.Subscribe(obs);
});
}
}
ExternalActivity
只是一個POCO。
ToAcknowledge
是一種擴展方法,返回IObservable
,當設備傳輸確認時發出Unit.Default
。 - 這是按預期工作的;
ToActivity
是一種擴展方法,返回IObservable
,它解析輸入的串行數據併發出ExternalActivity
對象。 - 這是按預期工作的;
編輯:增加了對ToAcknowledge
和ToActivity
擴展方法實現。
public static IObservable<Unit> ToAcknowledge(this IObservable<byte> source)
{
return source.Buffer(3, 1)
.Where(bfr => bfr.SequenceEqual(new byte[] { 65, 67, 75 })) // ACK
.Select(x => Unit.Default);
}
public static IObservable<ExternalActivity> ToActivity(this IObservable<byte> source)
{
return source
.Publish(ps => ps.Buffer(ps.Where(x => x == 1), // SOH
bo => ps.Where(x => x == 4))) // EOT
.Select(bfr => bfr.Take(bfr.Count - 1).Skip(1))
.Where(bfr => bfr.Count() == 12)
.Select(bfr =>
{
var timestamp = BitConverter.ToInt64(bfr.Take(8).ToArray(), 0);
var id = BitConverter.ToInt32(bfr.ToArray(), 8);
return new ExternalActivity(timestamp, id);
});
}
請問您可以顯示'ToAcknowledge'和'ToActivity'的代碼嗎?沒有他們,我無法給你一個答案。 – Enigmativity
@Enigmativity - 看我編輯的問題。 – francezu13k50
嗯,我不認爲在這種情況下,停止和處置應該被合併到相同的事情。我認爲你想把Stop模型化爲一個命令。並因此認爲它可以成功,失敗,超時等。處置就是純粹的,斷開和摧毀訂閱。 –