2016-06-07 108 views
7

Rx.Net 消息解析器

我試圖解析一個表示消息的傳入字節流。我需要拆分這個流，併爲每個部分創建一個消息結構。

消息始終以0x81(BOM)開始,以0x82(EOM)結尾。

start: 0x81 
header: 3 bytes 
data: arbitrary length 
stop: 0x82 

數據部分使用轉義字節 0x1B(ESC)進行轉義:每當數據部分中的某個字節等於控制字節 {ESC, BOM, EOM} 之一時,就在它前面加上一個 ESC 前綴。

標題部分未轉義,並可能包含控制字節。

我想使用Rx.Net以功能反應式的方式對此進行編碼,方法是使用IObservable<byte>並將其轉換爲IObservable<Message>

要做到這一點,最符合慣例(idiomatic)的方式是什麼?

一些例子:

[81 01 02 03 82] single message 
[81 82 81 82 82] single message, header = [82 81 82] 
[81 01 02 1B 82] single message, header = [01 02 1B]. 
[81 01 02 03 1B 82 82] single message, header = [01 02 03], (unescaped) data = [82] 
[81 01 02 03 1B 1B 82 82] single message + dangling [82] which should be ignored. 
          header = [01 02 03], (unescaped) data = [1B] 

下面是描述這一解析過程的狀態機圖:[圖片:enter image description here]

回答

2

如果你想要的是「更函數式」的寫法,那麼這個答案可能會有幫助;不過 @Evk 的答案同樣通過了這些測試。

首先,我想提個建議:爲了便於大家給出可驗證的答案,對於這樣複雜的問題,能否提供一個測試套件?

這樣的事情會非常有幫助。

// Virtual-time scheduler: every byte and every parsed message gets a deterministic tick.
var scheduler = new TestScheduler();

// Five back-to-back frames followed by one stray EOM.
// The byte at (zero-based) position p is delivered at tick p + 1.
var wire = new byte[]
{
    0x81, 0x01, 0x02, 0x03, 0x82,             // m1, ticks 01-05: BOM, header, EOM
    0x81, 0x82, 0x81, 0x82, 0x82,             // m2, ticks 06-10: header consists of control bytes
    0x81, 0x01, 0x02, 0x1B, 0x82,             // m3, ticks 11-15: header ends with the escape byte
    0x81, 0x01, 0x02, 0x03, 0x1B, 0x82, 0x82, // m4, ticks 16-22: control character + 0x82 as data
    0x81, 0x01, 0x02, 0x03, 0x1B, 0x1B, 0x82, // m5, ticks 23-29: control character + 0x1B as data
    0x82,                                     // tick 30: ignored (expected 0x81)
};

var source = scheduler.CreateColdObservable<byte>(
    wire.Select((value, position) => ReactiveTest.OnNext<byte>(position + 1, value))
        .ToArray());

var observer = scheduler.CreateObserver<Message>();

// Swap the implementation under test here.
//CurrentAnswer(source)
var parsed = MyAnswer(source);
parsed.Subscribe(observer);

scheduler.Start();

// Each message is expected at the tick of its terminating EOM.
var expected = new[]
{
    ReactiveTest.OnNext(5, new Message
    {
        Header = new byte[] { 0x01, 0x02, 0x03 },
        Data = new byte[0]
    }),
    ReactiveTest.OnNext(10, new Message
    {
        Header = new byte[] { 0x82, 0x81, 0x82 },
        Data = new byte[0]
    }),
    ReactiveTest.OnNext(15, new Message
    {
        Header = new byte[] { 0x01, 0x02, 0x1B },
        Data = new byte[0]
    }),
    ReactiveTest.OnNext(22, new Message
    {
        Header = new byte[] { 0x01, 0x02, 0x03 },
        Data = new byte[] { 0x82 }
    }),
    ReactiveTest.OnNext(29, new Message
    {
        Header = new byte[] { 0x01, 0x02, 0x03 },
        Data = new byte[] { 0x1B }
    }),
};

ReactiveAssert.AreElementsEqual(expected, observer.Messages);

我還寫了一個 Message 類的版本,以便驗證代碼:

/// <summary>
/// A parsed protocol message: a fixed three-byte header plus an unescaped data payload.
/// Two messages are equal when their headers and payloads have the same contents.
/// </summary>
public class Message
{
    /// <summary>Marks the beginning of a message on the wire.</summary>
    public static readonly byte BOM = 0x81;

    /// <summary>Marks the end of a message on the wire.</summary>
    public static readonly byte EOM = 0x82;

    /// <summary>Escape byte that prefixes a control byte appearing inside the data section.</summary>
    public static readonly byte Control = 0x1B;

    /// <summary>The header bytes (three bytes when built by <see cref="Create"/>).</summary>
    public byte[] Header { get; set; }

    /// <summary>The data bytes with escape prefixes already removed.</summary>
    public byte[] Data { get; set; }

    /// <summary>
    /// Builds a message from the bytes between BOM and EOM (both excluded).
    /// The first three bytes are copied verbatim into the header; the remainder is
    /// unescaped into the data section.
    /// </summary>
    /// <param name="bytes">Header followed by the still-escaped data bytes.</param>
    /// <returns>The parsed message.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="bytes"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="bytes"/> holds fewer than three bytes.</exception>
    public static Message Create(byte[] bytes)
    {
        if (bytes == null)
        {
            throw new ArgumentNullException(nameof(bytes));
        }

        if (bytes.Length < 3)
        {
            // The original chained LINQPad's .Dump() onto the exception; that debugging
            // extension does not exist outside LINQPad, so it has been removed.
            throw new ArgumentException("bytes<3");
        }

        var header = new byte[3];
        Array.Copy(bytes, header, 3);

        var body = new List<byte>(bytes.Length - 3);
        var escapeNext = false;
        for (int i = 3; i < bytes.Length; i++)
        {
            var b = bytes[i];

            if (b == Control && !escapeNext)
            {
                // An unescaped escape byte is not data; it only marks the next byte as literal.
                escapeNext = true;
            }
            else
            {
                body.Add(b);
                escapeNext = false;
            }
        }

        // A trailing escape byte with nothing after it is dropped.
        return new Message { Header = header, Data = body.ToArray() };
    }

    /// <summary>Renders header and data as comma-separated upper-case hex values.</summary>
    public override string ToString()
    {
        return string.Format("Message(Header=[{0}], Data=[{1}])", ByteArrayString(Header), ByteArrayString(Data));
    }

    private static string ByteArrayString(byte[] bytes)
    {
        // Header/Data have public setters and may be null; print those as empty.
        if (bytes == null)
        {
            return string.Empty;
        }

        return string.Join(",", bytes.Select(b => b.ToString("X")));
    }

    /// <summary>Content equality; any object that is not a <see cref="Message"/> is unequal.</summary>
    public override bool Equals(object obj)
    {
        // Test the cast result, not the argument: the original checked `obj`, so a
        // non-null object of another type reached Equals(Message) as null and threw.
        var other = obj as Message;
        if (other == null)
        {
            return false;
        }

        return Equals(other);
    }

    /// <summary>Compares header and data element by element.</summary>
    protected bool Equals(Message other)
    {
        if (other == null)
        {
            return false;
        }

        if (ReferenceEquals(this, other))
        {
            return true;
        }

        return IsSequenceEqual(Header, other.Header)
            && IsSequenceEqual(Data, other.Data);
    }

    private static bool IsSequenceEqual<T>(IEnumerable<T> expected, IEnumerable<T> other)
    {
        if (expected == null && other == null)
        {
            return true;
        }

        if (expected == null || other == null)
        {
            return false;
        }

        return expected.SequenceEqual(other);
    }

    /// <summary>
    /// Hash over the contents of header and data so that equal messages hash equally.
    /// The properties are mutable, so do not change a message while it is used as a key.
    /// </summary>
    public override int GetHashCode()
    {
        unchecked
        {
            return (ContentHash(Header) * 397) ^ ContentHash(Data);
        }
    }

    // Array.GetHashCode is reference based, which contradicted the content-based Equals.
    private static int ContentHash(byte[] bytes)
    {
        if (bytes == null)
        {
            return 0;
        }

        unchecked
        {
            var hash = 17;
            foreach (var b in bytes)
            {
                hash = (hash * 31) + b;
            }

            return hash;
        }
    }
}

現在所有的基礎設施(plumbing)都已就緒,我可以專注於問題本身了。

/// <summary>
/// Splits a byte stream into <see cref="Message"/> values using Rx operators and an
/// <see cref="Accumulator"/> that tracks the position and escape state of each byte.
/// </summary>
/// <param name="source">The raw byte stream.</param>
/// <returns>One <see cref="Message"/> per frame found in <paramref name="source"/>.</returns>
// NOTE(review): Message.Create throws when fewer than three bytes were collected, which
// looks reachable if the source completes in the middle of a header - confirm.
// NOTE(review): Repeat() resubscribes without a terminating condition; presumably this
// never completes (and may resubscribe endlessly) once the source itself completes - confirm.
public static IObservable<Message> MyAnswer(IObservable<byte> source) 
{ 
    // Publish exposes the source as `s` so every resubscription made by Repeat()
    // reads from the same underlying subscription.
    return source.Publish(s => 
     { 

      return 
       // Defer re-runs this factory for each subscription, so every frame
       // starts with a fresh Accumulator.
       Observable.Defer(()=> 
        //Start consuming once we see a BOM 
        s.SkipWhile(b => b != Message.BOM) 
        .Scan(new Accumulator(), (acc, cur)=>acc.Accumulate(cur)) 
       ) 
       // Stop at the first unescaped EOM that comes after the header.
       .TakeWhile(acc=>!acc.IsEndOfMessage()) 
       // Drop the leading BOM itself.
       .Where(acc=>!acc.IsBeginingOfMessage()) 
       .Select(acc=>acc.Value()) 
       // Collect the frame's header and still-escaped data bytes into one array.
       .ToArray() 
       // Skip frames for which no bytes were collected.
       .Where(buffer=>buffer.Any()) 
       .Select(buffer => Message.Create(buffer)) 
       // Start over for the next frame.
       .Repeat(); 
     }); 

} 
/// <summary>
/// Mutable per-frame state: remembers the most recent byte, its 1-based position in
/// the frame, and whether it was preceded by an unescaped escape byte.
/// </summary>
public class Accumulator
{
    // Positions below this value count as header (see WithinHeader).
    private const int FirstDataPosition = 5;

    private int _position = 0;
    private byte _latest = 0;
    private bool _latestIsEscaped = false;
    private bool _escapePending = false;

    // True while fewer than five bytes have been accumulated.
    private bool WithinHeader => _position < FirstDataPosition;

    /// <summary>Records the next byte of the frame and updates the escape state.</summary>
    /// <param name="b">The byte that was just received.</param>
    /// <returns>This same instance, so it can be used as the state of a Scan.</returns>
    public Accumulator Accumulate(byte b)
    {
        var escapedByPrevious = _escapePending;

        _position += 1;
        _latest = b;
        _latestIsEscaped = escapedByPrevious;

        // Only an unescaped escape byte outside the header escapes the byte after it.
        _escapePending = b == Message.Control && !escapedByPrevious && !WithinHeader;

        return this;
    }

    /// <summary>The most recently accumulated byte.</summary>
    public byte Value() => _latest;

    /// <summary>True when the latest byte is the first byte of the frame and is a BOM.</summary>
    public bool IsBeginingOfMessage() => _latest == Message.BOM && _position == 1;

    /// <summary>True when the latest byte is an unescaped EOM located after the header.</summary>
    public bool IsEndOfMessage()
    {
        if (WithinHeader)
        {
            return false;
        }

        if (_latestIsEscaped)
        {
            return false;
        }

        return _latest == Message.EOM;
    }
}

爲了完整起見,這裏給出 @Evk 答案的核心部分,這樣你就可以方便地換入、換出不同的實現。

/// <summary>
/// Parses a byte stream into messages with a hand-written state machine wrapped in
/// <c>Observable.Create</c>.
/// </summary>
/// <param name="source">The raw byte stream.</param>
/// <returns>
/// A sequence that yields one <see cref="Message"/> per frame and mirrors the
/// completion or failure of <paramref name="source"/>.
/// </returns>
public static IObservable<Message> CurrentAnswer(IObservable<byte> source)
{
    const byte bom = 0x81;    // begin of message
    const byte eom = 0x82;    // end of message
    const byte escape = 0x1B; // prefixes control bytes inside the data section

    return Observable.Create<Message>(o =>
    {
        // Parser state lives per subscription.
        bool nextIsEscaped = false;
        bool readingHeader = false;
        bool readingBody = false;
        List<byte> body = new List<byte>();
        List<byte> header = new List<byte>();

        return source.Subscribe(
            b =>
            {
                // Control bytes only have meaning outside the (unescaped) header
                // and when they are not themselves escaped.
                bool isControlPosition = !nextIsEscaped && !readingHeader;

                if (b == bom && isControlPosition)
                {
                    // start
                    // Discard leftovers of a frame that never saw its EOM. Without this
                    // the header list grows past three bytes, the Count == 3 check below
                    // never fires again and the parser stays in header mode forever.
                    header.Clear();
                    body.Clear();
                    readingHeader = true;
                    readingBody = false;
                    nextIsEscaped = false;
                }
                else if (b == eom && isControlPosition)
                {
                    // end
                    readingHeader = false;
                    readingBody = false;
                    if (header.Count > 0 || body.Count > 0)
                    {
                        o.OnNext(new Message()
                        {
                            Header = header.ToArray(),
                            Data = body.ToArray()
                        });
                        header.Clear();
                        body.Clear();
                    }
                    nextIsEscaped = false;
                }
                else if (b == escape && isControlPosition)
                {
                    nextIsEscaped = true;
                }
                else
                {
                    if (readingHeader)
                    {
                        header.Add(b);
                        if (header.Count == 3)
                        {
                            // Header is complete; everything up to the EOM is data.
                            readingHeader = false;
                            readingBody = true;
                        }
                    }
                    else if (readingBody)
                    {
                        body.Add(b);
                    }
                    // Bytes between frames (neither header nor body) are ignored.
                    nextIsEscaped = false;
                }
            },
            // Forward termination so subscribers see the source's errors and completion
            // instead of an unhandled exception or a sequence that never ends.
            o.OnError,
            o.OnCompleted);
    });
}
+0

感謝您的精心解答! 您在聲明這實質上是一臺狀態機時是正確的。我甚至在我的問題中添加了一個圖表。 我理解你的解決方案,但我希望有一種更直觀的方式來描述Rx中的狀態機。 –

+1

看起來像 Stateless 這樣的狀態機庫會非常適合你的狀態機(用來替換我的 Accumulator 類)。然後你只需要在它外面包一層 Rx。我試了一下,用 7 個 Rx 操作符加一個狀態機就得到了一個不錯的解決方案:可測試、乾淨、封裝良好。不知道你還想要什麼。 –

+0

我只是希望能有一種函數式的方式來表達狀態機。我也希望主管道可以寫成 `stream.SplitOnMessageBoundaries().Select(UnescapeMessageBuffer).Select(CreateMessage);` –

2

你可以只使用基本的構建模塊:Observable.Create 和 Subscribe。首先,讓我們借用一段代碼,它可以把 Stream 讀取爲 byte[] 的可觀察序列(這件事有很多不同的做法和例子):

static class Extensions {
    /// <summary>
    /// Reads <paramref name="stream"/> asynchronously and publishes each chunk read as
    /// its own array. The sequence completes when a read returns zero bytes (end of
    /// stream) or when the stream reports that it can no longer be read.
    /// </summary>
    /// <param name="stream">The stream to read from.</param>
    /// <param name="bufferSize">Maximum number of bytes requested per read; must be positive.</param>
    /// <returns>A sequence of non-empty chunks in the order they were read.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="bufferSize"/> is zero or negative.</exception>
    public static IObservable<byte[]> AsyncRead(this Stream stream, int bufferSize) {
        if (stream == null) {
            throw new ArgumentNullException(nameof(stream));
        }
        if (bufferSize <= 0) {
            throw new ArgumentOutOfRangeException(nameof(bufferSize), "Buffer size must be positive.");
        }

        // One scratch buffer is reused across reads; each chunk is copied out before
        // it is published, so observers never see the buffer being overwritten.
        var buffer = new byte[bufferSize];
        var asyncRead = Observable.FromAsyncPattern<byte[], int, int, int>(
            stream.BeginRead,
            stream.EndRead);

        return Observable.While(
                () => stream.CanRead,
                Observable.Defer(() => asyncRead(buffer, 0, bufferSize))
                    .Select(readBytes => buffer.Take(readBytes).ToArray()))
            // CanRead stays true at the end of a stream, so without this the loop keeps
            // issuing reads that return zero bytes and publishes empty arrays forever.
            .TakeWhile(chunk => chunk.Length > 0);
    }
}

然後定義消息類:

/// <summary>
/// Plain container for one parsed message as produced by the console example.
/// </summary>
class Message { 
    /// <summary>The header bytes of the message.</summary>
    public byte[] Header { get; set; } 
    /// <summary>The body bytes that followed the header.</summary>
    public byte[] Body { get; set; } 
} 

然後將它放入小控制檯應用程序:

/// <summary>
/// Console demo: reads a hard-coded frame from a memory stream, parses it with a small
/// state machine and prints the header and body of every message found.
/// </summary>
/// <param name="args">Unused.</param>
static void Main(string[] args) {
    const byte bom = 0x81;    // begin of message
    const byte eom = 0x82;    // end of message
    const byte escape = 0x1B; // prefixes control bytes inside the body

    // original stream
    using (var stream = new MemoryStream(new byte[] { 0x81, 0x01, 0x02, 0x03, 0x1B, 0x1B, 0x82, 0x82 })) {
        // your initial IObservable<byte[]>
        IObservable<byte[]> bytes = stream.AsyncRead(128); // or any other buffer size
        // your IObservable<Message>
        var observable = Observable.Create<Message>(observer => {
            // Parser state lives per subscription.
            bool nextIsEscaped = false;
            bool readingHeader = false;
            bool readingBody = false;
            List<byte> body = new List<byte>();
            List<byte> header = new List<byte>();

            return bytes.Subscribe(
                buffer => {
                    foreach (var b in buffer) {
                        // Control bytes only count outside the header and when not escaped.
                        bool isControlPosition = !nextIsEscaped && !readingHeader;

                        if (b == bom && isControlPosition) {
                            // start
                            // Drop leftovers of a frame that never saw its EOM; otherwise the
                            // header grows past three bytes and the parser never leaves header mode.
                            header.Clear();
                            body.Clear();
                            readingHeader = true;
                            readingBody = false;
                            nextIsEscaped = false;
                        }
                        else if (b == eom && isControlPosition) {
                            // end
                            readingHeader = false;
                            readingBody = false;
                            if (header.Count > 0 || body.Count > 0) {
                                observer.OnNext(new Message() {
                                    Header = header.ToArray(),
                                    Body = body.ToArray()
                                });
                                header.Clear();
                                body.Clear();
                            }
                            nextIsEscaped = false;
                        }
                        else if (b == escape && isControlPosition) {
                            nextIsEscaped = true;
                        }
                        else {
                            if (readingHeader) {
                                header.Add(b);
                                if (header.Count == 3) {
                                    // Header complete; the rest up to EOM is body.
                                    readingHeader = false;
                                    readingBody = true;
                                }
                            }
                            else if (readingBody) {
                                body.Add(b);
                            }
                            nextIsEscaped = false;
                        }
                    }
                },
                // Pass read errors and completion on to the message observer.
                observer.OnError,
                observer.OnCompleted);
        });

        // Keep the subscription alive until a key is pressed, then release it and the stream.
        using (observable.Subscribe(msg => {
            Console.WriteLine("Header: " + BitConverter.ToString(msg.Header));
            Console.WriteLine("Body: " + BitConverter.ToString(msg.Body));
        })) {
            Console.ReadKey();
        }
    }
}
+0

感謝您的回答! 我理解你的方法,但對我來說,這感覺非常過程式:它基本上是一個用 Rx 包裝起來的狀態機讀取器。 我想知道是否還有其他更函數式的方法。 –

+0

你的問題本身確實帶有狀態機的成分。你需要知道當前字符是什麼、它前面的字符是什麼,可能還要知道再前面的字符(轉義字符本身是否被轉義了?)。在我的測試中,這段代碼符合你的要求 –