2017-06-21 20 views
0

[編輯]上載完成CONFIGS手動ACK消息:如何從使用的RabbitMQ彈簧一體化

rabbit.xml從兔

<rabbit:connection-factory id="amqpConnectionFactoryInbound" 
host="${rabbit.host}" port="${rabbit.port}" 
username="${rabbit.username}" password="${rabbit.password}" channel- 
cache-size="5" 
connection-factory="rabbitConnectionFactoryInbound"/> 

<beans:bean id="rabbitConnectionFactoryInbound" 
class="com.rabbitmq.client.ConnectionFactory"> 
<beans:property name="requestedHeartbeat" 
value="60" /> 
</beans:bean> 


<!-- Inbound Adapter to AMQP RabbitMq and write to file --> 
<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter" 
channel="rabbitInboundMessageChannel" 
concurrent-consumers="8" task- 
executor="rabbit-executor" connection- 
factory="amqpConnectionFactoryInbound" 
message-converter="byteArrayToStringConverter" queue- 
names="${rabbit.queue}" acknowledge-mode="MANUAL" error- 
channel="errorChannelId" 
prefetch-count="25" /> 

<header-enricher input-channel="rabbitInboundMessageChannel" output- 
channel="rabbitOutboundboundMessageChannel"> 
<int:header name="Operation" value="${operation.rabbit}" /> 
<int:header name="GUID" expression="#{ 
'T(java.util.UUID).randomUUID().toString()' }" /> 
<int:header name="operationStartTime" expression="#{ 
'T(java.lang.System).currentTimeMillis()' }" /> 
</header-enricher> 

<int:channel id="rabbitOutboundboundMessageChannel"> 
<int:interceptors> 
<int:wire-tap channel="loggerChannel" /> 
</int:interceptors> 
</int:channel> 

<task:executor id="rabbit-executor" rejection-policy="CALLER_RUNS" 
pool-size="10-30" 
queue-capacity="25" /> 
</beans:beans> 

從隊列然後,該消息被髮送到路由器信道: router.xml

<int:header-enricher input-channel="rabbitOutboundboundMessageChannel" 
output-channel="routerChannel"> 
<int:header name="Operation" value="${operation.router}" 
overwrite="true" /> 
<int:header name="file_name" expression="headers['GUID'] + '.xml'" /> 
<int:header name="operationStartTime" expression="#{ 
'T(java.lang.System).currentTimeMillis()' }" 
overwrite="true" /> 
<int:error-channel ref="errorChannelId" /> 
</int:header-enricher> 

<int:recipient-list-router id="rabbitMsgrouter" input- 
channel="routerChannel"> 
<int:recipient channel="fileBackupChannel" selector-expression="new 
String(payload).length()>0" /> 
<int:recipient channel="transformerChannel" /> 
</int:recipient-list-router> 

<int:channel id="transformerChannel"> 
<int:interceptors> 
<int:wire-tap channel="loggerChannel" /> 
</int:interceptors> 
</int:channel> 
<int:channel id="fileBackupChannel"/> 
<int:channel id="loggerChannel"/> 
</beans> 

消息我現在發送到persister.xml和transformer.xml。以下是persister.xml,我想確認持久性是否成功。有transformer.xml

<int:header-enricher input-channel="fileBackupChannel" output- 
channel="fileSaveChannel"> 
<int:header name="Operation" value="${operation.filePersister}" 
overwrite="true" /> 
<int:header name="replyChannel" value="nullChannel" /> 
<int:header name="operationStartTime" expression="#{ 
'T(java.lang.System).currentTimeMillis()' }" /> 
<int:error-channel ref="errorChannelId" /> 
</int:header-enricher> 

<int-file:outbound-gateway id="fileBackUpChannelAdapter" 
directory="${file.location}" 
request-channel="fileSaveChannel" reply-channel="rabbitAckChannel"/> 

<int:service-activator input-channel="rabbitAckChannel" output- 
channel="nullChannel" ref="ackRabbit" method="handleRabbitAcks" /> 

<bean id="ackRabbit" 
class="com.expedia.dataloader.rabbit.RabbitAcknowledgement"/> 

<int:channel id="rabbitAckChannel"> 
<int:interceptors> 
<int:wire-tap channel="loggerChannel" /> 
</int:interceptors> 
</int:channel> 
<int:channel id="loggerChannel"/> 
<int:channel id="fileSaveChannel"/> 
</beans> 

我無法手動地從RabbitMQ的有效載荷ACKING後其他下游工藝。

這是我的工作流程:使用入站通道適配器從兔

1. Get消息:

<int-amqp:inbound-channel-adapter id="rabbitMQInboundChannelAdapter" 
channel="rabbitInboundMessageChannel" 
concurrent-consumers="${rabbit.concurrentConsumers}" task- 
executor="rabbit-executor" connection- 
factory="amqpConnectionFactoryInbound" 
message-converter="byteArrayToStringConverter" queue- 
names="${rabbit.queue}" acknowledge-mode="MANUAL" error- 
channel="errorChannelId" 
prefetch-count="${rabbit.prefetchCount}" /> 

2.使用出站網關堅持消息到磁盤:

<int-file:outbound-gateway id="fileBackUpChannelAdapter" 
directory="${file.location}" 
request-channel="fileSaveChannel" reply-channel="loggerChannel" /> 

3.來自兔子的時候persister(步驟2)s ucceeds。

對於步驟(3)中,i寫的以下代碼:

public class RabbitAcknowledgement { 
public void handleRabbitAcks(Message<?> message) throws IOException { 
com.rabbitmq.client.Channel channel = (Channel) 
message.getHeaders().get("amqp_channel"); 
long deliveryTag = (long) message.getHeaders().get("amqp_deliveryTag"); 
channel.basicAck(deliveryTag, false); 
} 

其中我從彈簧通過呼叫:

<int:service-activator input- 
channel="rabbitOutboundboundMessageChannel" output- 
channel="routerChannel" ref="ackRabbit" method="handleRabbitAcks" /> 

這不起作用和兔子我的隊列中的有效載荷未被收集。

我的問題是:

  1. 我需要手動ACK在這種情況下?
  2. 我在做什麼錯?
+0

什麼是證明不起作用的症狀? –

+0

有效載荷留在兔子身上,不被消費者消費。 –

+0

另外,在我目前的設置中,我相信每條線程都會收到1條消息。我可以以某種方式提高比率(即分批確認?)對於功能來說並不關鍵,但是想知道如果我以某種方式使系統更高效 –

回答

0

它應該工作正常;我只是跑了一個快速測試,它爲我的作品...

@SpringBootApplication 
public class So44666444Application implements CommandLineRunner { 

    public static void main(String[] args) { 
     SpringApplication.run(So44666444Application.class, args).close(); 
    } 

    @Autowired 
    private RabbitTemplate template; 

    private final CountDownLatch latch = new CountDownLatch(1); 

    @Override 
    public void run(String... args) throws Exception { 
     this.template.convertAndSend("foo", "bar"); 
     latch.await(); 
    } 

    @Bean 
    public AmqpInboundChannelAdapter adapter(ConnectionFactory cf) { 
     AmqpInboundChannelAdapter adapter = new AmqpInboundChannelAdapter(listenerContainer(cf)); 
     adapter.setOutputChannelName("ack"); 
     return adapter; 
    } 

    @Bean 
    public AbstractMessageListenerContainer listenerContainer(ConnectionFactory cf) { 
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf); 
     container.setAcknowledgeMode(AcknowledgeMode.MANUAL); 
     container.setQueueNames("foo"); 
     return container; 
    } 

    @ServiceActivator(inputChannel = "ack") 
    public void ack(@Header(AmqpHeaders.CHANNEL) Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) Long tag) 
      throws IOException { 
     System.out.println("Acking: " + tag); 
     channel.basicAck(tag, false); 
     latch.countDown(); 
    } 

} 

如果我設置一個斷點上basicAck,我看到消息UNACKED在控制檯上;進入下一行,消息被刪除。

+0

Thanks @Gary Russell我在我的代碼中發現了一個錯誤,並讓它工作:將工作配置上載到原始文章。有什麼辦法可以讓這個效率更高? (速度更快,使用更少的資源等) –