2

我想使用 Vert.x 和 RxJava 對多個 Observable 進行 zip 合併。我不知道是我誤解了什麼,還是這本身是某種 bug。代碼如下。(標題:Vert.x - RxJava - zip observables)

/**
 * Verticle that wires an event-bus address to an HTTP call using RxJava.
 *
 * Messages received on "busName" trigger a GET of "/uri" on localhost:80; the
 * message stream is then zipped with the response stream and every zipped
 * message is answered with an empty reply.
 */
public class BusVerticle extends Verticle {

    /**
     * Builds the observable pipeline and subscribes to it.
     */
    public void start() {

        final RxVertx rx = new RxVertx(vertx);

        // Stream of messages arriving on the "busName" address.
        Observable<RxMessage<JsonObject>> incoming = rx.eventBus().registerHandler("busName");

        // Turns one incoming message into the response stream of a GET /uri on localhost:80.
        // A new HTTP client is created for every message.
        Func1<RxMessage<JsonObject>, Observable<RxHttpClientResponse>> toHttpCall =
                new Func1<RxMessage<JsonObject>, Observable<RxHttpClientResponse>>() {
                    public Observable<RxHttpClientResponse> call(RxMessage<JsonObject> msg) {
                        RxHttpClient client = rx.createHttpClient();
                        client.coreHttpClient().setHost("localhost").setPort(80);
                        return client.getNow("/uri");
                    }
                };
        Observable<RxHttpClientResponse> responses = incoming.mapMany(toHttpCall);

        // Pairs a message with a response and keeps only the message.
        Func2<RxMessage<JsonObject>, RxHttpClientResponse, RxMessage<JsonObject>> keepMessage =
                new Func2<RxMessage<JsonObject>, RxHttpClientResponse, RxMessage<JsonObject>>() {
                    public RxMessage<JsonObject> call(RxMessage<JsonObject> msg, RxHttpClientResponse ignoredResponse) {
                        return msg;
                    }
                };

        // NOTE(review): `incoming` reaches zip twice -- directly, and indirectly through
        // `responses`. If every subscription registers its own event-bus handler, messages
        // and responses may not be paired as intended -- confirm against the RxVertx docs.
        Observable<RxMessage<JsonObject>> paired = Observable.zip(incoming, responses, keepMessage);

        // Send an empty reply for each message emitted by the zipped stream.
        Action1<RxMessage<JsonObject>> replyToSender = new Action1<RxMessage<JsonObject>>() {
            public void call(RxMessage<JsonObject> msg) {
                msg.reply();
            }
        };
        paired.subscribe(replyToSender);
    }
}

我想利用所接收消息中的信息發出 HTTP 請求,然後把事件總線和 HTTP 響應這兩個 Observable 進行 zip 合併,以便用 HTTP 響應中的信息來回覆該消息。

我發送的消息沒有收到任何回覆。

在此先感謝!

回答

2

我已經用一個變通方法解決了這個問題,算是某種混合方案。

public class BusVerticle extends Verticle { 

public void start() { 
    final RxVertx rxVertx = new RxVertx(vertx); 

    vertx.eventBus().registerHandler("busName", new Handler<Message<JsonObject>>() { 
     public void handle(final Message<JsonObject> message) { 
      RxHttpClient rxHttpClient = rxVertx.createHttpClient(); 
      rxHttpClient.coreHttpClient().setHost("localhost").setPort(80); 
      Observable<RxHttpClientResponse> httpRequest = rxHttpClient.getNow("/uri"); 
      httpRequest.subscribe(new Action1<RxHttpClientResponse>() { 
       public void call(RxHttpClientResponse response) { 
        container.logger().error(response.statusCode()); 
        message.reply(new JsonObject().putString("status", "ok")); 
       } 
      }); 
     } 
    }); 
} 

}