2017-02-27 208 views
0

這接聽Flux是一個後續問題Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?在春季啓動客戶

我試圖按照推薦瞭如何使用Spring的Web客戶端接收流量,但實際上得到了網狀的問題。

在服務器端的代碼是一個簡單的控制器暴露蒙戈庫的的findAll方法:

@RequestMapping("power") 
public Flux<Power> getAll() { 
    return powerRepository.findAll(); 
} 

在客戶消耗的代碼是這樣在上面給出的答案:

@Bean 
public CommandLineRunner readFromApiServer() { 
    return new CommandLineRunner() { 
     @Override 
     public void run(String... args) throws Exception { 
      WebClient webClient = WebClient.create("http://localhost:8080"); 
      Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange() 
        .flatMap(response -> response.bodyToFlux(Power.class)); 
      flux.subscribe(); 
     } 
    }; 
} 

但這拋出異常:

2017年2月27日08:19:41.281 ERROR 99026 --- [構造函數-HTTP-NIO-5] r.ipc.netty.channel.ChannelOperations:[HttpClient]錯誤 處理連接。請求關閉通道

io.netty.util.IllegalReferenceCountException:refCnt:0,遞減:1 在 io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:101) 〜[網狀-所有-4.1 .8.Final.jar:4.1.8.Final]

我正在使用當前的Spring Boot 2.0.0 BUILD-SNAPSHOT。

這個異常告訴我什麼?我怎樣才能解決它?

回答

1

所有CommandLineRunner bean在Spring Boot應用程序啓動時執行。如果沒有守護進程線程(即當前應用程序不是Web應用程序),那麼一旦所有跑步者執行完畢,應用程序將關閉。

在你的情況下,只使用flux.subscribe()「啓動鏈並請求無界需求」(javadoc),所以這個方法調用沒有被阻塞。我懷疑這個命令行跑步者在你有機會做任何事情之前都會返回,應用程序關閉並且你的網絡資源被關閉。

另外,你沒有對HTTP請求的結果做任何事情。 我覺得跟更新您的代碼示例下面要解決的問題:

@Bean 
public CommandLineRunner readFromApiServer() { 
    return new CommandLineRunner() { 
     @Override 
     public void run(String... args) throws Exception { 
      WebClient webClient = WebClient.create("http://localhost:8080"); 
      Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange() 
        .flatMap(response -> response.bodyToFlux(Power.class)); 
      List<Power> powerList = flux.collectList().block(); 
      // do something with the result? 
     } 
    }; 
} 
+0

其實我有在主程序中無限循環,所以它會阻止應用程序從關停沒有守護線程。但它看起來像消費者在其自己的線程中運行,並且如果不被阻塞則立即返回。 而不是收集列表中的結果我消耗「權力」,因爲他們到達:flux.doOnNext(power - > System.out.println(power.getValue()))。blockLast(); 這工作正常。感謝您的支持! – Gregor