如果你正在尋找的東西的是「功能更強大」,那麼這可能會有幫助,但是通過@Evk答案通過這些測試過。
首先,我可以建議,從提供可驗證的答案中解脫出來,能否提供一個測試套件來實現這樣的複雜問題。
這樣的事情會非常有幫助。
var scheduler = new TestScheduler();
var source = scheduler.CreateColdObservable<byte>(
ReactiveTest.OnNext<byte>(01,0x81), //BOM m1
ReactiveTest.OnNext<byte>(02,0x01),
ReactiveTest.OnNext<byte>(03,0x02),
ReactiveTest.OnNext<byte>(04,0x03),
ReactiveTest.OnNext<byte>(05,0x82), //EOM m1
ReactiveTest.OnNext<byte>(06,0x81), //BOM m2
ReactiveTest.OnNext<byte>(07,0x82),
ReactiveTest.OnNext<byte>(08,0x81),
ReactiveTest.OnNext<byte>(09,0x82),
ReactiveTest.OnNext<byte>(10,0x82), //EOM m2
ReactiveTest.OnNext<byte>(11,0x81), //BOM m3
ReactiveTest.OnNext<byte>(12,0x01),
ReactiveTest.OnNext<byte>(13,0x02),
ReactiveTest.OnNext<byte>(14,0x1B),
ReactiveTest.OnNext<byte>(15,0x82), //EOM m3
ReactiveTest.OnNext<byte>(16,0x81), //BOM m4
ReactiveTest.OnNext<byte>(17,0x01),
ReactiveTest.OnNext<byte>(18,0x02),
ReactiveTest.OnNext<byte>(19,0x03),
ReactiveTest.OnNext<byte>(20,0x1B), //Control character
ReactiveTest.OnNext<byte>(21,0x82), //Data
ReactiveTest.OnNext<byte>(22,0x82), //EOM m4
ReactiveTest.OnNext<byte>(23,0x81), //BOM m5
ReactiveTest.OnNext<byte>(24,0x01),
ReactiveTest.OnNext<byte>(25,0x02),
ReactiveTest.OnNext<byte>(26,0x03),
ReactiveTest.OnNext<byte>(27,0x1B), //Control character
ReactiveTest.OnNext<byte>(28,0x1B), //Data
ReactiveTest.OnNext<byte>(29,0x82), //EOM m5
ReactiveTest.OnNext<byte>(30,0x82));//Ignored (expected 0x81)
var observer = scheduler.CreateObserver<Message>();
//CurrentAnswer(source)
MyAnswer(source)
.Subscribe(observer);
scheduler.Start();
ReactiveAssert.AreElementsEqual(
new[] {
ReactiveTest.OnNext(05, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[0]{}}),
ReactiveTest.OnNext(10, new Message(){Header=new byte[]{0x82, 0x81, 0x82}, Data=new byte[0]{}}),
ReactiveTest.OnNext(15, new Message(){Header=new byte[]{0x01, 0x02, 0x1B}, Data=new byte[0]{}}),
ReactiveTest.OnNext(22, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[]{ 0x82}}),
ReactiveTest.OnNext(29, new Message(){Header=new byte[]{0x01, 0x02, 0x03}, Data=new byte[]{ 0x1B}}),
},
observer.Messages);
我也寫了一個版本的Message
,讓我驗證碼
public class Message
{
public static readonly byte BOM = 0x81;
public static readonly byte EOM = 0x82;
public static readonly byte Control = 0x1B;
public byte[] Header { get; set; }
public byte[] Data { get; set; }
public static Message Create(byte[] bytes)
{
if(bytes==null)
throw new ArgumentNullException(nameof(bytes));
if(bytes.Length<3)
throw new ArgumentException("bytes<3").Dump();
var header = new byte[3];
Array.Copy(bytes, header, 3);
var body = new List<byte>();
var escapeNext = false;
for (int i = 3; i < bytes.Length; i++)
{
var b = bytes[i];
if (b == Control && !escapeNext)
{
escapeNext = true;
}
else
{
body.Add(b);
escapeNext = false;
}
}
var msg = new Message { Header = header, Data = body.ToArray()};
return msg;
}
public override string ToString()
{
return string.Format("Message(Header=[{0}], Data=[{1}])", ByteArrayString(Header), ByteArrayString(Data));
}
private static string ByteArrayString(byte[] bytes)
{
return string.Join(",", bytes.Select(b => b.ToString("X")));
}
public override bool Equals(object obj)
{
var other = obj as Message;
if(obj==null)
return false;
return Equals(other);
}
protected bool Equals(Message other)
{
return IsSequenceEqual(Header, other.Header)
&& IsSequenceEqual(Data, other.Data);
}
private bool IsSequenceEqual<T>(IEnumerable<T> expected, IEnumerable<T> other)
{
if(expected==null && other==null)
return true;
if(expected==null || other==null)
return false;
return expected.SequenceEqual(other);
}
public override int GetHashCode()
{
unchecked
{
return ((Header != null ? Header.GetHashCode() : 0) * 397)^(Data != null ? Data.GetHashCode() : 0);
}
}
}
現在,我有所有的管道,我可以專注於企業的實際問題。
public static IObservable<Message> MyAnswer(IObservable<byte> source)
{
return source.Publish(s =>
{
return
Observable.Defer(()=>
//Start consuming once we see a BOM
s.SkipWhile(b => b != Message.BOM)
.Scan(new Accumulator(), (acc, cur)=>acc.Accumulate(cur))
)
.TakeWhile(acc=>!acc.IsEndOfMessage())
.Where(acc=>!acc.IsBeginingOfMessage())
.Select(acc=>acc.Value())
.ToArray()
.Where(buffer=>buffer.Any())
.Select(buffer => Message.Create(buffer))
.Repeat();
});
}
public class Accumulator
{
private int _index = 0;
private byte _current =0;
private bool _isCurrentEscaped = false;
private bool _isNextEscaped = false;
public Accumulator Accumulate(byte b)
{
_index++;
_current = b;
_isCurrentEscaped = _isNextEscaped;
_isNextEscaped = (!IsHeader() && !_isCurrentEscaped && b==Message.Control);
return this;
}
public byte Value()
{
return _current;
}
private bool IsHeader()
{
return _index < 5;
}
public bool IsBeginingOfMessage()
{
return _index == 1 && _current == Message.BOM;
}
public bool IsEndOfMessage()
{
return !IsHeader()
&& _current == Message.EOM
&& !_isCurrentEscaped;
}
}
爲了完整起見,這裏是@ EVK的答案的膽量,所以你很容易地交換和退出的實現。
public static IObservable<Message> CurrentAnswer(IObservable<byte> source)
{
return Observable.Create<Message>(o =>
{
// some crude parsing code for the sake of example
bool nextIsEscaped = false;
bool readingHeader = false;
bool readingBody = false;
List<byte> body = new List<byte>();
List<byte> header = new List<byte>();
return source.Subscribe(b =>
{
if (b == 0x81 && !nextIsEscaped && !readingHeader)
{
// start
readingHeader = true;
readingBody = false;
nextIsEscaped = false;
}
else if (b == 0x82 && !nextIsEscaped && !readingHeader)
{
// end
readingHeader = false;
readingBody = false;
if (header.Count > 0 || body.Count > 0)
{
o.OnNext(new Message()
{
Header = header.ToArray(),
Data = body.ToArray()
});
header.Clear();
body.Clear();
}
nextIsEscaped = false;
}
else if (b == 0x1B && !nextIsEscaped && !readingHeader)
{
nextIsEscaped = true;
}
else
{
if (readingHeader)
{
header.Add(b);
if (header.Count == 3)
{
readingHeader = false;
readingBody = true;
}
}
else if (readingBody)
body.Add(b);
nextIsEscaped = false;
}
});
});
}
感謝您的精心解答! 您在聲明這實質上是一臺狀態機時是正確的。我甚至在我的問題中添加了一個圖表。 我理解你的解決方案,但我希望有一種更直觀的方式來描述Rx中的狀態機。 –
看起來像你的狀態機的Stateless這樣的東西會非常適合(替換我的Accumulator類)。那麼你只需要在它周圍放一些Rx。我能夠在7個Rx操作員和一個狀態機上玩耍並獲得一個很好的解決方案。可測試,乾淨,封裝。不知道你還在追尋什麼。 –
我只希望能有一種方式來表達狀態機的功能。我也想如果主管道可以寫成'stream.SplitOnMessageBoundaries()。Select(UnescapeMessageBuffer).Select(CreateMessage);' –