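ActiveMQ/JMS "losing" messages - what am I missing?
AMQ version 5.13.2, Java 1.8.0_74, Windows 10
Given a simple test case, two object messages are sent: one carrying data, the other an end-of-data marker. Only the end-of-data marker is received.
The queue is created when the job starts and destroyed after the job completes.
If I run a large number of transactions, I see a receive rate of about 50%.
The logs clearly show that the receiver starts before the first message is put on the queue and that both messages are put on the queue, but only the second message is actually received.
The sender and the receiver run in the same JVM. Each has its own session and connection.
Connection and queue setup code: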
@Override
public void beforeJob(JobExecution jobExecution) {
    // TODO Auto-generated method stub
    try {
        jobParameters = jobExecution.getJobParameters();
        readerConnection = connectionFactory.createConnection();
        readerConnection.start();
        writerConnection = connectionFactory.createConnection();
        writerConnection.start();
        jmsQueueManagementSession = writerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        queueName = jobParameters.getString("jobName") + "." + jobExecution.getId();
        queue = jmsQueueManagementSession.createQueue(getQueueName());
    } catch (JMSException ex) {
        throw new MaxisRuntimeException(
                MaxisCodeHelperImpl.generateCode("MXAR", MXMODULE, JMS_RECEIVER_INITIALIZATION_ERROR), null);
    }
}
Sender setup code:
@Override
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
    this.setJobExecution(stepExecution.getJobExecution());
    try {
        this.connection = jmsJobExecutionListener.getWriterConnection();
        this.session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.messageProducer = session.createProducer(jmsJobExecutionListener.getQueue());
    } catch (JMSException ex) {
        throw new RuntimeException(ex.getMessage(), ex);
    }
}
Receiver setup code:
@Override
@BeforeStep
public void beforeStep(StepExecution stepExecution) {
    this.stepExecution = stepExecution;
    this.setJobExecution(stepExecution.getJobExecution());
    try {
        this.connection = jmsJobExecutionListener.getReaderConnection();
        this.jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        this.messageConsumer = jmsSession.createConsumer(jmsJobExecutionListener.getQueue());
    } catch (JMSException ex) {
        throw new RuntimeException(ex.getMessage(), ex);
    }
}
Sending code:
private void doSomeStuffInTransaction(final Object model) {
    transactionTemplate.execute(new TransactionCallbackWithoutResult() {
        @Override
        protected void doInTransactionWithoutResult(TransactionStatus status) {
            try {
                doSomeStuff(model);
                ObjectMessage message = session.createObjectMessage(
                        (model.getRoot() == null)
                                ? null
                                : model.getRoot().getContents().getId());
                messageProducer.send(message);
                logger.debug("Sent: {}", message.toString());
            } catch (Exception e) {
                // mark the transaction for rollback if anything fails
                status.setRollbackOnly();
                throw new RuntimeException(e.getMessage(), e);
            }
        }
    });
}
Receiver code:
public Object read() throws Exception,
        UnexpectedInputException, ParseException,
        NonTransientResourceException {
    Object result = null;
    // The format string has no {} placeholder, so the connection is not
    // printed; this matches the first line of the log snippet.
    logger.debug("Attempting to receive message on connection: ", connection.toString());
    ObjectMessage msg = (ObjectMessage) messageConsumer.receive();
    logger.debug("Received: {}", msg.toString());
    result = msg.getObject();
    return result;
}
Log snippet:
DEBUG com.mylib.SelectedDataJmsReader - Attempting to receive message on connection:
... snip ...
*** First message ***
DEBUG org.apache.activemq.broker.region.Queue - localhost Message ID:zip-56502-1457640572818-4:2:2:1:1 sent to queue://Stuff via SQL.402
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 1, Inflight: 0, pagedInMessages.size 0, pagedInPendingDispatch.size 0, enqueueCount: 1, dequeueCount: 0, memUsage:2214
DEBUG com.maxis.mxmove.core.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:zip-56502-1457640572818-4:2:2:1:1, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610215, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = [email protected], marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
INFO com.maxis.mxmove.core.SelectedDataJmsWriter - Committed 1 stuff to redo log and JMS queue
*** Second Message ***
INFO com.maxis.mxmove.core.SourceSelectionReaderImpl - Returning empty stuff and end-of-stream placeholder.
DEBUG org.apache.activemq.broker.region.Queue - localhost Message ID:zip-56502-1457640572818-4:2:2:1:2 sent to queue://Stuff via SQL.402
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=2, pending=0 toPageIn: 1, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 0, memUsage:3155
DEBUG com.maxis.mxmove.core.SelectedDataJmsWriter - Sent: ActiveMQObjectMessage {commandId = 0, responseRequired = false, messageId = ID:zip-56502-1457640572818-4:2:2:1:2, originalDestination = null, originalTransactionId = null, producerId = null, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610375, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false}
INFO com.maxis.mxmove.core.SelectedDataJmsWriter - Committed 1 stuff to redo log and JMS queue
*** We received the last message, not the first. We show two enqueues and one dequeue. ***
DEBUG com.maxis.mxmove.core.SelectedDataJmsReader - Received: ActiveMQObjectMessage {commandId = 7, responseRequired = true, messageId = ID:zip-56502-1457640572818-4:2:2:1:2, originalDestination = null, originalTransactionId = null, producerId = ID:zip-56502-1457640572818-4:2:2:1, destination = queue://Stuff via SQL.402, transactionId = null, expiration = 0, timestamp = 1457640610375, arrival = 0, brokerInTime = 1457640610375, brokerOutTime = 1457640610376, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1024, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false}
INFO com.maxis.mxmove.core.SelectedDataJmsReader - executed read, found end-of-stream marker, returning null
DEBUG org.apache.activemq.broker.region.Queue - queue://Stuff via SQL.402, subscriptions=2, memory=0%, size=1, pending=0 toPageIn: 0, Inflight: 1, pagedInMessages.size 1, pagedInPendingDispatch.size 0, enqueueCount: 2, dequeueCount: 1, memUsage:1107
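For anyone comparing the broker's counters across many runs, reading them by eye gets tedious. Below is a minimal sketch of a parser for the `org.apache.activemq.broker.region.Queue` DEBUG lines in the format shown above. The class `QueueLogStats` and its methods are hypothetical helpers written for this post, not part of ActiveMQ, and the key list is an assumption based only on the lines in this log.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not an ActiveMQ class): extracts the numeric counters
// from one Queue DEBUG line of the form shown in the log snippet above.
final class QueueLogStats {

    // Only keys written as "name=123" or "name: 123" are captured.
    // "memory=0%" and the "pagedIn*.size N" fields are skipped on purpose.
    private static final Pattern COUNTER = Pattern.compile(
            "\\b(subscriptions|size|pending|toPageIn|Inflight"
            + "|enqueueCount|dequeueCount|memUsage)\\s*[=:]\\s*(\\d+)");

    // Returns the counters in the order they appear on the line.
    static Map<String, Long> parse(String line) {
        Map<String, Long> counters = new LinkedHashMap<>();
        Matcher m = COUNTER.matcher(line);
        while (m.find()) {
            counters.put(m.group(1), Long.parseLong(m.group(2)));
        }
        return counters;
    }

    // Enqueued minus dequeued, as reported by the broker on that line.
    // Throws NullPointerException if either counter is missing from the map.
    static long outstanding(Map<String, Long> counters) {
        return counters.get("enqueueCount") - counters.get("dequeueCount");
    }

    public static void main(String[] args) {
        // Last Queue line from the log snippet, without the logger prefix.
        String line = "queue://Stuff via SQL.402, subscriptions=2, memory=0%, "
                + "size=1, pending=0 toPageIn: 0, Inflight: 1, "
                + "pagedInMessages.size 1, pagedInPendingDispatch.size 0, "
                + "enqueueCount: 2, dequeueCount: 1, memUsage:1107";
        Map<String, Long> counters = parse(line);
        System.out.println(counters);
        System.out.println("outstanding = " + outstanding(counters));
    }
}
```

Run against the last Queue line above, this reports `subscriptions` as 2, `Inflight` as 1 and one outstanding message, which are the same values visible in the raw log.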
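Could you please provide a small working example that demonstrates the behavior? – SubOptimal
I stripped this down to a small working example, but the behavior was not reproduced. –
I enabled debug logging for the ActiveMQ queue and updated the log snippet in the original post. –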