2016-08-05 68 views
3

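This question is related to this one, in which I asked how to stream data from a reactive Spring controller: Spring 5 Web Reactive Programming - WebClient ClassCastException when unmarshalling JSON streamed from a Spring reactive controller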

As Rossen pointed out, we have to use text/event-stream to send the streamed results back as server-sent events. So far, so good.

I have a service like this:

@GetMapping(value="/accounts/alertsStreaming", headers="accept=text/event-stream") 
public Flux<Alert> getAccountAlertsStreaming() { 
    return Flux.fromArray(new Alert[]{new Alert((long)1, "Alert message"), 
             new Alert((long)2, "Alert message2"), 
             new Alert((long)3, "Alert message3")}) 
       .delayMillis(1000) 
       .log(); 
} 

When I call it from a browser, the 3 results arrive one after another, each after a 1-second delay.
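For reference, this is roughly what travels over the wire in that case. The following is a minimal sketch in plain Java, not Spring code: `SseFraming` is a hypothetical helper, and the JSON field names are assumed from the `Alert` constructor arguments. It frames each payload as a server-sent event, that is, a `data:` line terminated by a blank line.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper (not part of Spring) that frames JSON payloads as server-sent events.
final class SseFraming {

    // One event: a "data:" field line followed by a blank line that terminates the event.
    static String toEvent(String json) {
        return "data:" + json + "\n\n";
    }

    // Concatenates the events in the order they would be written to the response body.
    static String toStream(List<String> jsonPayloads) {
        return jsonPayloads.stream()
                .map(SseFraming::toEvent)
                .collect(Collectors.joining());
    }

    public static void main(String[] args) {
        // The field names "id" and "message" are assumptions; they depend on how Alert is serialized.
        System.out.print(toStream(Arrays.asList(
                "{\"id\":1,\"message\":\"Alert message\"}",
                "{\"id\":2,\"message\":\"Alert message2\"}")));
    }
}
```

A client that asks for `Alert` objects therefore has to strip this framing before it can hand the payload to a JSON decoder.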

I want to call this service from a WebClient, and implemented it like this:

@Component 
public class AccountsServiceClient { 

    @Autowired 
    private WebClient webClient; 

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){ 
     Flux<Alert> response = webClient 
       .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream")) 
       .extract(bodyStream(Alert.class)); 
     return response; 
    }  
} 

This is the test code:

@Test 
@ContextConfiguration(classes={WebClientConfig.class, AccountsServiceClient.class}) 
public class AccountsServiceClientTest extends AbstractTestNGSpringContextTests{ 

    private Logger logger = LoggerFactory.getLogger(getClass()); 

    @Autowired 
    private AccountsServiceClient client; 

    public void testNumbersServiceClientStreamingTest() throws InterruptedException{ 

     CountDownLatch latch = new CountDownLatch(1); 

     Flux<Alert> alerts = client.getAccountAlertsStreaming("http://localhost:8080"); 
     alerts.doOnComplete(() -> { 
      latch.countDown(); 
     }).subscribe((n) -> { 
      logger.info("------------> GOT ALERT {}", n); 
     }); 

     latch.await(); 
    } 
} 
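One caveat about this test: the latch is released only in `doOnComplete`, and `subscribe` is given no error handler, so if the stream fails the test thread may wait in `latch.await()` indefinitely. Below is a minimal sketch in plain Java (no Reactor) of releasing the latch on both terminal signals and bounding the wait. `StreamSource` and `awaitTermination` are hypothetical names standing in for a Flux subscription.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class LatchedSubscription {

    // Hypothetical stand-in for an asynchronous stream that ends with one terminal callback.
    interface StreamSource {
        void subscribe(Consumer<String> onNext, Consumer<Throwable> onError, Runnable onComplete);
    }

    // Returns null on normal completion, otherwise the error (or a timeout/interrupt marker).
    static Throwable awaitTermination(StreamSource source, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();
        source.subscribe(
                item -> System.out.println("GOT " + item),
                error -> { failure.set(error); latch.countDown(); }, // release on error too
                latch::countDown);
        try {
            if (!latch.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return new IllegalStateException("no terminal signal within " + timeoutMillis + " ms");
            }
        } catch (InterruptedException interrupted) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag for the caller
            return interrupted;
        }
        return failure.get();
    }

    public static void main(String[] args) {
        // A source that fails asynchronously, similar in spirit to the decoding error in this question.
        StreamSource failing = (onNext, onError, onComplete) ->
                new Thread(() -> onError.accept(new RuntimeException("could not decode"))).start();
        System.out.println(awaitTermination(failing, 1000));
    }
}
```

With Reactor itself, the equivalent idea would be to pass an error consumer to `subscribe` and to use the `await` overload that takes a timeout.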

The problem is that when the client tries to extract the result, none of the HttpMessageReaders can read the combination text/event-stream + Alert.class:

public class ResponseExtractors { 

    protected static HttpMessageReader<?> resolveMessageReader(List<HttpMessageReader<?>> messageReaders, 
       ResolvableType responseType, MediaType contentType) { 

      return messageReaders.stream() 
        .filter(e -> e.canRead(responseType, contentType)) 
        .findFirst() 
        .orElseThrow(() -> 
          new WebClientException(
            "Could not decode response body of type '" + contentType 
              + "' with target type '" + responseType.toString() + "'")); 
    } 
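
To illustrate why this lookup comes up empty, here is a simplified model of the first-match resolution. The `Reader` interface and the two sample readers are hypothetical stand-ins for `HttpMessageReader`, and the rules they apply are assumptions chosen for illustration: a JSON reader that accepts only application/json, and a String reader that accepts any media type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiPredicate;

final class ReaderResolution {

    // Placeholder for the DTO in the question.
    static final class Alert { }

    // Hypothetical, simplified stand-in for HttpMessageReader#canRead.
    interface Reader {
        String name();
        boolean canRead(Class<?> targetType, String mediaType);
    }

    static Reader of(String name, BiPredicate<Class<?>, String> rule) {
        return new Reader() {
            public String name() { return name; }
            public boolean canRead(Class<?> targetType, String mediaType) {
                return rule.test(targetType, mediaType);
            }
        };
    }

    // Assumed reader set: JSON only for application/json, String for any media type.
    static final List<Reader> READERS = Arrays.asList(
            of("json", (type, media) -> media.equals("application/json")),
            of("string", (type, media) -> type == String.class));

    // The first reader that accepts the (type, media type) pair wins, as in resolveMessageReader.
    static Optional<String> resolve(Class<?> targetType, String mediaType) {
        return READERS.stream()
                .filter(reader -> reader.canRead(targetType, mediaType))
                .map(Reader::name)
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(resolve(Alert.class, "text/event-stream").isPresent());
        System.out.println(resolve(String.class, "text/event-stream").isPresent());
    }
}
```

Under these assumptions no reader claims (text/event-stream, Alert), while (text/event-stream, String) does resolve.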

Exception:

reactor.core.Exceptions$BubblingException: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert' 
    at reactor.core.Exceptions.bubble(Exceptions.java:97) 
    at reactor.core.Exceptions.onErrorDropped(Exceptions.java:263) 
    at reactor.core.publisher.LambdaSubscriber.onError(LambdaSubscriber.java:126) 
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onError(FluxPeek.java:183) 
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:128) 
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:169) 
    at reactor.core.publisher.FluxLog$LoggerSubscriber.doNext(FluxLog.java:161) 
    at reactor.core.publisher.OperatorAdapter.onNext(OperatorAdapter.java:88) 
    at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:123) 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onNext(FluxResume.java:75) 
    at reactor.core.publisher.FluxJust$WeakScalarSubscription.request(FluxJust.java:103) 
    at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1010) 
    at reactor.core.publisher.FluxResume$ResumeSubscriber.onSubscribe(FluxResume.java:70) 
    at reactor.core.publisher.FluxJust.subscribe(FluxJust.java:71) 
    at reactor.ipc.netty.http.NettyHttpClientHandler.channelRead(NettyHttpClientHandler.java:120) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:435) 
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:293) 
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:280) 
    at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:396) 
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:248) 
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:250) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:233) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:350) 
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1334) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:372) 
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:358) 
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:926) 
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:123) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:571) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:512) 
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:426) 
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:398) 
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:877) 
    at java.lang.Thread.run(Thread.java:745) 
Caused by: org.springframework.web.client.reactive.WebClientException: Could not decode response body of type 'text/event-stream' with target type 'com.codependent.spring5.playground.reactive.dto.Alert' 
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$resolveMessageReader$23(ResponseExtractors.java:203) 
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$61/1950155746.get(Unknown Source) 
    at java.util.Optional.orElseThrow(Optional.java:290) 
    at org.springframework.web.client.reactive.ResponseExtractors.resolveMessageReader(ResponseExtractors.java:200) 
    at org.springframework.web.client.reactive.ResponseExtractors.decodeResponseBody(ResponseExtractors.java:181) 
    at org.springframework.web.client.reactive.ResponseExtractors.lambda$null$12(ResponseExtractors.java:89) 
    at org.springframework.web.client.reactive.ResponseExtractors$$Lambda$36/70386506.apply(Unknown Source) 
    at reactor.core.publisher.MonoFlatMap$FlattenSubscriber.onNext(MonoFlatMap.java:126) 
    ... 37 common frames omitted 
+0

From the server's point of view, what is the difference between the WebClient and a web browser? –

+0

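I see your point, but apart from the new semantics, what is the point of using the new 'WebClient' instead of the old 'RestTemplate' if we can't get the results streamed and instead receive them all at once? – codependent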

Answers

0

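Maybe this should be handled automatically by the framework. In any case, I solved it by unmarshalling the JSON data of the stream myself: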

WebClientConfig:

@Configuration 
public class WebClientConfig { 

    @Bean 
    public ObjectMapper jacksonObjectMapper(){ 
     return new ObjectMapper(); 
    } 

    @Bean 
    public WebClient webClient(){ 
     WebClient webClient = new WebClient(new ReactorClientHttpConnector()); 
     return webClient; 
    } 

} 

Service client:

@Component 
public class AccountsServiceClient { 

    @Autowired 
    private WebClient webClient; 

    @Autowired 
    private ObjectMapper jacksonObjectMapper; 

    public Flux<Alert> getAccountAlertsStreaming(String serviceBaseUrl){ 
     Flux<Alert> response = webClient 
       .perform(get(serviceBaseUrl+"/accounts/alertsStreaming").header("Accept", "text/event-stream")) 
       .extract(bodyStream(String.class)) 
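       .map(e -> { 
        try { 
         // Strip the SSE field name ("data:") and keep only the JSON payload. 
         String json = e.substring(e.indexOf(":")+1); 
         return jacksonObjectMapper.readValue(json, Alert.class); 
        } catch (Exception e1) { 
         // Reactor does not allow null elements, so surface the failure as an error signal. 
         throw new IllegalStateException("Could not parse alert: " + e, e1); 
        } 
       }); 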
     return response; 
    } 

} 
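
Note that this workaround assumes every string received is a single `data:` line. As a hedged sketch of a somewhat more tolerant approach, the plain-Java parser below (the name `SseDataParser` is hypothetical, and it is independent of Spring and Jackson) extracts the data payload of each event from a raw text/event-stream body. It skips comments and other fields, removes one optional space after the colon, and joins multi-line data with newlines.

```java
import java.util.ArrayList;
import java.util.List;

final class SseDataParser {

    // Returns the data payload of each complete event in an event-stream body.
    static List<String> parse(String body) {
        List<String> events = new ArrayList<>();
        StringBuilder data = new StringBuilder();
        boolean hasData = false;
        // Lines may end with CRLF, CR or LF; -1 keeps trailing empty lines.
        for (String line : body.split("\r\n|\r|\n", -1)) {
            if (line.isEmpty()) {
                // A blank line dispatches the event collected so far.
                if (hasData) {
                    events.add(data.toString());
                }
                data.setLength(0);
                hasData = false;
            } else if (line.startsWith("data:")) {
                String value = line.substring("data:".length());
                if (value.startsWith(" ")) {
                    value = value.substring(1); // a single leading space is not part of the value
                }
                if (hasData) {
                    data.append('\n'); // several data lines form one payload
                }
                data.append(value);
                hasData = true;
            }
            // Comments (":...") and other fields (id:, event:, retry:) are ignored in this sketch.
        }
        return events;
    }

    public static void main(String[] args) {
        String body = "data:{\"id\":1,\"message\":\"Alert message\"}\n\n"
                + "data:{\"id\":2,\"message\":\"Alert message2\"}\n\n";
        parse(body).forEach(System.out::println);
    }
}
```

Each returned string could then be passed to `jacksonObjectMapper.readValue(..., Alert.class)` as in the answer above.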

UPDATE: As of Spring 5 M4 this is handled by the framework. You can see the solution using the new API here: Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?

0

There is already an issue for this. Please comment on or vote for SPR-14539.
