2017-02-10 63 views
0

我使用rxjava來使用Observable.zip並行處理兩個請求。我試圖做的是,在一個observable say response我得到一個響應,並在其他observable say diff我想獲得響應,並保存在數據庫中的差異。問題是我不知道如何實現我的要求作爲diff observable是沒有得到完成,如果response observable獲得響應rxjava - 獲取響應並且並行插入差異

下面是我在做什麼...

public ServiceResponse getDummyResponse(ServiceRequest serviceRequest, String prodId){ 
    Observable<ServiceResponse> subInfoDummyObservable = getDummyResonseGenericObservable(); 
    Observable<ServicesDiff> reObservable = getServicesDiffGenericObservable(serviceRequest, prodId); 

    Observable<ServiceResponse> responseObservable = Observable.zip(
      subInfoDummyObservable, 
      reObservable, 
      new Func2<ServiceResponse, ServicesDiff, ServiceResponse>() { 
       @Override 
       public ServiceResponse call(ServiceResponse serviceResponse, ServicesDiff diffResponse) { 
        return serviceResponse; 
       } 
      } 
    ); 

    ServiceResponse serviceResponse = responseObservable.toBlocking().single(); 

    return serviceResponse; 
} 

Observable<ServiceResponse> getDummyResonseGenericObservable() { 
    return GenericHystrixCommand.toObservable("getDummyResonseGenericObservable", "getDummyResonseGenericObservable",() -> new ServiceResponse(),(t) -> {return null;}); 
} 

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) { 
    return GenericHystrixCommand.toObservable("getServicesDiffGenericObservable", "getServicesDiffGenericObservable",() -> getBothServiceResponses(serviceRequest, prodId),(t) -> {return null;}); 
} 

public ServicesDiff getBothServiceResponses(ServiceRequest serviceRequest, String prodId) { 
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId); 
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId); 

    Observable<ServicesDiff> observable = Observable.zip(
      service1ResponseObservable, service2ResponseObservable, 
      new Func2<String, ServiceResponse, ServicesDiff>() { 
       @Override 
       public ServicesDiff call(String service1Response, ServiceResponse service2Response) { 
        return aggregate(service1Response, service2Response); // never reaches this point********** 
       } 
      } 
    ); 
    ServicesDiff response = observable.toBlocking().single(); 

    return response; 
} 

我插入diff來DB在aggregate方法,但它永遠不會達到aggregate。請讓我知道我在這裏做錯了什麼?謝謝。

+0

你的代碼示例是我不明白,是什麼問題所在的getBothServiceResponses方法與代碼的其餘部分之間的關​​系? 你在getBothServiceResponses上壓縮的兩個觀察對象是什麼? – yosriz

+0

我同意,代碼需要澄清。前三種方法永遠不會被調用,所以我們不知道第四個方法'getBothServiceResponses()'中的觀察值是什麼樣子的,問題出在哪裏 – nosyjoe

回答

0

Observable是如何使用數據的說明。在您的代碼示例中,您不需要訂閱,也不會實際使用數據。您剛剛描述瞭如何請求,但是訂閱部分(即觸發請求的部分)缺失。

所以,如果我重寫了一點代碼:

class Aggregate { 
    Aggregate(String reponse, ServicesDiff diff) { 
     ... 
    } 
} 

Observable<String> getService1GenericObservable(String prodId) { 
    ... 
} 

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) { 
    ... 
} 

public Observable<Aggregate> getBothServiceResponses(ServiceRequest serviceRequest, String prodId) { 
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId); 
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId); 

    return Observable<Aggregate> observable = Observable.zip(
      service1ResponseObservable, service2ResponseObservable, 
      new Func2<String, ServiceResponse, ServicesDiff>() { 
       @Override 
       public ServicesDiff call(String service1Response, ServiceResponse service2Response) { 
        return aggregate(service1Response, service2Response); 
       } 
      } 
    ); 
} 

你將不得不這樣做是爲了訪問結果彙總:

getBothServiceResponses(serviceRequest, prodId).subscribe(...)