2017-07-24 72 views
1

我是Spring集成的新手,對如何將錯誤消息發送到指定的錯誤隊列感到困惑。我想將錯誤消息作爲原始消息的標題,並最終放在單獨的隊列中。我讀過這可以用一個頭文件richher來完成,我試圖實現但沒有任何內容出現在錯誤隊列中。在處理Spring集成中的異常時遇到問題

此外,我是否需要一個單獨的異常處理類爲了使錯誤消息使它到錯誤隊列或我可以只在我的轉換方法中引發異常?

這裏是我的xml配置:

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans  
         http://www.springframework.org/schema/beans/spring-beans.xsd  
         http://www.springframework.org/schema/integration  
         http://www.springframework.org/schema/integration/spring-integration.xsd 
         http://www.springframework.org/schema/integration/amqp 
         http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
         http://www.springframework.org/schema/rabbit 
         http://www.springframework.org/schema/context 
         http://www.springframework.org/schema/context/spring-context.xsd 
         http://www.springframework.org/schema/rabbit/spring-rabbit.xsd"> 

    <rabbit:connection-factory id="connectionFactory" host="bigdata-rdp" username="myuser" password="mypass" /> 
    <rabbit:template id="amqpTemplate" connection-factory="connectionFactory" /> 
    <rabbit:admin connection-factory="connectionFactory" /> 
    <rabbit:queue name="first" auto-delete="false" durable="true" /> 
    <rabbit:queue name="second" auto-delete="false" durable="true" /> 
    <rabbit:queue name="errorQueue" auto-delete="false" durable="true" /> 

    <int:poller default="true" fixed-rate="100"/> 

    <rabbit:fanout-exchange name="second-exchange" auto-delete="true" durable="true"> 
    <rabbit:bindings> 
     <rabbit:binding queue="second" /> 
    </rabbit:bindings> 
    </rabbit:fanout-exchange> 

    <rabbit:fanout-exchange name="error-exchange" auto-delete="true" durable="true"> 
     <rabbit:bindings> 
     <rabbit:binding queue="errorQueue" /> 
     </rabbit:bindings> 
    </rabbit:fanout-exchange> 

    <int-amqp:outbound-channel-adapter channel="messageOutputChannel" exchange-name="second-exchange" amqp-template="amqpTemplate" /> 

    <int-amqp:inbound-channel-adapter channel="messageInputChannel" error-channel="errorInputChannel" queue-names="first" connection-factory="connectionFactory" concurrent-consumers="20" /> 

    <int-amqp:outbound-channel-adapter channel="errorOutputChannel" exchange-name="error-exchange" amqp-template="amqpTemplate" /> 

    <int:channel id="messageInputChannel" /> 
    <int:channel id="messageOutputChannel"/> 
    <int:channel id="errorInputChannel"/> 

<int:service-activator input-channel="errorInputChannel" output-channel= "errorOutputChannel" method = "handleError" > 
    <bean class="firstAttempt.MessageErrorHandler"/> 

<int:chain input-channel="messageInputChannel" output-channel="messageOutputChannel"> 
    <int:header-enricher> 
    <int:error-channel ref="errorInputChannel" /> 
     </int:header-enricher> 
      <int:transformer method = "convert" > 
       <bean class="firstAttempt.JsonObjectConverter" /> 
      </int:transformer> 
     <int:service-activator method="transform"> 
      <bean class="firstAttempt.Transformer" /> 
     </int:service-activator> 
    <int:object-to-string-transformer /> 
    </int:chain> 

</beans> 

錯誤類別:

public class ErrorHandler { 
    public String errorHandle(MessageHandlingException exception) { 
     return exception.getMessage(); 

QualityScorer類(由變壓器稱爲):

public class QualityScorer { 
    private Hashtable<String, String> table; 
    private final static String csvFile = "C:\\Users\\john\\Test.csv"; 

public QualityScorer() throws Exception { 
    table = new Hashtable<String, String>(); 
    initializeTable(); 
} 

private void initializeTable() throws Exception { 
    BufferedReader br = null; 
     String line = ""; 
    String cvsSplitBy = ","; 
    try { 
     br = new BufferedReader(new FileReader(csvFile)); 
     while ((line = br.readLine()) != null) { 
       String[] data = line.split(cvsSplitBy); 
      if(data.length > 6 && data[1].equals("1") && data[4].equals("0") && data[5].equals("1")) 
       table.putIfAbsent(data[3], data[1]); 
     } 
    } catch (FileNotFoundException e) { 
     throw new Exception("No file found"); 
    } catch (IOException e) { 
      e.printStackTrace(); 
    } finally { 
     if (br != null) { 
       try { 
        br.close(); 
       } catch (IOException e) { 
        e.printStackTrace(); 
       } 
      } 
     } 
    } 

    public float getScore(JSONObject object) throws Exception { 
     float score; 
     if (object == null) { 
      throw new IllegalArgumentException("object"); 
     } 
     if (!object.has("source")) { 
      throw new Exception("Object does not have a source"); 
     } 
     if (!object.has("employer")) { 
      throw new Exception("Object does not have an employer"); 
     } 
     String source = object.getString("Source"); 
     String employer = object.getString("employer"); 
      if (table.containsKey(employer) && !source.equals("packageOne")) { 
       score = 1; 
      } else { 
       score = -1; 
     } 
     return score; 
    } 
} 

眼下,裝載的消息沒有源,所以程序應該投擲MessagingException到MessageErrorHandler。

變壓器代碼:

public class Transformer { 
private QualityScorer qualityScorer; 

public Transformer() throws Exception { 
    qualityScorer = new QualityScorer(); 
} 

public JSONObject transform(JSONObject object) throws Exception { 

    float score = qualityScorer.getScore(object); 
    object.put("score", score); 
    return object; 
    } 
} 

總之,程序應該從一個隊列接收一個預加載的消息,轉換它,並將其發送到一個第二隊列,如果提供源,它成功地確實在預先加載的消息中。我試圖處理錯誤,並將其作爲消息頭髮送到錯誤隊列。這個問題一直讓我沮喪,所以非常感謝幫助!

在堆棧跟蹤目前正在顯示的錯誤是:

java.lang.NoSuchMethodError: org.springframework.messaging.MessageHandlingException: method <init>(Lorg/springframework/messaging/Message;Ljava/lang/Throwable;)V not found 
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:96) 
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:89) 
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44) 
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186) 
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
at org.springframework.integration.handler.MessageHandlerChain$1.send(MessageHandlerChain.java:129) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44) 
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:358) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:269) 
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:186) 
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
at org.springframework.integration.handler.MessageHandlerChain.handleMessageInternal(MessageHandlerChain.java:110) 
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) 
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) 
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) 
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:423) 
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:114) 
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:44) 
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:92) 
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:188) 
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1100(AmqpInboundChannelAdapter.java:56) 
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.processMessage(AmqpInboundChannelAdapter.java:246) 
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:203) 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:822) 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:745) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:97) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:189) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1276) 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:726) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1219) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1189) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1500(SimpleMessageListenerContainer.java:97) 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1421) 
at java.lang.Thread.run(Thread.java:748) 

但沒有什麼會錯誤隊列。

回答

1

當拋出異常時,它會與requestMessage一起打包到MessagingException。您自己的業務例外情況位於cause,您可以從MessagingException.failedMessage屬性訪問requestMessage

所以,它看起來像你有你需要的一切用例。 只有在發送到error-exchange之前,您在錯誤流中確實應該有一些<transformer>才能將MessagingException正確轉換爲發送給AMQP的正確消息。

+0

嗨Artem,謝謝你的迴應!我還是很新的,我只是更新了一個錯誤類和一個鏈來調用錯誤類中的方法,但是我仍然沒有看到任何顯示在錯誤隊列中的東西。 – arsenal11

+1

'input-channel =「errorChannel」output-channel =「errorChannel」'???這是錯誤的配置。你製作ca StackOverflow –

+1

'errorHandle(JSONObject對象)'。這是錯誤的。你必須期待'MessagingException':http://docs.spring.io/spring-integration/reference/html/configuration.html#namespace-errorhandler –