2012-10-19 41 views
1

當我從Stomp中讀取一條AMQ消息時,我得到3條或4條消息出隊,不知道原因。ActiveMQ + Stomp,讀取一條消息,但其中四條消息出列

用於填充AMQ代碼:

public function populate($queue, $c = 10) { 

    for($i = 0; $i < $c; $i++) { 
     $this->stomp->send($queue,' Random populated:'.rand(0, PHP_INT_MAX)); 
    } 

} 

ActiveMQ Queue after populating ActiveMQ messages after populating

代碼讀取AMQ:

public function read($queue = null) { 

    if(is_null($queue)) { 
     if(!$this->isSubscribed()) { 
      return false; 
     } 
    } else { 
     $this->subscribe($queue); 
    } 

    return $this->stomp->readFrame(); 

} 

踐踏readFrame()的代碼:

public function readFrame() 
{ 
    if (!$this->hasFrameToRead()) { 
     return false; 
    } 

    $rb = 1024; 
    $data = ''; 
    $end = false; 

    do { 
     $read = fread($this->_socket, $rb); 
     if ($read === false) { 
      $this->_reconnect(); 
      return $this->readFrame(); 
     } 
     $data .= $read; 
     if (strpos($data, "\x00") !== false) { 
      $end = true; 
      $data = rtrim($data, "\n"); 
     } 
     $len = strlen($data); 
    } while ($len < 2 || $end == false); 

    list ($header, $body) = explode("\n\n", $data, 2); 
    $header = explode("\n", $header); 
    $headers = array(); 
    $command = null; 
    foreach ($header as $v) { 
     if (isset($command)) { 
      list ($name, $value) = explode(':', $v, 2); 
      $headers[$name] = $value; 
     } else { 
      $command = $v; 
     } 
    } 
    $frame = new StompFrame($command, $headers, trim($body)); 
    if (isset($frame->headers['transformation']) && $frame->headers['transformation'] == 'jms-map-json') { 
     require_once 'Stomp/Message/Map.php'; 
     return new StompMessageMap($frame); 
    } else { 
     return $frame; 
    } 
    return $frame; 
} 

我敢肯定,正在執行一次代碼100%,但其結果是: ActiveMQ queue after reading one message ActiveMQ messages after reading one message

Var_dumped消息:

object(StompFrame)[4] 
public 'command' => string 'MESSAGE' (length=7) 
public 'headers' => 
array (size=5) 
    'message-id' => string 'ID:**********_-49723-1350635513276-2:1:-1:1:1' (length=45) 
    'destination' => string '/queue/test' (length=11) 
    'timestamp' => string '1350635842968' (length=13) 
    'expires' => string '0' (length=1) 
    'priority' => string '4' (length=1) 
public 'body' => string 'Random populated:1859256320' (length=27) 

有誰知道什麼可爲這種行爲的原因是什麼?

聲明:

  • 沒有消息的ACK所以連一個也沒有消息要出隊:|
  • ACK是在客戶端模式
  • 預取大小設置爲1
+0

調試什麼?我的腳本,Stomp或ActiveMQ?我的腳本正在做一個連接和一個readFrame(),這是var_dumped如上,沒有循環,沒有框架,簡單的平板PHP。我添加了Stomp-> readFrame()代碼。 – S3Mi

回答

3

我沒有看到你連接代碼,但我會假設你正在使用自動確認模式連接。在採用自動確認模式的STOMP中,一旦消息被觸發,消息就會被刪除,並且由於我還假設您沒有更改預取大小,所以經紀商會向您發送一批消息,以便您從消息套接字和更多可以發送更多將獲得出隊。如果您希望對消息消耗進行更細緻的控制,則應使用其他確認模式(如客戶端確認),並在每條消息到達時對其進行確認。您還可以爲預訂設置預取窗口,以減少分配給客戶端的消息數量。

有關AMQ STOMP配置選項,請參見page。您可能還想再次查看STOMP規範。

+0

確認處於客戶端模式,預取大小設置爲1. – S3Mi

+1

您應該添加連接代碼,否則在問題中不明確。 –