我在VS 2010中使用activemq-cpp 3.7.0來構建客戶端,服務器是ActiveMQ 5.8。根據提到的CMS配置here,我創建了一個使用類似於以下代碼的消息使用者。 ConnClass
是ExceptionListener
和MessageListener
。在致電cms::Session::commit()
之前,我只想消費一條消息。ActiveMQ-cpp使用PrefetchPolicy的代理URI沒有任何影響
void ConnClass::setup()
{
// Create a ConnectionFactory
std::tr1::shared_ptr<ConnectionFactory> connectionFactory(
ConnectionFactory::createCMSConnectionFactory(
"tcp://localhost:61616?cms.PrefetchPolicy.queuePrefetch=1");
// Create a Connection
m_connection = std::tr1::shared_ptr<cms::Connection>(
connectionFactory->createConnection());
m_connection->start();
m_connection->setExceptionListener(this);
// Create a Session
m_session = std::tr1::shared_ptr<cms::Session>(
m_connection->createSession(Session::SESSION_TRANSACTED));
// Create the destination (Queue)
m_destination = std::tr1::shared_ptr<cms::Destination>(
m_session->createQueue("myqueue?consumer.prefetchSize=1"));
// Create a MessageConsumer from the Session to the Queue
m_consumer = std::tr1::shared_ptr<cms::MessageConsumer>(
m_session->createConsumer(m_destination.get()));
m_consumer->setMessageListener(this);
}
void ConnClass::onMessage(const Message* message)
{
// read message code ...
// schedule a processing event for
// another thread that calls m_session->commit() when done
}
問題是調用m_session->commit()
之前,我收到了,而不是一個消息的多個消息 - 我知道這是因爲commit()
呼叫是通過用戶輸入觸發。如何確保onMessage()
僅在每次致電commit()
之前調用一次?
但爲什麼不能通過預取「大小」選項來控制消息的數量?或者我錯誤地理解了預取選項? – devil
您不理解該選項。它只是一個跡象表明經紀人有多少工作要嘗試並保持在客戶端上。異步消費者中的異步詞應該是你的線索。 –
我使用異步,因此我不需要管理一個額外的線程,我將不得不明確地通過同步進行。我試圖限制在提交事務之前收到的消息數量。你是否說這不能在異步方法的預取策略中使用size參數來完成?這是否意味着我應該強制onMessage在該方法結束時阻止,直到我提交,如果我只想要一條消息?順便說一句,我使用SESSION_TRANSACTED模式,所以我使用commit來代替ACK消息。 – devil