2017-03-29 65 views
1

我寫下面提到的方法來獲取批量從couchbase服務器的數據。如何使用RxJava和couchbase排序?

bucket.async() 
      .query(N1qlQuery.simple(query)) 
      .doOnNext(res -> res.info().map(N1qlMetrics::elapsedTime).forEach(t -> System.out.println(t))) 
      .flatMap(AsyncN1qlQueryResult::rows) 
      .flatMap(row -> 
      bucket.async(). 
      get(row.value().getString("id"))) 
      .map(JsonDocument::content). 
      toList() 
      .toBlocking() 
      .single(); 

此代碼工作正常,當我通過查詢

"SELECT meta().id as id FROM bucket" 

但是當我使用類似

"SELECT meta().id as id FROM bucket order by id ASC" 

結果我收到不排序。但是,當我在查詢控制檯上運行相同的查詢控制檯結果是預期的。這讓我相信我在rxJava中做錯了什麼。請幫我解決這個問題。

回答

5

順序丟失,因爲flatMap()操作,適用併發流,但不維持秩序。
當你申請flatMap()要創建和訂閱新Observable每個onNext(),意爲每一行,你在並行這一行執行:

bucket.async(). 
     get(row.value().getString("id"))) 

然後每個get操作將在完成不同的時間,並且獲取的內容將被無序排列。

如果要維護順序但不在意丟失並行性,則應使用僅保留1個活動流的concatMap(),並按順序預訂每個獲取操作。

如果您確實需要/想要並行化,您應該使用concatMapEager(),它將並行執行每個創建的Observable,但會按順序發出這些項目。

+2

我同意這一點,concatMapEager是去我的思維方式。我們已經做到這一點的看法與「包括文檔」,看到https://github.com/couchbase/couchbase-java-client/blob/master/src/main/java/com/couchbase/client/java/view/ViewQueryResponseMapper .java#L220如果你對細節感興趣。 – daschl