2017-06-12 98 views
0

我需要發佈一個JSON Camel Servlet,然後複製消息並拆分列表。處理每個拆分的消息並最終聚合並返回servlet響應。Apache camel servlet流split()後關閉()

但是,只要路由器中引入了分離器,就會看到輸入流正在關閉,並且Servlet無法將響應寫回。

請注意,流緩存也已啓用。

from("servlet:sample") 
       .log(LoggingLevel.INFO, "Message Received: JSON RQ") 
       .to("direct:samplejson"); 
from("direct:samplejson") 
    .streamCaching() 
    .setHeader("sampleId", simple("${id}")) 
    .log(LoggingLevel.INFO, SampleID in Header: ${header.sampleId}") 
    .unmarshal().json(JsonLibrary.Jackson, Sample.class) 
    .log(LoggingLevel.INFO, "Converted to JSON: ${body.toString}") 
    .process("sampleProcessor") 
    .split().body().streaming() 
     .choice() 
     .when().method("sampleProcessor", "isTypeA") 
      .marshal().jacksonxml(Sample.class) 
     .endChoice() 
     .otherwise() 
      .marshal().jacksonxml(Sample.class) 
    .end() 
    .aggregate(SampleAggregationStrategy).header("sampleId").completionSize(2).completionTimeout(1000L) 
    .to("direct:samplexml"); 
from("direct:samplexml").marshal().jacksonxml(List.class).log("FINISHED PROCESSING"); 

例外:

2017-06-12 12:22:25,311 [apr-8080-exec-2] INFO route1       - FINISHED PROCESSING 
2017-06-12 12:22:25,321 [apr-8080-exec-2] ERROR CamelHttpTransportServlet  - Error processing request 
java.io.IOException: Stream closed 
     at org.apache.catalina.connector.InputBuffer.read(InputBuffer.java:372) ~[catalina.jar:8.0.30] 
     at org.apache.catalina.connector.CoyoteInputStream.read(CoyoteInputStream.java:156) ~[catalina.jar:8.0.30] 
     at org.apache.camel.util.IOHelper.copy(IOHelper.java:196) ~[camel-core-2.19.0.jar:2.19.0] 
     at org.apache.camel.http.common.DefaultHttpBinding.copyStream(DefaultHttpBinding.java:432) ~[camel-http-common-2.19.0.jar:2.19.0] 
     at org.apache.camel.http.common.DefaultHttpBinding.doWriteDirectResponse(DefaultHttpBinding.java:496) ~[camel-http-common-2.19.0.jar:2.19.0] 
     at org.apache.camel.http.common.DefaultHttpBinding.doWriteResponse(DefaultHttpBinding.java:395) ~[camel-http-common-2.19.0.jar:2.19.0] 
     at org.apache.camel.http.common.DefaultHttpBinding.writeResponse(DefaultHttpBinding.java:322) ~[camel-http-common-2.19.0.jar:2.19.0] 
     at org.apache.camel.http.common.CamelServlet.doService(CamelServlet.java:210) [camel-http-common-2.19.0.jar:2.19.0] 
     at org.apache.camel.http.common.CamelServlet.service(CamelServlet.java:74) [camel-http-common-2.19.0.jar:2.19.0] 
     at javax.servlet.http.HttpServlet.service(HttpServlet.java:729) [servlet-api.jar:?] 
     at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:291) [catalina.jar:8.0.30] 
     at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) [catalina.jar:8.0.30] 
     at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) [tomcat-websocket.jar:8.0.30] 
     at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:239) [catalina.jar:8.0.30] 
     at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206) [catalina.jar:8.0.30] 
     at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:212) [catalina.jar:8.0.30] 
     at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:106) [catalina.jar:8.0.30] 
     at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502) [catalina.jar:8.0.30] 
     at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:141) [catalina.jar:8.0.30] 
     at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:79) [catalina.jar:8.0.30] 
     at org.apache.catalina.valves.AbstractAccessLogValve.invoke(AbstractAccessLogValve.java:616) [catalina.jar:8.0.30] 
     at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:88) [catalina.jar:8.0.30] 
     at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:521) [catalina.jar:8.0.30] 
     at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1096) [tomcat-coyote.jar:8.0.30] 
     at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:674) [tomcat-coyote.jar:8.0.30] 
     at org.apache.tomcat.util.net.AprEndpoint$SocketProcessor.doRun(AprEndpoint.java:2500) [tomcat-coyote.jar:8.0.30] 
     at org.apache.tomcat.util.net.AprEndpoint$SocketProcessor.run(AprEndpoint.java:2489) [tomcat-coyote.jar:8.0.30] 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [?:1.8.0_74] 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [?:1.8.0_74] 
     at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-util.jar:8.0.30] 
     at java.lang.Thread.run(Thread.java:745) [?:1.8.0_74] 

如果我刪除分離器和聚合器,我看在Servlet響應回來。

很多預先感謝您提供的所有幫助!

+0

你嘗試過香草拆分和聚合?你正在使用哪個版本? – Namphibian

+0

正在嘗試基於駱駝站點上描述的組合消息處理器模式實施我的解決方案。我嘗試了兩種分離器版本以及單獨的聚合器。在這兩種情況下,我都看到該流已關閉。我也調試了駱駝代碼。基本上,寫入servlet響應的DefaultHttpBinding obj接收與split()之前的步驟中的**「out」**交換。 – NateSound

回答

0

參見堆肥消息處理器EIP圖案(http://camel.apache.org/composed-message-processor.html)和只使用分離器,它允許你做一個叉/在相同的工作單元連接,並且能夠返回在servlet的響應的例子。 分配器只有支持AggregationStrategy您可以配置您在哪裏彙總響應。換句話說,不要使用分離器和聚合器,而只使用分離器。

+0

正在嘗試基於駱駝站點上描述的組合消息處理器模式實施我的解決方案。我嘗試了兩種分離器版本以及單獨的聚合器。在這兩種情況下,我都看到該流已關閉。我也調試了駱駝代碼。基本上,寫入servlet響應的DefaultHttpBinding obj接收與split()之前的步驟中的**「out」**交換。 – NateSound