2015-05-13 87 views
30

我在理解RxJava中的zip運算符以解決我的android項目時遇到了很多麻煩。 問題 我需要能夠發送網絡請求上傳視頻 然後,我需要發送一個網絡請求上傳圖片去與它 最後,我需要添加一個描述,並使用前兩個請求將視頻和圖片的位置URL以及描述上傳到我的服務器。Rxjava Android如何使用Zip運算符

我認爲zip操作符對於這項任務來說是完美的,因爲我知道我們可以接受兩個可觀察對象(視頻和圖片請求)的響應,並將它們用於我的最終任務。 但我似乎無法得到這個發生,我想象它。

我正在尋找某人回答如何通過一些僞代碼在概念上做到這一點。 謝謝

回答

40

Zip運算符嚴格將可觀察對象中的發射項與對象配對。它等待兩個(或更多)物品到達然後合併它們。所以是的,這將適合您的需求。

我會用Func2來鏈接前兩個觀測值的結果。 注意如果你使用Retrofit,這個方法會更簡單,因爲它的api接口可能會返回一個observable。否則,您將需要創建自己的可觀察值。

// assuming each observable returns response in the form of String 
Observable<String> movOb = Observable.create(...); 
// if you use Retrofit 
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...), 
Observable.zip(movOb, picOb, 

    new Func2<String, String, MyResult>() { 

     @Override 
     public MyResult call(String movieUploadResponse, 
      String picUploadResponse) { 
      // analyze both responses, upload them to another server 
      // and return this method with a MyResult type 
      return myResult; 
     } 
     } 
) 
// continue chaining this observable with subscriber 
// or use it for something else 
+0

我一直在遇到的問題是,我有一種觀點,即我解除了視頻觀察視圖的任務,另一視圖觀察到了另一個視圖,而另一個視圖應該同時採用這些結果並將它們用於最終的可觀察視圖。 。zip會將已經執行的可觀察結果返回給我嗎? – feilong

+0

我不認爲你可以執行observable,但我想我理解你的觀點。只有在附屬於附屬服務器的情況下,Observable纔會運行。所以整個可觀察的安排必須在你附加用戶之前完成。 – inmyth

+1

我同意你的觀點,即「zip運算符允許你從兩個不同的可觀察結果組成結果」。現在我想要observable 2依賴於observable 1,所以observable 1應該在observable 2之前執行,然後我需要結合兩個observable的結果。我們是否有這方面的運營商?拉鍊可以做這份工作。我不想使用flatMap,因爲它將一個流轉換爲另一個流,但在這裏我需要設置依賴關係,然後壓縮結果。請回復。 –

0

zip運算符允許您從兩個不同的可觀察結果中組合結果。

您必須給出lambda表達式,該lambda表達式將創建每個observable發出的數據的結果。

Observable<MovieResponse> movies = ... 
Observable<PictureResponse> picture = ... 

Observable<Response> response = movies.zipWith(picture, (movie, pic) -> { 
     return new Response("description", movie.getName(), pic.getUrl()); 

}); 
+0

我同意你的觀點:「zip運算符允許你從兩個不同的可觀察結果組成結果」。現在我想要observable 2依賴於observable 1,所以observable 1應該在observable 2之前執行,然後我需要結合兩個observable的結果。我們是否有這方面的運營商?拉鍊可以做這份工作。我不想使用flatMap,因爲它將一個流轉換爲另一個流,但在這裏我需要設置依賴關係,然後壓縮結果。請回復。 –

39

分步: - >讓我們先來定義我們的改造對象來訪問GitHub的API,然後創建了兩個觀測對上述兩種網絡請求:

Retrofit repo = new Retrofit.Builder() 
     .baseUrl("https://api.github.com") 
     .addConverterFactory(GsonConverterFactory.create()) 
     .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
     .build(); 

Observable<JsonObject> userObservable = repo 
     .create(GitHubUser.class) 
     .getUser(loginName) 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()); 

Observable<JsonArray> eventsObservable = repo 
     .create(GitHubEvents.class) 
     .listEvents(loginName) 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()); 

改造界面很簡單足夠:

public interface GitHubUser { 
    @GET("users/{user}") 
    Observable<JsonObject> getUser(@Path("user") String user); 
} 

public interface GitHubEvents { 
    @GET("users/{user}/events") 
    Observable<JsonArray> listEvents(@Path("user") String user); 
} 

最近,我們使用RxJava的郵遞方法向我們兩個觀測量結合起來,等待他們創造一個新的可觀測之前完成。

Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() { 
    @Override 
    public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) { 
    return new UserAndEvents(jsonObject, jsonElements); 
    } 
}); 

有什麼UserAndEvents?這只是一個簡單的POJO兩個對象組合:

public class UserAndEvents { 
    public UserAndEvents(JsonObject user, JsonArray events) { 
    this.events = events; 
    this.user = user; 
    } 

    public JsonArray events; 
    public JsonObject user; 
} 

最後,讓我們呼籲訂閱方法我們新的組合可觀察到:

combined.subscribe(new Subscriber<UserAndEvents>() { 
      ... 
      @Override 
      public void onNext(UserAndEvents o) { 
      // You can access the results of the 
      // two observabes via the POJO now 
      } 
     }); 
+0

有沒有任何可能,身體params取代例如身體請求作爲第二個請求的身體參數發送的第一個請求的參數。這似乎是棘手的我的拉鍊電話@ramkailash – Killer

2

這裏我有,我沒有使用Zip爲例在異步方式,以防萬一you're好奇

 /** 
* Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel. 
* By default Rx is not async, only if you explicitly use subscribeOn. 
    */ 
@Test 
public void testAsyncZip() { 
    scheduler = Schedulers.newThread(); 
    scheduler1 = Schedulers.newThread(); 
    scheduler2 = Schedulers.newThread(); 
    long start = System.currentTimeMillis(); 
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) 
                         .concat(s3)) 
       .subscribe(result -> showResult("Async in:", start, result)); 
} 

/** 
* In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline 
*/ 
@Test 
public void testZip() { 
    long start = System.currentTimeMillis(); 
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2) 
                     .concat(s3)) 
       .subscribe(result -> showResult("Sync in:", start, result)); 
} 


public void showResult(String transactionType, long start, String result) { 
    System.out.println(result + " " + 
           transactionType + String.valueOf(System.currentTimeMillis() - start)); 
} 

public Observable<String> obString() { 
    return Observable.just("") 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "Hello"); 
} 

public Observable<String> obString1() { 
    return Observable.just("") 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> " World"); 
} 

public Observable<String> obString2() { 
    return Observable.just("") 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "!"); 
} 

public Observable<String> obAsyncString() { 
    return Observable.just("") 
        .observeOn(scheduler) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "Hello"); 
} 

public Observable<String> obAsyncString1() { 
    return Observable.just("") 
        .observeOn(scheduler1) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> " World"); 
} 

public Observable<String> obAsyncString2() { 
    return Observable.just("") 
        .observeOn(scheduler2) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "!"); 
} 

您可以在這裏看到更多的例子https://github.com/politrons/reactive

6

example

Observable<String> stringObservable1 = Observable.just("Hello", "World"); 
Observable<String> stringObservable2 = Observable.just("Bye", "Friends"); 

Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, String, String>() { 
    @Override 
    public String apply(@NonNull String s, @NonNull String s2) throws Exception { 
     return s + " - " + s2; 
    } 
}).subscribe(new Consumer<String>() { 
    @Override 
    public void accept(String s) throws Exception { 
     System.out.println(s); 
    } 
}); 

這將打印:

Hello - Bye 
World - Friends 
+0

很好,在'Func2 <...>',這三個字符串是什麼?是's',''和's2'嗎? – chornge

+1

我剛剛更新了處理RxJava2方法的答案,BiFunction中的第一個String是第一個observable的類型,第二個String是第二個observable的類型,第三個String是它將生成的observable的類型! –

1

我一直在尋找如何使用郵編操作一個簡單的答案,並與觀測量做什麼我創建它來傳遞給它,我想知道是否應該爲每個observable調用subscribe(),而不是這些答案很容易找到,我必須通過我自己來找出它,所以這裏是一個簡單的例子在2個可觀察對象上使用Zip運算符:

@Test 
public void zipOperator() throws Exception { 

    List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4); 
    List<String> letters = Arrays.asList("a", "b", "c", "d", "e"); 

    Observable<Integer> indexesObservable = Observable.fromIterable(indexes); 

    Observable<String> lettersObservable = Observable.fromIterable(letters); 

    Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems()) 
      .subscribe(printMergedItems()); 
} 

@NonNull 
private BiFunction<Integer, String, String> mergeEmittedItems() { 
    return new BiFunction<Integer, String, String>() { 
     @Override 
     public String apply(Integer index, String letter) throws Exception { 
      return "[" + index + "] " + letter; 
     } 
    }; 
} 

@NonNull 
private Consumer<String> printMergedItems() { 
    return new Consumer<String>() { 
     @Override 
     public void accept(String s) throws Exception { 
      System.out.println(s); 
     } 
    }; 
} 

打印的結果是:

[0] a 
[1] b 
[2] c 
[3] d 
[4] e 

最終答案,但如我的頭爲如下

的觀傳遞給拉鍊(問題)方法只是需要只創建它們,它們不需要任何訂閱者,只創建它們就足夠了...如果你想讓任何observable在一個調度器上運行,你可以爲這個Observable指定這個...我也試過了zip()在Observables的操作員他們應該等待那裏的結果,並且zip()的消耗品是t只有當兩個結果準備就緒時(這是預期的行爲)

+0

我有兩個Observable列表(http請求)。第一個列表包含創建項目的請求,第二個列表包含更新項目的請求。當所有請求都結束時,我想執行「某事」。我正在考慮使用zip,但以您的示例爲例,我發現它是按組對請求進行分組的,並且我只想在最後得到通知(列表具有不同的大小)。你能給我一些想法嗎?謝謝。 – JCarlos

+1

看看這個: http://reactivex.io/documentation/operators/combinelatest.html –