我需要創建Observable,它將收集來自不同來源(其他Observables)的信息,並且每個來源對事件值都有影響,但仍然基於前一個價值(種類的狀態機)。將Observable與反饋合併,將Observable自身合併
我們有一個int值和操作碼消息:
class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
}
而且這種價值的一些來源,初值:
Observable<Message> dynamicMessage = Observable.just(new Message(1, "+"));
現在有事件的來源。這些數據源將發佈基於此值的值,並基於dynamicMessage的previos值顯示新的dynamicMessage值。其實它的事件類型是:
class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
}
class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
}
Changer1對改變操作有反應。 Changer2對變化的價值做出迴應。
和來源這些值:
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
現在我需要改變dynamicMessage可觀察到,並採取有關的消息從兌換。這裏是一個圖:
另外我嘗試寫了一個程序,如何看到該溶液中,但它與OutOfMemoryException異常掛起,後分辯第一值:
消息{值= 1,操作=「+」}
上市:
import rx.Observable;
import rx.functions.Action1;
import rx.functions.Func1;
public class RxDilemma {
static class Message{
Integer value;
String operation;
public Message(Integer value, String operation) {
this.value = value;
this.operation = operation;
}
@Override
public String toString() {
return "Message{" +
"value=" + value +
", operation='" + operation + '\'' +
'}';
}
}
static class Changer1 {
String operation;
public Changer1(String operation) {
this.operation = operation;
}
@Override
public String toString() {
return "Changer1{" +
"operation='" + operation + '\'' +
'}';
}
}
static class Changer2 {
Integer value;
public Changer2(Integer value) {
this.value = value;
}
@Override
public String toString() {
return "Changer2{" +
"value=" + value +
'}';
}
}
static Observable<Changer1> changers1 = Observable.just(new Changer1("-"))
.delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+"))
.delay(2, TimeUnit.SECONDS));
static Observable<Changer2> changers2 = Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2))
.delay(2, TimeUnit.SECONDS));
static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer1 changer) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
message.operation = changer.operation;
return message;
}
});
}
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() {
@Override
public Observable<Message> call(final Changer2 changer2) {
return dynamicMessage.last().map(new Func1<Message, Message>() {
@Override
public Message call(Message message) {
if("+".equalsIgnoreCase(message.operation)){
message.value = message.value+changer2.value;
} else if("-".equalsIgnoreCase(message.operation)){
message.value = message.value-changer2.value;
}
return message;
}
});
}
}));
public static void main(String[] args) throws InterruptedException {
dynamicMessage.subscribe(new Action1<Message>() {
@Override
public void call(Message message) {
System.out.println(message);
}
});
Thread.sleep(5000000);
}
}
那麼,什麼我的預期outpur是:
消息{值= 1,操作= '+'}
消息{值= 1,操作= ' - '}
消息{值= -1,操作= ' - '}
消息{值= 1,操作= '+'}
消息{值= 2,操作= '+'}
此外,我關於將所有與CombineLatest合併,但後來我發現CombineLatest沒有提供哪一個元素被改變,沒有這些信息我不知道消息中的哪些改變需要完成。 有什麼幫助嗎?
此功能提供了正確的最終價值,但中間值丟失。只有當所有兌換人員完成工作後,纔可以訂閱。 – msangel
'valueChangers = Observable.interval(1,TimeUnit.SECONDS).map(aLong - > new ValueChanger(1))'永遠導致這個掛起。 – msangel
對不起,我一段時間都沒用過減少。但是,您可以用'相同的參數交換'scan'的'reduce',它將按預期工作。我將修改我的答案馬上 – koperko