2017-07-20 31 views
2

假設我們有兩個可觀測物Observable<Integer> o1Observable<Integer> o2,並且每個可觀測物正在產生嚴格遞增的序列。兩個有序可觀測物的全外連接

任務是對這兩個觀察對象執行完全外連接的等效操作。例如加入的

Observable.just(0, 2, 3, 6) 
    Observable.just(1, 2, 3, 4, 5, 6) 

應該產生

[ [0, _], [_, 1], [2, 2], [3, 3], [_, 4], [_, 5], [6, 6] ] 

的加入應該是高效的和非常大的或無限流工作。

該解決方案在拉方案中很容易。有沒有一種慣用的方法來實現這一點?

回答

1

有這個沒有單一的運營商,但它可以從標準和擴展運營商組成的行爲:

static abstract class Pair implements Comparable<Pair> { 
    int value; 

    @Override 
    public int compareTo(Pair o) { 
     return Integer.compare(value, o.value); 
    } 
} 

static final class Left extends Pair { 
    Left(int value) { 
     this.value = value; 
    } 

    @Override 
    public String toString() { 
     return "[" + value + ", _]"; 
    } 
} 

static final class Right extends Pair { 
    Right(int value) { 
     this.value = value; 
    } 

    @Override 
    public String toString() { 
     return "[_, " + value + "]"; 
    } 
} 

static final class Both extends Pair { 
    Both(int value) { 
     this.value = value; 
    } 

    @Override 
    public int hashCode() { 
     return value; 
    } 

    @Override 
    public boolean equals(Object obj) { 
     if (obj instanceof Both) { 
      return ((Both)obj).value == value; 
     } 
     return false; 
    } 

    @Override 
    public String toString() { 
     return "[" + value + ", " + value + "]"; 
    } 
} 

@SuppressWarnings("unchecked") 
@Test 
public void test() { 
    Flowable<Integer> a = Flowable.just(0, 2, 3, 6); 
    Flowable<Integer> b = Flowable.just(1, 2, 3, 4, 5, 6); 

    Flowable.defer(() -> { 
     boolean[] skip = { false }; 
     return Flowables.<Pair>orderedMerge(
       a.<Pair>map(Left::new), b.<Pair>map(Right::new) 
      ) 
      .distinctUntilChanged() 
      .buffer(2, 1) 
      .flatMapIterable(buf -> { 
       if (skip[0]) { 
        skip[0] = false; 
        return Collections.emptyList(); 
       } 
       if (buf.size() == 2) { 
        if (buf.get(0).value == buf.get(1).value) { 
         skip[0] = true; 
         return Collections.singletonList(new Both(buf.get(0).value)); 
        } 
        return buf.subList(0, 1); 
       } 
       return buf; 
      }); 
    }) 
    .subscribe(System.out::println); 
} 

其中Flowables.orderedMergeRxJava 2 Extensions library

+0

謝謝,很好的答案,它的工作原理。 幾個問題/注意到如果有重複的源可觀察時,才需要 1)distinctUntilChanged,但如果整數嚴格遞增該操作是多餘的 2)'flatMapIterable'應該是不尊順序,並且可以執行同時,不是嗎?所以,由於使用skip數組,似乎有一場比賽。它不應該被'concatMapIterable'取代嗎? 再次感謝。 – hgrey

+1

1)是的。我留在那裏以防萬一。 2)否,消費Iterable是同步的,不允許併發合併,並且等價於'concatMapIterable'。事實上,它們是同一個底層運營商的別名。此外,該功能保證順序執行,因此前一次運行的跳過狀態將永遠不會同時修改。 – akarnokd