2017-06-20 80 views
0

我試圖將Spring集成中的異步消息傳遞網關與RxNetty(異步HTTP)結合起來。基本上我想要的是向調用線程返回observable/CompletableFuture,並在調用線程中使用Observable的zip/map/flatmap來創建一批出站HTTP調用。我只是想看看這是否可能。此外,如果不是使用Rxjava構造,我最好使用聚合器eip來構建簡單的工作流程。Spring中的異步消息傳遞網關與RxNetty的集成

回答

1

由於4.1版的網關可以返回反應器2.0 Promise<?>

@MessagingGateway 
public static interface TestGateway { 

    @Gateway(requestChannel = "promiseChannel") 
    Promise<Integer> multiply(Integer value); 

} 

    ... 

@ServiceActivator(inputChannel = "promiseChannel") 
public Integer multiply(Integer value) { 
     return value * 2; 
} 

    ... 

Streams.defer(Arrays.asList("1", "2", "3", "4", "5")) 
      .get() 
      .map(Integer::parseInt) 
      .mapMany(integer -> testGateway.multiply(integer)) 
      .collect() 
      .consume(integers -> ...) 
      .flush(); 

與已更改爲反應堆3.1 Mono5.0版本開始。

我很確定有一些適配器可以將這些類型轉換爲RxJava的有價值的東西。

CompletableFuture<?>因爲4.2版本也支持網關:

CompletableFuture<String> process(String data); 

... 

CompletableFuture result = process("foo") 
    .thenApply(t -> t.toUpperCase()); 

... 

String out = result.get(10, TimeUnit.SECONDS); 

http://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#async-gateway

+0

感謝@artem。這有助於 – Harry

相關問題