2014-03-25 83 views
1

當我在控制檯上顯示的消息都被弄亂,包含被控制檯上印刷的控制返回到傳入前5弦子消息的每個消息打印出接收到的消息消息回調。我強烈認爲這是因爲傳入的消息事件在Booksleeve中引發異步?Redis的Booksleeve客戶端,ResultCompletionMode.PreserveOrder不工作

我參考以下文章How does PubSub work in BookSleeve/ Redis?,作者Marc Gravell指出強制同步接收的能力是通過將完成模式設置爲「PreserveOrder」。我已經這樣做了,在連接客戶端之前和之後嘗試。這兩個似乎都沒有工作。

任何想法如何,我可以接收消息並進行打印的準確順序在控制檯上,他們被送到?在這種情況下,我只有一個出版商。

感謝

編輯:

下面的一些代碼片段展示如何發送消息和Booksleeve包裝我很快就寫了。

這裏所述客戶端(I具有接收消息並檢查順序的類似Client2,但我省略它,因爲它似乎微不足道)。

class Client1 
{ 
    const string ClientId = "Client1"; 
    private static Messaging Client { get; set; } 

    private static void Main(string[] args) 
    { 
     var settings = new MessagingSettings("127.0.0.1", 6379, -1, 60, 5000, 1000); 
     Client = new Messaging(ClientId, settings, ReceiveMessage); 
     Client.Connect(); 

     Console.WriteLine("Press key to start sending messages..."); 
     Console.ReadLine(); 

     for (int index = 1; index <= 100; index++) 
     { 
      //I turned this off because I want to preserve 
      //the order even if messages are sent in rapit succession 

      //Thread.Sleep(5); 

      var msg = new MessageEnvelope("Client1", "Client2", index.ToString()); 
      Client.SendOneWayMessage(msg); 
     } 

     Console.WriteLine("Press key to exit...."); 
     Console.ReadLine(); 

     Client.Disconnect(); 
    } 

    private static void ReceiveMessage(MessageEnvelope msg) 
    { 
     Console.WriteLine("Message Received"); 
    } 
} 

這裏庫的相關代碼片段:

public void Connect() 
    { 
     RequestForReplyMessageIds = new ConcurrentBag<string>(); 

     Connection = new RedisConnection(Settings.HostName, Settings.Port, Settings.IoTimeOut); 
     Connection.Closed += OnConnectionClosed; 
     Connection.CompletionMode = ResultCompletionMode.PreserveOrder; 
     Connection.SetKeepAlive(Settings.PingAliveSeconds); 

     try 
     { 
      if (Connection.Open().Wait(Settings.RequestTimeOutMilliseconds)) 
      { 
       //Subscribe to own ClientId Channel ID 
       SubscribeToChannel(ClientId); 
      } 
      else 
      { 
       throw new Exception("Could not connect Redis client to server"); 
      } 
     } 
     catch 
     { 
      throw new Exception("Could not connect Redis Client to Server"); 
     } 
    } 

public void SendOneWayMessage(MessageEnvelope message) 
    { 
     SendMessage(message); 
    } 

private void SendMessage(MessageEnvelope msg) 
    { 
     //Connection.Publish(msg.To, msg.GetByteArray()); 
     Connection.Publish(msg.To, msg.GetByteArray()).Wait(); 
    } 

private void IncomingChannelSubscriptionMessage(string channel, byte[] body) 
    { 
     var msg = MessageEnvelope.GetMessageEnvelope(body); 

     //forward received message 
     ReceivedMessageCallback(msg); 

     //release requestMessage if returned msgId matches 
     string msgId = msg.MessageId; 
     if (RequestForReplyMessageIds.Contains(msgId)) 
     { 
      RequestForReplyMessageIds.TryTake(out msgId); 
     } 
    } 

public void SubscribeToChannel(string channelName) 
    { 
     if (!ChannelSubscriptions.Contains(channelName)) 
     { 
      var subscriberChannel = Connection.GetOpenSubscriberChannel(); 
      subscriberChannel.Subscribe(channelName, IncomingChannelSubscriptionMessage).Wait(); 
      ChannelSubscriptions.Add(channelName); 
     } 
    } 

回答

1

沒有看到正是你如何檢查這一點,就很難評論,但我可以說的是,任何線程怪異將是很難追查和修復,因此是不太可能在B​​ookSleeve,given that it has been succeeded加以解決。然而!它將絕對在StackExchange.Redis中檢查。以下是我在SE.Redis中整理的一個裝備(並且令人尷尬的是,它確實突出了一個小錯誤,在下一個版本中修復,所以.222或更高版本)。輸出第一:

Subscribing... 

Sending (preserved order)... 
Allowing time for delivery etc... 
Checking... 
Received: 500 in 2993ms 
Out of order: 0 

Sending (any order)... 
Allowing time for delivery etc... 
Checking... 
Received: 500 in 341ms 
Out of order: 306 

(記住,500×5ms的是2500,所以我們不應該由2993ms號,或341ms驚訝 - 這主要是我們已經加入到輕推的Thread.Sleep成本線程池重疊;如果我們刪除它,兩個循環都需要0ms,這很棒 - 但我們不能看到重疊的問題如此令人信服)

正如您所看到的,第一次運行具有正確的順序輸出;第二次運行有混合順序,但它快十倍。那就是在做小事時。對於real工作會更加明顯。一如既往,這是一種折衷。

這裏的試驗檯:

using System; 
using System.Collections.Generic; 
using System.Diagnostics; 
using System.Threading; 
using StackExchange.Redis; 

static class Program 
{ 
    static void Main() 
    { 
     using (var conn = ConnectionMultiplexer.Connect("localhost")) 
     { 
      var sub = conn.GetSubscriber(); 
      var received = new List<int>(); 
      Console.WriteLine("Subscribing..."); 
      const int COUNT = 500; 
      sub.Subscribe("foo", (channel, message) => 
      { 
       lock (received) 
       { 
        received.Add((int)message); 
        if (received.Count == COUNT) 
         Monitor.PulseAll(received); // wake the test rig 
       } 
       Thread.Sleep(5); // you kinda need to be slow, otherwise 
       // the pool will end up doing everything on one thread 
      }); 
      SendAndCheck(conn, received, COUNT, true); 
      SendAndCheck(conn, received, COUNT, false); 
     } 

     Console.WriteLine("Press any key"); 
     Console.ReadLine(); 
    } 
    static void SendAndCheck(ConnectionMultiplexer conn, List<int> received, int quantity, bool preserveAsyncOrder) 
    { 
     conn.PreserveAsyncOrder = preserveAsyncOrder; 
     var sub = conn.GetSubscriber(); 
     Console.WriteLine(); 
     Console.WriteLine("Sending ({0})...", (preserveAsyncOrder ? "preserved order" : "any order")); 
     lock (received) 
     { 
      received.Clear(); 
      // we'll also use received as a wait-detection mechanism; sneaky 

      // note: this does not do any cheating; 
      // it all goes to the server and back 
      for (int i = 0; i < quantity; i++) 
      { 
       sub.Publish("foo", i); 
      } 

      Console.WriteLine("Allowing time for delivery etc..."); 
      var watch = Stopwatch.StartNew(); 
      if (!Monitor.Wait(received, 10000)) 
      { 
       Console.WriteLine("Timed out; expect less data"); 
      } 
      watch.Stop(); 
      Console.WriteLine("Checking..."); 
      lock (received) 
      { 
       Console.WriteLine("Received: {0} in {1}ms", received.Count, watch.ElapsedMilliseconds); 
       int wrongOrder = 0; 
       for (int i = 0; i < Math.Min(quantity, received.Count); i++) 
       { 
        if (received[i] != i) wrongOrder++; 
       } 
       Console.WriteLine("Out of order: " + wrongOrder); 
      } 
     } 
    } 
} 
+0

我還是有點被一些您的意見和代碼混淆:那麼,如果我只是火在緊密循環100條消息沒有減緩的Thread.Sleep或類似的訂單不會被保留?這正是我所看到的,我想知道如何強制保存命令 –

+0

@Matt不,那不是我所說的。睡眠只是爲了加劇這種情況。爲了使其工作正常,只需將PreserveAsyncOrder設置爲true即可。無論如何它默認爲true! –

+0

我編輯了我的問題並粘貼了所有相關的代碼。你能否看看我可能做錯了什麼,以防止發送訂單中的消息?請注意,在方法'SendMessage(MessageEnvelope味精)'我故意去了同步路線。維持秩序是否有必要?根據您之前的解釋,我的理解是,通過'PreserveOrder'我可以發送異步消息,訂單仍然保留。無論如何,這兩種方式都行不通。 –