2017-02-25 57 views
1

我需要在哪裏偵聽來自名爲Metadata的隊列的消息 - 然後基於該消息,我將不得不讀取一些隊列讓它調用dataQ(該隊列的名稱將在元數據消息中) 。爲了閱讀元數據,我可以使用兔子監聽器,但之後我必須從dataQ讀取其他消息,以便我可以手動進行拉動 - 但我希望有一些更像兔子監聽器那樣更乾淨,這樣我就不必管理頻道,ack等等。但是,直到我們從元數據隊列讀取消息之前,我們才知道隊列名稱,試圖探索其他解決方案。這個dataQ可以是1000個不同的隊列名稱,所以我們必須動態地監聽dataQ。使用彈簧抽象的Rabbit MQ設計

此外,ack應該像這樣工作 - 從元數據隊列中讀取消息,給定數據的過程Q - 發送數據消息中的消息a(dataQ可能有多條消息)併發送元數據隊列的ack。

(如果這個工程很好的單個消費者,然後我可以從元數據隊列超過一個消息添加容器模型和流程這意味着我將能夠同時處理多個數據隊列)。


更新的建議,感到困惑如何在主監聽獲得的事件和怎麼樣該標誌將併發工作(對不起,不使用應用程序事件廣泛到目前爲止)

package com.example; 

import org.springframework.amqp.rabbit.annotation.RabbitListener; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; 
import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.context.ApplicationEvent; 
import org.springframework.context.ApplicationEventPublisher; 
import org.springframework.context.annotation.Configuration; 
import org.springframework.context.event.EventListener; 

@Configuration 
public class MyListener { 

    @Autowired 
    ConnectionFactory connectionFactory; 

    @Autowired 
    private ApplicationEventPublisher applicationEventPublisher; 

    @RabbitListener(queues = "Metadata") 
    public void messageProcessing(String c) { 
     System.out.println(c); 

     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
     container.setConnectionFactory(connectionFactory); 
     container.setQueueNames(c); 
     container.setMessageListener(new MessageListenerAdapter(new DataHandler())); 
     container.setApplicationEventPublisher(applicationEventPublisher); 
     container.setIdleEventInterval(5000); 
     container.start(); 

     // how to get container idle event here 

     // so we can call container.stop(); 

    } 

    public class DataHandler { 
     public void handleMessage(byte[] text) { 
      System.out.println("Data Received: " + text); 
     } 
    } 

    @EventListener 
    public void onApplicationEvent(ApplicationEvent event) { 
     //I am getting idle event here 
     System.out.println(event.getSource()); 
    } 

} 

回答

1

這將是很容易啓動一個新的元中的SimpleMessageListenerContainer數據監聽器,處理數據;但是你不能從不同的監聽者那裏得到原始的消息。

您必須保留元數據線程,直到輔助偵聽器完成,然後釋放元數據線程,以便它可以查看原始郵件。 您可以使用容器空閒事件來檢測工作是否完成(除非您有其他機制來了解所有工作已完成)。

在元數據偵聽器容器上設置併發性,以確定要以這種方式並行處理的數量。

@RabbitListener(queues = "meta") 
public void handle(SomeObject message) { 

    // extract dataQ 

    // create a new SimpleMessageListenerContainer 
    // Inject an ApplicationEventPublisher instance 
    // start the container 

    // block here, waiting for a container idle event 
    // stop the container 

    return; 
} 

請記住,雖然,如果服務器崩潰,元數據消息將被重新傳遞(默認),你可能已經處理過的一些數據信息。

編輯

關於下面的評論,我的意思是使用自己的發佈者,所以你不必弄清楚事件來自哪個容器......

@SpringBootApplication 
public class So42459257Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So42459257Application.class, args); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     template.convertAndSend("meta", "foo"); 
     template.convertAndSend("foo", "baz"); 
     template.convertAndSend("foo", "baz"); 
     template.convertAndSend("meta", "bar"); 
     template.convertAndSend("bar", "qux"); 
     template.convertAndSend("bar", "qux"); 
     context.getBean(So42459257Application.class).testCompleteLatch.await(10, TimeUnit.SECONDS); 
     context.close(); 
    } 

    private final CountDownLatch testCompleteLatch = new CountDownLatch(2); 

    @Autowired 
    private ConnectionFactory connectionFactory; 

    @RabbitListener(queues = "meta") 
    public void handleMeta(final String queue) throws Exception { 
     System.out.println("Started processing " + queue); 
     final CountDownLatch startedLatch = new CountDownLatch(1); 
     final CountDownLatch finishedLatch = new CountDownLatch(1); 
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(this.connectionFactory); 
     container.setQueueNames(queue); 
     container.setMessageListener(new MessageListenerAdapter(new Object() { 

      @SuppressWarnings("unused") 
      public void handleMessage(String in) { 
       startedLatch.countDown(); 
       System.out.println("Received " + in + " from " + queue); 
      } 

     })); 
     container.setIdleEventInterval(5000); 
     container.setApplicationEventPublisher(new ApplicationEventPublisher() { 

       @Override 
       public void publishEvent(Object event) { 
       } 

       @Override 
       public void publishEvent(ApplicationEvent event) { 
        if (event instanceof ListenerContainerIdleEvent) { 
         finishedLatch.countDown(); 
       } 

      }; 
     }); 
     container.afterPropertiesSet(); 
     container.start(); 
     if (startedLatch.await(60, TimeUnit.SECONDS)) { 
      // handle container didn't receive any messages 
     } 
     if (finishedLatch.await(60, TimeUnit.SECONDS)) { 
      // handle container didn't go idle 
     } 
     System.out.println("Finished processing " + queue); 
     container.stop(); 
     this.testCompleteLatch.countDown(); 
    } 

    @Bean 
    public Queue meta() { 
     return new Queue("meta", false, false, true); 
    } 

    @Bean 
    public Queue foo() { 
     return new Queue("foo", false, false, true); 
    } 

    @Bean 
    public Queue bar() { 
     return new Queue("bar", false, false, true); 
    } 

} 

隨着聽衆容器併發性設置爲2(使用彈簧引導簡單地添加

spring.rabbitmq.listener.concurrency=2 

to application properties);如果您不使用引導,請自行配置工廠。

結果:

Started processing bar 
Started processing foo 
Received baz from foo 
Received qux from bar 
Received baz from foo 
Received qux from bar 
Finished processing bar 
Finished processing foo 
+0

感謝加里,我確實如你所說,其工作如你所說。我更新了後續問題的問題。 – user3444718

+0

我的意思是使用你自己的發佈者,所以你不必弄清楚事件來自哪個容器。看到我的答案編輯。 –

+0

這太棒了,現在我也可以用來處理來自數據q的100條消息,並將消息放回到元數據q中,因此我不會繼續處理一個dataq。我會很快嘗試這個版本。太棒了。 – user3444718