2017-01-23 24 views
1

首先,值得一提的是,在單個F#解決方案中,序列化和反序列化Bond消息工作正常。但是,我無法正確處理通過ZeroMQ發送和/或接收消息。ZeroMQ上的MS Bond序列化中的運行時「EndOfStreamException」錯誤

以下程序的用戶端存在運行時錯誤。一個.bond文件是用bond編譯器定義和編譯的。然後從C#創建一個dll,從F#調用。然後我有兩個F#程序。一個通過tcp套接字發佈序列化數據,另一個是訂閱者。在sub上收到消息時,嘗試解組原始數據的行是導致運行時錯誤的行。任何人都可以看到這個原因嗎?

[編輯]根據Fyodor的評論,我對發佈方進行了修改,改變了用戶端的錯誤。所以這個錯誤可能與我打包和解包信息有關。

這是.bond文件

namespace Examples 

struct Record 
{ 
    0: map<string, double> payload; 
} 

這裏是出版商:

// publisher 

open System 
open Bond 
open Bond.Protocols 
open Bond.IO.Safe 
open ZeroMQ 

let ctx = new ZContext() 
let publisher = new ZSocket(ctx, ZSocketType.PUB) 
publisher.Bind("tcp://*:5556") 

let src = new Examples.Record() 
src.payload.Add("a", 1.) 
src.payload.Add("b", 2.) 

let output = new OutputBuffer() 
let writer = new CompactBinaryWriter<OutputBuffer>(output) 

while true do 
    Marshal.To(writer, src) 
    //let input = new InputBuffer(output.Data) 
    //let byteArr = input.ReadBytes(int(input.Length - 1L)) 
    let updateFrame = new ZFrame(System.Text.Encoding.ASCII.GetString output.Data.Array) 
    publisher.Send(updateFrame) 

這裏是用戶:

// subscriber 

open Bond 
open Bond.Protocols 
open Bond.IO.Safe 
open System 
open System.Text 
open ZeroMQ 

let ctx = new ZContext() 
let subscriber = new ZSocket(ctx, ZSocketType.SUB) 
subscriber.Connect("tcp://127.0.0.1:5556") 
subscriber.SubscribeAll() 

let output = new OutputBuffer()  
while true do  
    let received = subscriber.ReceiveFrame() 
    let byteArr = Encoding.ASCII.GetBytes (received.ReadString()) 
    let arrSeg = ArraySegment<byte>(byteArr) 
    let input = new InputBuffer(arrSeg) 
    let dst = Unmarshal<Examples.Record>.From(input) 
    for KeyValue(k, v) in dst.payload do 
     printfn "%A %A" k v 
+3

我看到你正在'byteArr.ToString()'中創建'ZFrame'。這不符合你的想法。嘗試打印出'byteArr.ToString()'的結果來查看實際發送的內容。 –

回答

4

在接收端,當您嘗試將編組的Bond Binary解碼爲一個ASCII字符串,您正在丟失一些有效負載。當將結構如Record編組爲Compact Binary時,有效負載的前四個字節是0x43 0x42 0x10 0x00。從ZFrame讀取字符串時,無論幀的大小如何,遇到的the first embedded NUL (0x00)都表示字符串結束。因此,讀取側只能看到0x43 0x42 0x10而不是整個有效負載(當我測試時爲29個字節)。

由於緊湊型開關是一個二進制協議,你要使用ZFrame構造函數,它在發佈商端的緩衝區:

let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count) 

在用戶側,你會希望只讀取緩衝區:

let byteArr = received.Read() 

此外,在發佈方,您不斷累積數據到相同的OutputBuffer。你會希望你馬歇爾之前重置output.Position 0您的下一個記錄,重新使用,而不是成長的IT緩衝區:

while true do 
    Marshal.To(writer, src) 
    let updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count)output.Data.Array) 
    publisher.Send(updateFrame) 
    output.Position <- 0 

另一件事要注意:分配給一個OutputBuffer默認的緩衝區是65KiB。一旦你知道你的有效載荷有多大,就考慮把它縮小一點。

注意:我在具有類似語義的C#應用​​程序中對此進行了調試。這是我用的:

namespace so_q_zmq 
{ 
    using System; 
    using System.Collections.Generic; 
    using System.Text; 
    using System.Threading.Tasks; 
    using Bond; 
    using Bond.IO.Safe; 
    using Bond.Protocols; 
    using ZeroMQ; 

    [Schema] 
    class Record 
    { 
     [Id(0)] 
     public Dictionary<string, double> payload = new Dictionary<string, double>(); 
    } 

    class Program 
    { 
     static void Main(string[] args) 
     { 
      var pTask = Task.Run(() => 
      { 
       try 
       { 
        Publisher(); 
       } 
       catch (Exception ex) 
       { 
        Console.WriteLine("Publisher failed: {0}", ex); 
       } 
      }); 

      var sTask = Task.Run(() => 
      { 
       try 
       { 
        Subscriber(); 
       } 
       catch (Exception ex) 
       { 
        Console.WriteLine("Subscriber failed: {0}", ex); 
       } 
      }); 

      Task.WaitAll(pTask, sTask); 
      Console.WriteLine("Done"); 
      Console.ReadLine(); 
     } 

     static void Publisher() 
     { 
      var ctx = new ZContext(); 
      var publisher = new ZSocket(ctx, ZSocketType.PUB); 
      publisher.Bind("tcp://127.0.0.1:12345"); 

      var src = new Record(); 
      src.payload.Add("a", 1.0); 
      src.payload.Add("b", 2.0); 

      var output = new OutputBuffer(); 
      var writer = new CompactBinaryWriter<OutputBuffer>(output); 

      for (;;) 
      { 
       Marshal.To(writer, src); 
       // INCORRECT: 
       // var str = Encoding.ASCII.GetString(output.Data.Array); 
       // var updateFrame = new ZFrame(str); 
       var updateFrame = new ZFrame(output.Data.Array, output.Data.Offset, output.Data.Count); 
       publisher.Send(updateFrame); 
       output.Position = 0; 
      } 
     } 

     static void Subscriber() 
     { 
      var ctx = new ZContext(); 
      var subscriber = new ZSocket(ctx, ZSocketType.SUB); 
      subscriber.Connect("tcp://127.0.0.1:12345"); 
      subscriber.SubscribeAll(); 

      for (;;) 
      { 
       var received = subscriber.ReceiveFrame(); 
       // INCORRECT 
       // var str = received.ReadString(); 
       // var byteArr = Encoding.ASCII.GetBytes(str); 
       var byteArr = received.Read(); 
       var arrSeg = new ArraySegment<byte>(byteArr); // There's an InputBuffer ctor that takes a byte[] directly 
       var input = new InputBuffer(arrSeg); 
       var dst = Unmarshal<Record>.From(input); 
       foreach (var kvp in dst.payload) 
       { 
        Console.WriteLine("{0} {1}", kvp.Key, kvp.Value); 
       } 
      } 
     } 
    } 
} 
相關問題