JMS ActiveMQ topic slow consumer
I am using a durable topic to which a producer publishes events with the following settings:
<bean id="jmsTemplateESB" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="cachedJmsConnectionFactory" />
    <property name="defaultDestination" ref="activeMQTopic" />
    <!-- Value = javax.jms.DeliveryMode.PERSISTENT -->
    <property name="deliveryMode" value="2" />
    <!-- Value = javax.jms.Session.AUTO_ACKNOWLEDGE -->
    <property name="sessionAcknowledgeMode" value="1" />
    <!-- Needs to be true for the deliveryMode to work -->
    <property name="explicitQosEnabled" value="true" />
</bean>
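I consume from the topic with the following setup (the listenOnTopic method, followed by the listener class it is used with):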
public static void listenOnTopic(String topicName, MessageListener listener)
        throws Exception
{
    ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(bindAddress);
    Connection con = factory.createConnection();
    con.setClientID("Consumer");
    Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(topicName);
    TopicSubscriber subscriber = session.createDurableSubscriber(topic, listener.getClass().getName());
    subscriber.setMessageListener(listener);
    con.start();
}
public class ActiveMQMessageListener implements MessageListener
{
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQMessageListener.class);

    @Autowired
    @Qualifier("jmsEventOutPutChannel")
    MessageChannel outputChannel;

    @Override
    public void onMessage(Message message) {
        try {
            BytesMessage bytesMessage = (BytesMessage) message;
            byte[] data = new byte[(int) bytesMessage.getBodyLength()];
            bytesMessage.readBytes(data);
            org.springframework.integration.Message<byte[]> outputMessage = MessageBuilder.withPayload(data).build();
            outputChannel.send(outputMessage);
        } catch (JMSException e) {
            e.printStackTrace();
            LOG.error("Error while retrieving events from ActiveMQ ", e);
        }
    }
}
The output channel is set up as follows:
<bean id="callerBlockPolicy" class="org.springframework.integration.util.CallerBlocksPolicy">
    <constructor-arg type="long" value="10000"></constructor-arg>
</bean>
<bean id="jmsListnerTaskExecutor"
      class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="${CORE_POOL_SIZE}"></property>
    <property name="maxPoolSize" value="${THREAD_POOL_SIZE_JMS_LISTENER}"></property>
    <property name="queueCapacity" value="${QUEUE_SIZE_JMS_LISTENER}"></property>
    <property name="rejectedExecutionHandler" ref="callerBlockPolicy"></property>
    <property name="waitForTasksToCompleteOnShutdown" value="true"></property>
</bean>
<int:channel id="jmsEventOutPutChannel">
    <int:dispatcher task-executor="jmsListnerTaskExecutor" />
</int:channel>
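For reference, here is a minimal sketch of how the three sizing properties above interact in a plain java.util.concurrent.ThreadPoolExecutor, which is what ThreadPoolTaskExecutor wraps. With a bounded queue, threads beyond the core size are only started once the queue is full. The class name, method name and the numbers (core 1, max 4, queue 10) are made up for illustration; the real values behind the ${...} placeholders are not shown in this post.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolGrowthSketch {

    // Submits `tasks` tasks that all block until released, then reports how
    // many worker threads the pool has started. Hypothetical helper.
    static int poolSizeAfterSubmitting(int tasks, int core, int max, int queueCapacity) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 60L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(queueCapacity));
        CountDownLatch release = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // keep the worker busy
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getPoolSize();
        } finally {
            release.countDown(); // let all tasks finish
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // core = 1, max = 4, queue capacity = 10 (illustrative values only)
        System.out.println(poolSizeAfterSubmitting(5, 1, 4, 10));  // 1: extra tasks wait in the queue
        System.out.println(poolSizeAfterSubmitting(12, 1, 4, 10)); // 2: queue full, one extra thread
        System.out.println(poolSizeAfterSubmitting(14, 1, 4, 10)); // 4: pool has reached its maximum
    }
}
```

So if the queue capacity is large and the core pool size is small, most of the work may end up being done by the core threads only, regardless of the maximum pool size.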
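This consumer code is too slow, so we cannot retrieve messages from the topic at a high rate. Without "jmsEventOutPutChannel" in the picture I get a rate of about 9500 qps, but with "jmsEventOutPutChannel" in the picture the rate drops to around 150 per second.
Can anyone give me a hint about what I am doing wrong in this code?
Does my "jmsEventOutPutChannel" channel code also affect the ActiveMQ consumption rate?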
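To make the last question concrete, here is a small sketch of one possible mechanism, which I have not confirmed for my setup. A JMS session delivers messages to a MessageListener one at a time, so if the hand-off inside onMessage blocks, consumption can go no faster than the downstream work. The sketch uses only java.util.concurrent: BLOCK_CALLER is a hypothetical stand-in for a caller-blocks rejection policy (it is not Spring's CallerBlocksPolicy itself), and the task count, pool size, queue sizes and 10 ms of simulated work are arbitrary.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class BackPressureSketch {

    // Hypothetical stand-in for a caller-blocks policy: when all threads are
    // busy and the queue is full, the submitting thread waits for queue space.
    static final RejectedExecutionHandler BLOCK_CALLER = (task, pool) -> {
        try {
            if (!pool.getQueue().offer(task, 10, TimeUnit.SECONDS)) {
                throw new RejectedExecutionException("queue still full after 10 s");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RejectedExecutionException(e);
        }
    };

    // Simulates downstream processing of one message.
    static void work(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns how long (ms) the submitting "listener" thread needed to hand
    // off all messages to the pool.
    static long handOffMillis(int messages, int threads, int queueCapacity, long workMillis) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity), BLOCK_CALLER);
        long start = System.nanoTime();
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> work(workMillis)); // blocks once the queue is full
        }
        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return elapsed;
    }

    public static void main(String[] args) {
        // Small queue: the submitter is throttled to the workers' pace.
        System.out.println("queue=2   hand-off ms: " + handOffMillis(40, 2, 2, 10));
        // Large queue: the submitter is never blocked for this burst.
        System.out.println("queue=100 hand-off ms: " + handOffMillis(40, 2, 100, 10));
    }
}
```

If this is what happens here, the consumption rate would be governed by how quickly the subscribers of the channel process each message rather than by ActiveMQ, but that depends on the actual pool and queue sizes and on the downstream handlers, none of which are shown above.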