2016-10-10 65 views
0

我有一個經銷商< - >路由器設置在NetMQ v4我可以在任何方向異步發送和接收消息,沒有問題。如何在ZeroMQ或NetMQ中從路由器套接字發送和接收數據?

我現在要正式到這一點抽象,其中服務器(路由器)監聽任何進入的消息,但它也需要按需廣播消息到任何連接的客戶端(經銷商)的。

我試圖避免使用Pub < - >子套接字,因爲我需要訂戶也發送消息到服務器。我試圖實現的最接近的模式是WebSocket客戶端 - 服務器通信。

聽取客戶端的消息的第一部分是在像做:

using (var server = new RouterSocket("@tcp://*:80")) 
{ 
    var addresses = new HashSet<string>(); 
    while (true) 
    { 
     var msg = server.ReceiveMultipartMessage(); 

     var address = Encoding.UTF8.GetString(msg[0].Buffer); 
     var payload = Encoding.UTF8.GetString(msg[2].Buffer); 
     Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload); 

     var contains = addresses.Contains(address); 
     if (!contains) { addresses.Add(address); }    

     msg.Clear(); 
     msg.Append(address); 
     msg.AppendEmptyFrame(); 
     msg.Append("Reply for: " + address); 
     server.SendMultipartMessage(msg); 
    } 
} 

現在考慮到插座不是線程安全的,我被困在尋找一種方式來廣播消息(來自哪裏根據需求提供不同的線索)給所有客戶。

我大概可以在循環中使用TryReceiveMultipartMessage方法,而不是設置超時後,我可以檢查任何廣播消息的隊列,然後通過發送此類消息的每個客戶端循環。喜歡的東西:

using (var server = new RouterSocket("@tcp://*:80")) 
{ 
    var addresses = new HashSet<string>(); 

    var msg = new NetMQMessage(); 
    while (true) 
    { 
     var clientHasMsg = server.TryReceiveMultipartMessage(TimeSpan.FromSeconds(1), ref msg); 
     if (!clientHasMsg) 
     { 
      // Check any incoming broacast then loop through all the clients 
      // sending each the brodcast msg 
      var broadMsg = new NetMQMessage(); 
      foreach (var item in addresses) 
      { 
       broadMsg.Append(item); 
       broadMsg.AppendEmptyFrame(); 
       broadMsg.Append("This is a broadcast"); 
       server.SendMultipartMessage(broadMsg); 
       broadMsg.Clear(); 
      } 

      // Go back into the loop waiting for client messages 
      continue; 
     } 

     var address = Encoding.UTF8.GetString(msg[0].Buffer); 
     var payload = Encoding.UTF8.GetString(msg[2].Buffer); 
     Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload); 

     var contains = addresses.Contains(address); 
     if (!contains) { addresses.Add(address); } 

     msg.Clear(); 
     msg.Append(address); 
     msg.AppendEmptyFrame(); 
     msg.Append("Reply for: " + address); 
     server.SendMultipartMessage(msg); 
    } 
} 

這在某種程度上感覺不對主要是由於:

  • 的超時值什麼是物有所值? 1秒,100毫秒等;
  • 這是效率最高/性能最好的解決方案,因爲該程序將用於連接100k +客戶端,每個客戶端每秒發送數千條消息。

什麼是最好的方法,這是非常讚賞的任何指針。

回答

1

你可以使用netmqqueue,它是多生產者單消費者隊列。您可以將它添加到NetMQPoller中,並在不鎖定的情況下從多個線程排隊。

+0

我剛剛在'Device'上讀到你的博客,並認爲在你提到'Queue'之前這將是一個不錯的選擇:-)你有沒有任何例子(除了http://netmq.readthedocs上的幾行)。 IO)? – MaYaN

+0

好吧,我想我可以在沒有其他例子的情況下使用這個工作,只是有一個問題,NetMQQueue 和NetMQSscheduler有什麼區別?調度程序是否在v4中過時? – MaYaN

+0

NetMQScheduler已過時(現在是NetMQPoller的一部分),無論如何NetMQScheduler是任務隊列,NetMQQueue是任何類型的隊列。 – somdoron

0

我認爲PUB/SUB是適合您的100k +客戶需求的方法。儘管如此,這並不意味着您無法與服務器通信:使用經銷商/路由器。你爲什麼認爲這個解決方案是不可接受的?

+0

我不知道我的理解是否正確。你是否說使用基於我的上述解決方案的經銷商/路由器來做Pub/Sub對你來說看起來合理? – MaYaN