2015-10-11 84 views
3

我想按順序讀取來自Websphere MQ的10000封消息,我使用下面的代碼來執行相同的操作,但讀取所有消息需要很長時間。即使我試圖使用多線程概念,但有時2個線程正在消耗相同的組和競態條件。以下是代碼片段。 我想用3個線程從MQ中順序讀取10000條消息,但是我的兩個線程在同一時間訪問同一個組。如何避免這種情況?順序讀取大量消息的最佳方法是什麼?我的要求是我想順序閱讀10000條消息。請幫忙。如何從Websphere讀取大量消息MQ

MQConnectionFactory factory = new MQConnectionFactory(); 
factory.setQueueManager("QM_host") 
MQQueue destination = new MQQueue("default"); 
Connection connection = factory.createConnection(); 
connection.start(); 
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE); 

MessageConsumer lastMessageConsumer = 
    session.createConsumer(destination, "JMS_IBM_Last_Msg_In_Group=TRUE"); 
TextMessage lastMessage = (TextMessage) lastMessageConsumer.receiveNoWait(); 
lastMessageConsumer.close(); 

if (lastMessage != null) { 

    int groupSize = lastMessage.getIntProperty("JMSXGroupSeq"); 
    String groupId = lastMessage.getStringProperty("JMSXGroupID"); 

    boolean failed = false; 

    for (int i = 1; (i < groupSize) && !failed; i++) { 

     MessageConsumer consumer = session.createConsumer(destination, 
      "JMSXGroupID='" + groupId + "'AND JMSXGroupSeq=" + i); 
     TextMessage message = (TextMessage)consumer.receiveNoWait(); 

     if (message != null) { 
      System.out.println(message.getText()); 
     } else { 
      failed = true; 
     } 

     consumer.close(); 

    } 

    if (failed) { 
     session.rollback(); 
    } else { 
     System.out.println(lastMessage.getText()); 
     session.commit(); 
    } 

} 

connection.close(); 

回答

0

我認爲更好的辦法是在你的應用程序中的協調線程,這將監聽組的最後消息,併爲每個將開始一個新的線程來獲得分配給該組中屬於信息線。 (這將滿足競爭條件。)

在獲取屬於某個組的消息的線程中,不需要使用for循環來分別獲取每條消息,而應該獲取屬於該消息的任何消息組,同時保持組計數器和緩衝亂序消息。只要您在接收和處理羣組的所有消息後才提交會話,這將是安全的。 (這會產生更多的性能,因爲每個組將由單獨的線程處理,並且該線程只會在MQ中訪問每條消息。)

0

請參閱關於sequential retrieval of messages的IBM文檔。如果頁面移動或更改,我會引用最相關的部分。對於要保證連續的處理,下列條件必須滿足:

  • 所有PUT請求是從同一個應用程序來完成。
  • 所有的請求都是來自同一個工作單元,或者所有的請求都是在一個工作單元之外完成的。
  • 這些消息都具有相同的優先級。
  • 這些消息都具有相同的持久性。
  • 對於遠程排隊,配置是這樣的,通過它的隊列管理器,通過相互通信,只有一個路徑可以從執行放置請求的應用程序到目標隊列 管理器和目標隊列。
  • 消息未放入死信隊列(例如,如果隊列暫時已滿)。
  • 獲取消息的應用程序不會故意改變檢索順序,例如通過指定特定MsgId 或CorrelId或使用消息優先級。
  • 只有一個應用程序正在執行獲取操作以從目標隊列中檢索消息。如果應用程序不止一個,則必須設計這些應用程序以獲取發送應用程序放置的每個序列中的所有消息。

雖然頁面中沒有明確說明這一點,當他們說「一個應用程序」指的是什麼是一個應用程序的單一線程。如果應用程序具有併發線程,則不保證處理順序。

此外,在工作的單個單元讀取萬個郵件作爲另一響應建議是推薦爲以保留消息順序的手段!只有這樣做,如果10,000郵件必須成功或失敗作爲一個原子單位,這是否與他們是否按順序收到無關。如果大量消息必須在單個工作單元中處理,那麼調整日誌文件的大小以及很可能還有一些其他參數是絕對必要的。對於任何線程異步消息傳輸而言,保留序列順序已經足夠折磨,而不會引入長時間運行的大量事務。

0

你可以用Java類(非JMS)來做你想做的事情,並且它可能適用於JMS的MQ類,但確實很棘手。

首先從MQ Knowledge中讀取page

我將僞代碼(從上面的網頁)轉換爲適用於Java的MQ類,並將其從瀏覽器更改爲破壞性get。

另外,我更喜歡在一個同步點下面做每組消息(假設一個合理大小的組)。

首先,您缺少GMO(GetMessageOptions)的'選項'字段的幾個標誌,並且MatchOptions字段需要設置爲'MQMO_MATCH_MSG_SEQ_NUMBER',以便所有線程始終抓取組中的第一條消息第一條消息。即如上所述,不抓住組中的第二條消息用於第一條消息。

MQGetMessageOptions gmo = new MQGetMessageOptions(); 
MQMessage rcvMsg = new MQMessage(); 

/* Get the first message in a group, or a message not in a group */ 
gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_ALL_MSGS_AVAILABLE | CMQC.MQGMO_WAIT | CMQC.MQGMO_SYNCPOINT; 
gmo.MatchOptions = CMQC.MQMO_MATCH_MSG_SEQ_NUMBER; 
rcvMsg.messageSequenceNumber = 1; 
inQ.get(rcvMsg, gmo); 

/* Examine first or only message */ 
... 

gmo.Options = CMQC.MQGMO_COMPLETE_MSG | CMQC.MQGMO_LOGICAL_ORDER | CMQC.MQGMO_SYNCPOINT; 
do while ((rcvMsg.messageFlags & CMQC.MQMF_MSG_IN_GROUP) == CMQC.MQMF_MSG_IN_GROUP) 
{ 
    rcvMsg.clearMessage(); 
    inQ.get(rcvMsg, gmo); 
    /* Examine each remaining message in the group */ 
    ... 
} 
qMgr.commit(); 
相關問題