2016-06-10 36 views
1

我正在使用Spring Integration 4.2.5.RELEASE和Spring集成Java DSL 1.1.2.RELEASE。LambdaMessageProcessor無法識別ConversionService的有效負載類型

我無法使自定義轉換工作。我已經註冊自定義轉換器轉換成從byte[]my.object.MyClass

@Bean 
@IntegrationConverter 
public Converter bytesToMyClass() { 
    return new Converter<byte[], my.object.MyClass>() { 
     @Override 
     public my.object.MyClass convert(byte[] source) { 

      try { 
       return my.object.MyClass.newBuilder().mergeFrom(source).build(); 
      } catch (InvalidProtocolBufferException e) { 
       throw new RuntimeException("Could not convert message.", e); 
      } 
     } 
    }; 
} 

然後,我建立了我的集成流程。目標是根據名爲insertBuffer的變量將消息路由到兩條路徑中的一條。如果insertBuffer> 1,則聚合消息。否則,只需將單個消息包裝在一個集合中,並將其發送到服務方法。這裏是我的流程:

@Bean 
public IntegrationFlow routeInput(MessageChannel input, MyClassService service) { 

    return IntegrationFlows.from(input) 
      .<my.object.MyClass, Boolean>route((my.object.MyClass payload) -> insertBuffer > 1, mapping -> mapping 
       .subFlowMapping("true", aggregateflow -> aggregateflow 
        .<my.object.MyClass, Collection<my.object.MyClass>>aggregate(a -> a 
          .correlationStrategy(message -> 0) //all messages are part of the same group for now. 
          .releaseStrategy(group -> group.size() >= insertBuffer) 
          .sendPartialResultOnExpiry(true) 
          .expireGroupsUponCompletion(true) 
          .expireGroupsUponTimeout(true) 
          .groupTimeout(2000))) 
       .subFlowMapping("false", single -> single 
        .<my.object.MyClass, Collection<my.object.MyClass>>transform(Arrays::asList) 
       )) 
      .handle(Collection.class, (payload, headers) -> 
        service.saveResult(payload)) 
      .get(); 
} 

然而,當我嘗試運行此,我得到(以下全棧)java.lang.ClassCastException: [B cannot be cast to my.object.MyClass以下情況例外。

一些調試後,我注意到,當org.springframework.integration.dsl.LambdaMessageProcessor#processMessage試圖處理該消息,payloadTypejava.lang.Object時,我覺得應該是my.object.MyClass

看來我的所有仿製藥都是正確的,我錯過了什麼?

完整堆棧跟蹤:

ERROR o.s.i.handler.LoggingHandler - org.springframework.messaging.MessageHandlingException: nested exception is java.lang.ClassCastException: [B cannot be cast to my.object.MyClass 
    at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:130) 
    at org.springframework.integration.router.AbstractMessageProcessingRouter.getChannelKeys(AbstractMessageProcessingRouter.java:80) 
    at org.springframework.integration.router.AbstractMappingMessageRouter.determineTargetChannels(AbstractMappingMessageRouter.java:148) 
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:154) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147) 
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120) 
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442) 
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) 
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:231) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:154) 
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:102) 
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:105) 
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) 
    at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:103) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:251) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:57) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:176) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:173) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:330) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55) 
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) 
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51) 
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324) 
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) 
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) 
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: java.lang.ClassCastException: [B cannot be cast to my.object.MyClass 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:497) 
    at org.springframework.integration.dsl.LambdaMessageProcessor.processMessage(LambdaMessageProcessor.java:127) 
    ... 37 more 

回答

0

和λ的問題,它不能在運行時確定泛型類型:Java: how to resolve generic type of lambda parameter?

這就是爲什麼我們有沒有對此事進行了重載方法:

.<my.object.MyClass, Boolean>route(my.object.MyClass.class, payload -> insertBuffer > 1, mapping -> mapping 
+0

這讓我更進一步。謝謝。對聚合器有何建議?它似乎只是聚合'byte []',我希望它先做轉換,然後聚合。那可能嗎?我需要一個定製處理器來做到這一點嗎? – FGreg

+0

M-m-m。一個新的SO問題? –

+0

呃。我只是在聚集器前面加了一個'.transform(my.object.MyClass.class,m - > m)'。有點違反直覺,但足夠好。 – FGreg

相關問題