2016-03-09

ActiveMQ/JMS "losing" messages - what am I missing?

AMQ version 5.13.2, Java 1.8.0_74, Windows 10

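In a simple test case, two object messages are sent: one carrying data, the other an end-of-data marker. Only the end-of-data marker is received.

The queue is created when the job starts and destroyed after the job completes.

If I run a large number of transactions, I see a reception rate of about 50%.

The logs clearly show that the receiver starts before the first message is put on the queue and that both messages are put on the queue, yet only the second message is actually received.

The sender and the receiver are in the same JVM. Each has its own session and connection.

Connection and queue setup code: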

@Override 
public void beforeJob(JobExecution jobExecution) { 
    // TODO Auto-generated method stub 
    try { 
     jobParameters = jobExecution.getJobParameters(); 

     readerConnection = connectionFactory.createConnection(); 
     readerConnection.start(); 

     writerConnection = connectionFactory.createConnection(); 
     writerConnection.start(); 

     jmsQueueManagementSession = writerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); 

     queueName = jobParameters.getString("jobName") + "." + jobExecution.getId(); 

     queue = jmsQueueManagementSession.createQueue(getQueueName()); 

    } catch (JMSException ex) { 
     throw new MaxisRuntimeException(
       MaxisCodeHelperImpl.generateCode("MXAR", MXMODULE, JMS_RECEIVER_INITIALIZATION_ERROR), null); 
    } 

} 

Sender setup code:

@Override 
public void beforeStep(StepExecution stepExecution) { 

    this.stepExecution = stepExecution; 
    this.setJobExecution(stepExecution.getJobExecution()); 
    try { 
     this.connection = jmsJobExecutionListener.getWriterConnection(); 
     this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     this.messageProducer = session.createProducer(jmsJobExecutionListener.getQueue()); 
    } catch (JMSException ex) { 
     throw new RuntimeException(ex.getMessage(), ex); 
    } 
} 

Receiver setup code:

@Override 
@BeforeStep 
public void beforeStep(StepExecution stepExecution) { 
    this.stepExecution = stepExecution; 
    this.setJobExecution(stepExecution.getJobExecution()); 
    try { 
     this.connection = jmsJobExecutionListener.getReaderConnection(); 
     this.jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     this.messageConsumer = jmsSession.createConsumer(jmsJobExecutionListener.getQueue()); 
    } 
    catch (JMSException ex) 
    { 
     throw new RuntimeException(ex.getMessage(), ex); 
    } 

} 

Sending code:

private void doSomeStuffInTransaction (final Object model) { 
     transactionTemplate.execute(new TransactionCallbackWithoutResult() { 
      @Override 
      protected void doInTransactionWithoutResult(TransactionStatus status) { 
       try { 
        doSomeStuff (model); 

        ObjectMessage message = session.createObjectMessage(
          (model.getRoot() == null) 
          ? null 
          : model.getRoot().getContents().getId()); 
        messageProducer.send(message); 
        logger.debug("Sent: {}", message.toString()); 
       }catch (Exception e) { 
        // Mark the transaction rollback-only if anything fails
        status.setRollbackOnly(); 
        throw new RuntimeException(e.getMessage(), e); 
       } 

      }}); 
    } 

Receiver code:

public Object read() throws Exception, 
     UnexpectedInputException, ParseException, 
     NonTransientResourceException { 

    Object result = null; 

    logger.debug("Attempting to receive message on connection: ", connection.toString()); 

    ObjectMessage msg = (ObjectMessage) messageConsumer.receive(); 
    logger.debug("Received: {}", msg.toString()); 
    result = msg.getObject(); 

    return result; 
} 

Log snippet:

DEBUG com.mylib.SelectedDataJmsReader - Attempting to receive message on connection: 
... snip ... 
*** First message *** 
DEBUG org.apache.activemq.broker.region.Queue - localhost Message ID:zip-56502-1457640572818-4:2:2:1:1 sent to queue://Stuff via SQL.402 
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:2214 
DEBUG com.maxis.mxmove.core.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:zip-56502-1457640572818-4:2:2:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610215, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [email protected], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} 
INFO com.maxis.mxmove.core.SelectedDataJmsWriter - Committed 1 stuff to redo log and JMS queue 

*** Second Message *** 
INFO com.maxis.mxmove.core.SourceSelectionReaderImpl - Returning empty stuff and end-of-stream placeholder. 
DEBUG org.apache.activemq.broker.region.Queue - localhost Message ID:zip-56502-1457640572818-4:2:2:1:2 sent to queue://Stuff via SQL.402 
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=2, pending=0 toPageIn: 1, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 0, memUsage:3155 
DEBUG com.maxis.mxmove.core.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:zip-56502-1457640572818-4:2:2:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610375, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} 
INFO com.maxis.mxmove.core.SelectedDataJmsWriter - Committed 1 stuff to redo log and JMS queue 

*** We received the last message, not the first. We show two enqueues, and one dequeue.. *** 
DEBUG com.maxis.mxmove.core.SelectedDataJmsReader - Received: ActiveMQObjectMessage {commandId = 7, responseRequired = true, messageId = ID:zip-56502-1457640572818-4:2:2:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:zip-56502-1457640572818-4:2:2:1, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610375, arrival = 0, brokerInTime = 1457640610375, brokerOutTime = 1457640610376, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1024, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false} 
INFO com.maxis.mxmove.core.SelectedDataJmsReader - executed read, found end-of-stream marker, returning null 
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 1, memUsage:1107 

Could you please provide a small working example that demonstrates the behavior? – SubOptimal

I stripped this down to a small working example, but the behavior did not reproduce.

I enabled debug logging for the ActiveMQ queue and updated the log snippet in the original post.

Answer

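In the receiver setup code, note that the beforeStep() method is annotated with @BeforeStep even though it already overrides the listener method (hence the @Override). I think this means the receiver is set up twice, so two consumers end up subscribed to the queue, and the broker probably prefetches messages for each of them. The log confirms this: it shows two subscriptions. Any message dispatched to the consumer that read() never polls would appear lost, which would explain the roughly 50% reception rate. Not being a heavy JMS user, I was under the mistaken impression that one subscription was for the receiver and the other for the sender.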

@Override 
@BeforeStep 
public void beforeStep(StepExecution stepExecution) { 
    this.stepExecution = stepExecution; 
    this.setJobExecution(stepExecution.getJobExecution()); 
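
To illustrate why a second, never-polled consumer can make messages look lost, here is a toy model in plain Java. It is not ActiveMQ code: ToyConsumer and dispatchRoundRobin are hypothetical names, and the round-robin hand-off is only an assumption standing in for the broker's real dispatch and prefetch behavior.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Toy model only; it does not use or reproduce any ActiveMQ API. */
final class DuplicateConsumerDemo {

    /** Hypothetical stand-in for a queue consumer with a local prefetch buffer. */
    static final class ToyConsumer {
        private final Queue<String> prefetched = new ArrayDeque<>();

        void dispatch(String message) {
            prefetched.add(message);
        }

        /** Returns the next buffered message, or null if nothing was dispatched here. */
        String poll() {
            return prefetched.poll();
        }

        int buffered() {
            return prefetched.size();
        }
    }

    /** Hands each message to the subscribed consumers in turn (assumed round-robin). */
    static void dispatchRoundRobin(List<String> messages, List<ToyConsumer> consumers) {
        for (int i = 0; i < messages.size(); i++) {
            consumers.get(i % consumers.size()).dispatch(messages.get(i));
        }
    }

    public static void main(String[] args) {
        ToyConsumer orphan = new ToyConsumer(); // created by the first beforeStep() call, never polled
        ToyConsumer active = new ToyConsumer(); // created by the second call; the one read() uses

        dispatchRoundRobin(Arrays.asList("data", "end-of-data"), Arrays.asList(orphan, active));

        System.out.println("read() sees: " + active.poll());
        System.out.println("stuck in orphan buffer: " + orphan.buffered());
    }
}
```

Under these assumptions, the consumer that read() polls receives only the end-of-data marker, while the data message sits in the buffer of the consumer nobody reads from.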

After removing the @BeforeStep annotation, the log shows only one subscription:

DEBUG org.apache.activemq.broker.region.Queue - queue://Workorders via SQL.408, subscriptions=1, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 1370, dequeueCount: 1369, memUsage:1024 
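
Removing the duplicate registration is the actual fix. As an extra safeguard, the setup can also be made idempotent so that a repeated callback cannot create a second consumer. The sketch below is a hypothetical helper in plain Java (OnceOnly is not part of Spring Batch or JMS); the String factory merely stands in for a call such as jmsSession.createConsumer(...).

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper: creates a resource at most once, however often init() is called. */
final class OnceOnly<T> {
    private final Supplier<T> factory;
    private T instance;

    OnceOnly(Supplier<T> factory) {
        this.factory = factory;
    }

    /** The first call runs the factory; later calls return the same instance. */
    synchronized T init() {
        if (instance == null) {
            instance = factory.get();
        }
        return instance;
    }

    public static void main(String[] args) {
        AtomicInteger created = new AtomicInteger();
        // Stand-in for creating a JMS consumer; counts how often the factory runs.
        OnceOnly<String> consumer = new OnceOnly<>(() -> "consumer-" + created.incrementAndGet());

        consumer.init(); // e.g. invoked through the listener interface
        consumer.init(); // e.g. invoked again through the annotation

        System.out.println("consumers created: " + created.get());
    }
}
```

Calling init() from beforeStep() would then yield a single consumer even if the framework invokes the callback twice.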

The next time someone tells me how "clean" code is when you use injection, I may consider applying Martial Arts Exchange.