[編輯]上載完成CONFIGS手動ACK消息:如何從使用的RabbitMQ彈簧一體化
rabbit.xml從兔
<rabbit:connection-factory id="amqpConnectionFactoryInbound"
host="${rabbit.host}" port="${rabbit.port}"
username="${rabbit.username}" password="${rabbit.password}" channel-
cache-size="5"
connection-factory="rabbitConnectionFactoryInbound"/>
<beans:bean id="rabbitConnectionFactoryInbound"
class="com.rabbitmq.client.ConnectionFactory">
<beans:property name="requestedHeartbeat"
value="60" />
</beans:bean>
<!-- Inbound Adapter to AMQP RabbitMq and write to file -->
<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter"
channel="rabbitInboundMessageChannel"
concurrent-consumers="8" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="25" />
<header-enricher input-channel="rabbitInboundMessageChannel" output-
channel="rabbitOutboundboundMessageChannel">
<int:header name="Operation" value="${operation.rabbit}" />
<int:header name="GUID" expression="#{
'T(java.util.UUID).randomUUID().toString()' }" />
<int:header name="operationStartTime" expression="#{
'T(java.lang.System).currentTimeMillis()' }" />
</header-enricher>
<int:channel id="rabbitOutboundboundMessageChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<task:executor id="rabbit-executor" rejection-policy="CALLER_RUNS"
pool-size="10-30"
queue-capacity="25" />
</beans:beans>
從隊列然後,該消息被髮送到路由器信道: router.xml
<int:header-enricher input-channel="rabbitOutboundboundMessageChannel"
output-channel="routerChannel">
<int:header name="Operation" value="${operation.router}"
overwrite="true" />
<int:header name="file_name" expression="headers['GUID'] + '.xml'" />
<int:header name="operationStartTime" expression="#{
'T(java.lang.System).currentTimeMillis()' }"
overwrite="true" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>
<int:recipient-list-router id="rabbitMsgrouter" input-
channel="routerChannel">
<int:recipient channel="fileBackupChannel" selector-expression="new
String(payload).length()>0" />
<int:recipient channel="transformerChannel" />
</int:recipient-list-router>
<int:channel id="transformerChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="fileBackupChannel"/>
<int:channel id="loggerChannel"/>
</beans>
消息我現在發送到persister.xml和transformer.xml。以下是persister.xml,我想確認持久性是否成功。有transformer.xml
<int:header-enricher input-channel="fileBackupChannel" output-
channel="fileSaveChannel">
<int:header name="Operation" value="${operation.filePersister}"
overwrite="true" />
<int:header name="replyChannel" value="nullChannel" />
<int:header name="operationStartTime" expression="#{
'T(java.lang.System).currentTimeMillis()' }" />
<int:error-channel ref="errorChannelId" />
</int:header-enricher>
<int-file:outbound-gateway id="fileBackUpChannelAdapter"
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="rabbitAckChannel"/>
<int:service-activator input-channel="rabbitAckChannel" output-
channel="nullChannel" ref="ackRabbit" method="handleRabbitAcks" />
<bean id="ackRabbit"
class="com.expedia.dataloader.rabbit.RabbitAcknowledgement"/>
<int:channel id="rabbitAckChannel">
<int:interceptors>
<int:wire-tap channel="loggerChannel" />
</int:interceptors>
</int:channel>
<int:channel id="loggerChannel"/>
<int:channel id="fileSaveChannel"/>
</beans>
我無法手動地從RabbitMQ的有效載荷ACKING後其他下游工藝。
這是我的工作流程:使用入站通道適配器從兔
1. Get消息:
<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter"
channel="rabbitInboundMessageChannel"
concurrent-consumers="${rabbit.concurrentConsumers}" task-
executor="rabbit-executor" connection-
factory="amqpConnectionFactoryInbound"
message-converter="byteArrayToStringConverter" queue-
names="${rabbit.queue}" acknowledge-mode="MANUAL" error-
channel="errorChannelId"
prefetch-count="${rabbit.prefetchCount}" />
2.使用出站網關堅持消息到磁盤:
<int-file:outbound-gateway id="fileBackUpChannelAdapter"
directory="${file.location}"
request-channel="fileSaveChannel" reply-channel="loggerChannel" />
3.來自兔子的時候persister(步驟2)s ucceeds。
對於步驟(3)中,i寫的以下代碼:
public class RabbitAcknowledgement {
public void handleRabbitAcks(Message<?> message) throws IOException {
com.rabbitmq.client.Channel channel = (Channel)
message.getHeaders().get("amqp_channel");
long deliveryTag = (long) message.getHeaders().get("amqp_deliveryTag");
channel.basicAck(deliveryTag, false);
}
其中我從彈簧通過呼叫:
<int:service-activator input-
channel="rabbitOutboundboundMessageChannel" output-
channel="routerChannel" ref="ackRabbit" method="handleRabbitAcks" />
這不起作用和兔子我的隊列中的有效載荷未被收集。
我的問題是:
- 我需要手動ACK在這種情況下?
- 我在做什麼錯?
什麼是證明不起作用的症狀? –
有效載荷留在兔子身上,不被消費者消費。 –
另外,在我目前的設置中,我相信每條線程都會收到1條消息。我可以以某種方式提高比率(即分批確認?)對於功能來說並不關鍵,但是想知道如果我以某種方式使系統更高效 –