2013-08-23 178 views
0

我試圖設置一個命名管道服務器和客戶端在兩個程序之間發送數據。 我的問題是,當我的數據被收回,例如。 BeginRead命令om服務器觸發器在我從客戶端序列化了一個對象後,它爲同樣的消息觸發了20次回調。目標是客戶端程序將向服務器程序發送命令。當服務器處理任務時,當有一個連接時,它會將狀態更新發送回客戶端。命名管道異步

這是我目前的測試程序。

class Program 
{ 
    static void Main(string[] args) 
    { 
     var server = new PipeServer(); 
     server.Init(); 

     var client = new PipeClient(); 
     if (client.Connect()) 
     { 
      Console.WriteLine("Connected to server."); 
     } 
     else 
     { 
      Console.WriteLine("Connection failed."); 
      return; 
     } 

     while (true) 
     { 
      Console.Write(" \\> "); 
      string input = Console.ReadLine(); 
      if (string.IsNullOrEmpty(input)) break; 

      var arr = input.Split(new char[] { ' ' }, StringSplitOptions.RemoveEmptyEntries); 
      int value = 0; 

      if (arr.Length != 2) break; 
      if (!int.TryParse(arr[1], out value)) break; 

      var obj = new PipeObject { Name = arr[0], Value = value }; 
      client.Send(obj); 

      //string result = f.Deserialize(client) as string; 
      //Console.WriteLine(result); 
     } 
    } 
} 

internal class PipeServer 
{ 
    IFormatter Formatter = new BinaryFormatter(); 
    public NamedPipeServerStream Instance { get; internal set; } 
    public bool IsConnected { get; internal set; } 
    byte[] buffer = new byte[65535]; 
    public object Message { get; set; } 

    StreamReader sr; 
    StreamWriter sw; 

    internal PipeServer() 
    { 
     IsConnected = false; 
    } 

    public void Init() 
    { 
     var ps = new PipeSecurity(); 
     ps.AddAccessRule(new PipeAccessRule(WindowsIdentity.GetCurrent().User, PipeAccessRights.FullControl, AccessControlType.Allow)); 
     ps.AddAccessRule(new PipeAccessRule(new SecurityIdentifier(WellKnownSidType.AuthenticatedUserSid, null), PipeAccessRights.ReadWrite, AccessControlType.Allow)); 

     Instance = new NamedPipeServerStream("Levscan4Pipe", PipeDirection.InOut, 1, PipeTransmissionMode.Message, PipeOptions.Asynchronous | PipeOptions.WriteThrough, 65535, 65535, ps); 

     sr = new StreamReader(Instance); 
     sw = new StreamWriter(Instance); 

     Instance.BeginWaitForConnection(OnClientConnected, Instance); 

     Thread t = new Thread(Run); 
     t.Start(); 
    } 

    void Run() 
    { 
     int index = 0; 
     if (IsConnected) 
     { 
      try 
      { 
       Instance.BeginRead(buffer, 0, buffer.Length, OnRead_Completed, Instance); 
       //index += Instance.Read(buffer, 0, buffer.Length); 
       //try 
       //{ 
       // using (var ms = new MemoryStream(buffer)) 
       // { 
       //  Message = Formatter.Deserialize(ms); 
       //  index = 0; 
       // } 
       //} 
       //catch (Exception e) 
       //{ 
       // Debug.WriteLine(e.Message); 
       // Debug.WriteLine(e.StackTrace); 
       //} 
      } 
      catch (IOException) 
      { 
       IsConnected = false; 
       Instance.Disconnect(); 
      } 
     } 

     Thread.Sleep(Timeout.Infinite); 
     //Instance.WaitForConnection(); 
     //Thread t = new Thread(Run); 
     //t.Start(); 
    } 

    void OnClientConnected(IAsyncResult ar) 
    { 
     Instance.EndWaitForConnection(ar); 
     IsConnected = true; 
    } 

    void OnRead_Completed(IAsyncResult ar) 
    { 
     var bytes = Instance.EndRead(ar); 
     Debug.WriteLine("{1} > Read completed - bytes read: {0}".FormatWith(bytes, DateTime.Now.ToString())); 

     //try 
     //{ 
     // using (var ms = new MemoryStream(buffer)) 
     // { 
     //  Message = Formatter.Deserialize(ms); 
     // } 
     //} 
     //catch (Exception e) 
     //{ 
     // Debug.WriteLine(e.Message); 
     // Debug.WriteLine(e.StackTrace); 
     //} 
    } 
} 

internal class PipeClient 
{ 
    IFormatter f = new BinaryFormatter(); 
    public NamedPipeClientStream Instance { get; internal set; } 
    StreamWriter sw; 
    StreamReader sr; 

    public PipeClient() 
    { 
     Instance = new NamedPipeClientStream(".", "Levscan4Pipe", PipeDirection.InOut, PipeOptions.Asynchronous | PipeOptions.WriteThrough); 
     sr = new StreamReader(Instance); 
     sw = new StreamWriter(Instance); 
    } 

    public bool Connect() 
    { 
     try 
     { 
      Instance.Connect(5000); 
      Instance.ReadMode = PipeTransmissionMode.Message; 
      Instance.WaitForPipeDrain(); 
      return true; 
     } 
     catch 
     { 
      return false; 
     } 
    } 

    public void Send(object obj) 
    { 
     f.Serialize(Instance, obj); 
     Instance.Flush(); 
     Instance.WaitForPipeDrain(); 
    } 
} 

編輯

改變了while循環到如果開始的BeginRead。這解決了多個回調,但我仍然沒有得到一個完整的消息。

+2

我建議你閱讀文檔。例如,在流上調用'Read'將返回讀取的字節數。它看起來像你假設你會在一次閱讀中閱讀你的整個信息,這不一定是這樣。 –

+0

我得到在我的回調中讀取的字節數,但我不能得到消息的字節總數。我的NamedPipeServerStream對象不支持長度和位置。 也想通了,我不能使用一段時間(IsConnected),每次調用BeginRead函數,導致我的回調同時運行多次。 – Azazel

+0

如果我序列化我的對象到一個內存流,然後從中獲取字節並將它們全部發送到pipestreams中寫入方法我在回調中直接獲取消息並且它可以工作。爲什麼不用我的pipestream直接序列化得到相同的結果? – Azazel

回答

1

如果服務器像寫入流:

write field 1 
write field 2 
write field 3 
etc. 

還有就是寫之間的一段時間,接收器(程序)可以讀取前三個字段,而服務器還在寫其他。管道流不知道服務器何時完成寫入,因此它不能緩衝所有內容並將其全部發送給您。

當服務器首先將所有內容寫入內存流,然後將內存流複製到管道流中時,您的程序可以一次完成所有內容。也許。如果服務器正在發送一個非常大的數據包,則可能只是讀取其中的一部分。

管道流只是一個字節流。它不會對數據施加任何格式。它沒有任何記錄或任何類似的概念。因此,您必須像字節流一樣對待它,並自己編寫記錄等。

如果您需要知道從服務器發送的記錄大小,服務器必須將該信息放入流中爲你。通常,服務器將寫入長度,然後寫入數據。接收器可以讀取長度,將其轉換爲整數,然後從流中讀取許多字節。而且,是的,它可能需要多次讀取才能獲得所有的字節。這只是字節流的性質。

處理這個問題的另一種方法是創建一個記錄結束標記。所以服務器發送它的數據並且你的程序讀取,直到找到表示記錄結束的字節序列。不過,您必須小心,因爲服務器可能會發送多個記錄,您的讀取可能會抓取一條記錄的結尾以及下一條記錄的結尾。

使用字節流可能會有很多工作,因爲您必須在讀取字節後重新構建記錄。如果可以的話,使用現有框架(如WCF,如其中一個評論中所述)更容易。