0
我試圖實現某種流式解析器。假設我有整數流,並將它們合併以創建新的對象,該對象聚集了流的一部分。使用RxJava將多個項目從observable組合到新對象
例如,當整數爲負時,對象爲「完成」。爲了簡單起見,生產的項目將是數字串。
下面是簡單的例子:
Source: 1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11
Output: "1-2", "343-5", "6-7", "8910-11"
我無法找到現有運營商的適當組合。我的解決辦法是提供運營商定製,它彙集數據,直到「塊」是有效的,併發出新的對象:
rx.Observable<Integer> src = rx.Observable
.from(Arrays.asList(1, -2, 3, 4, -5, 6, -7, 8, 9, 10, -11));
rx.Observable<String> res = src
.lift(new rx.Observable.Operator<String, Integer>() {
@Override
public rx.Subscriber<? super Integer> call(
final rx.Subscriber<? super String> subscriber) {
return new rx.Subscriber<Integer>(subscriber) {
private final StringBuilder cur = new StringBuilder();
@Override
public void onCompleted() {
if (!subscriber.isUnsubscribed()) {
subscriber.onCompleted();
}
}
@Override
public void onError(Throwable e) {
if (!subscriber.isUnsubscribed()) {
subscriber.onError(e);
}
}
@Override
public void onNext(Integer i) {
if (subscriber.isUnsubscribed())
return;
cur.append(i).append('');
if (i < 0) {
subscriber.onNext(cur.toString());
cur.delete(0, cur.length());
}
}
};
}
});
rx.observers.TestSubscriber<String> sub =
new rx.observers.TestSubscriber<>();
res.subscribe(sub);
List<String> l1 = sub.getOnNextEvents();
它工作得很好,直到我用一些位置運營商在他們身上,像取值(N)或combineLatest。
rx.Observable<String> res2 = res.take(2);
在這種情況下,我沒有得到onCompleted或的onError和項目的少量發射。
它看起來像每個輸入onNext應該有輸出onNext下一個調用。
任何想法?