2017-02-04 78 views
2

我需要創建Observable,它將收集來自不同來源(其他Observables)的信息,並且每個來源對事件值都有影響,但仍然基於前一個價值(種類的狀態機)。將Observable與反饋合併,將Observable自身合併

我們有一個int值和操作碼消息:

class Message{ 
    Integer value; 
    String operation; 

    public Message(Integer value, String operation) { 
     this.value = value; 
     this.operation = operation; 
    } 
} 

而且這種價值的一些來源,初值:

Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")); 

現在有事件的來源。這些數據源將發佈基於此值的值,並基於dynamicMessage的previos值顯示新的dynamicMessage值。其實它的事件類型是:

class Changer1 { 
    String operation; 

    public Changer1(String operation) { 
     this.operation = operation; 
    } 
} 


class Changer2 { 
    Integer value; 

    public Changer2(Integer value) { 
     this.value = value; 
    } 
} 

Changer1對改變操作有反應。 Changer2對變化的價值做出迴應。

和來源這些值:

static Observable<Changer1> changers1 = Observable.just(new Changer1("-")) 
     .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+")) 
       .delay(2, TimeUnit.SECONDS)); 


static Observable<Changer2> changers2 = Observable.just(new Changer2(2)) 
     .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2)) 
       .delay(2, TimeUnit.SECONDS)); 

現在我需要改變dynamicMessage可觀察到,並採取有關的消息從兌換。這裏是一個圖: state machine

另外我嘗試寫了一個程序,如何看到該溶液中,但它與OutOfMemoryException異常掛起,後分辯第一值:
消息{值= 1,操作=「+」}
上市:

import rx.Observable; 
import rx.functions.Action1; 
import rx.functions.Func1; 

public class RxDilemma { 


static class Message{ 
    Integer value; 
    String operation; 

    public Message(Integer value, String operation) { 
     this.value = value; 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Message{" + 
       "value=" + value + 
       ", operation='" + operation + '\'' + 
       '}'; 
    } 
} 

static class Changer1 { 
    String operation; 

    public Changer1(String operation) { 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Changer1{" + 
       "operation='" + operation + '\'' + 
       '}'; 
    } 
} 


static class Changer2 { 
    Integer value; 

    public Changer2(Integer value) { 
     this.value = value; 
    } 

    @Override 
    public String toString() { 
     return "Changer2{" + 
       "value=" + value + 
       '}'; 
    } 
} 





static Observable<Changer1> changers1 = Observable.just(new Changer1("-")) 
     .delay(1, TimeUnit.SECONDS).concatWith(Observable.just(new Changer1("+")) 
       .delay(2, TimeUnit.SECONDS)); 


static Observable<Changer2> changers2 = Observable.just(new Changer2(2)) 
     .delay(2, TimeUnit.SECONDS).concatWith(Observable.just(new Changer2(2)) 
       .delay(2, TimeUnit.SECONDS)); 


static Observable<Message> dynamicMessage = Observable.just(new Message(1, "+")).mergeWith(changers1.flatMap(new Func1<Changer1, Observable<Message>>() { 
    @Override 
    public Observable<Message> call(final Changer1 changer) { 
     return dynamicMessage.last().map(new Func1<Message, Message>() { 
      @Override 
      public Message call(Message message) { 
       message.operation = changer.operation; 
       return message; 
      } 
     }); 
    } 
})).mergeWith(changers2.flatMap(new Func1<Changer2, Observable<Message>>() { 
    @Override 
    public Observable<Message> call(final Changer2 changer2) { 
     return dynamicMessage.last().map(new Func1<Message, Message>() { 
      @Override 
      public Message call(Message message) { 
       if("+".equalsIgnoreCase(message.operation)){ 
        message.value = message.value+changer2.value; 
       } else if("-".equalsIgnoreCase(message.operation)){ 
        message.value = message.value-changer2.value; 
       } 
       return message; 
      } 
     }); 
    } 
})); 

public static void main(String[] args) throws InterruptedException { 

    dynamicMessage.subscribe(new Action1<Message>() { 
     @Override 
     public void call(Message message) { 
      System.out.println(message); 
     } 
    }); 


    Thread.sleep(5000000); 
} 
} 

那麼,什麼我的預期outpur是:

消息{值= 1,操作= '+'}
消息{值= 1,操作= ' - '}
消息{值= -1,操作= ' - '}
消息{值= 1,操作= '+'}
消息{值= 2,操作= '+'}

此外,我關於將所有與CombineLatest合併,但後來我發現CombineLatest沒有提供哪一個元素被改變,沒有這些信息我不知道消息中的哪些改變需要完成。 有什麼幫助嗎?

回答

1

換句話說,你要減少掃描你的行動,以反映狀態的變化。有一個運營商,正確地命名爲reducescan,這正是你所要求的。它需要每一個事件來,並讓你轉換它與該轉換的以前的結果,這將是你的狀態。

interface Changer { 
    State apply(State state); 
} 

class ValueChanger implements Changer { 
    ... 
    State apply(State state){ 
     // implement your logic of changing state by this action and return a new one 
    } 
    ... 
} 


class OperationChanger implements Changer { 
    ... 
    State apply(State state){ 
     // implement your logic of changing state by this action and return a new one 
    } 
    ... 
} 

Observable<ValueChanger> valueChangers 
Observable<OperationChanger> operationChangers 

Observable.merge(valueChangers, operationChangers) 
    .scan(initialState, (state, changer) -> changer.apply(state)) 
    .subscribe(state -> { 
     // called every time a change happens 
    }); 

請注意,我改變了你的命名,使其更有意義。另外,我在這裏使用Java8 lambda表達式。如果您不能在您的項目中使用它們,請將它們交換爲匿名類。

編輯:使用scan操盤reduce,因爲它提供每個結果一路走來,而不是發光只是最後的結果

+0

此功能提供了正確的最終價值,但中間值丟失。只有當所有兌換人員完成工作後,纔可以訂閱。 – msangel

+0

'valueChangers = Observable.interval(1,TimeUnit.SECONDS).map(aLong - > new ValueChanger(1))'永遠導致這個掛起。 – msangel

+0

對不起,我一段時間都沒用過減少。但是,您可以用'相同的參數交換'scan'的'reduce',它將按預期工作。我將修改我的答案馬上 – koperko

1

已經取得了一些變化,希望能夠足夠了您的需求。請看看結果。

static class Message { 
    Integer value; 
    String operation; 

    public Message(Integer value, String operation) { 
     this.value = value; 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Message{" + "value=" + value + ", operation='" + operation 
       + '\'' + '}'; 
    } 

    public Message applyChange(Object changer){ 
     if(changer instanceof Changer1){ 
      this.operation = ((Changer1)changer).operation; 
     }else if(changer instanceof Changer2){ 
      int v2 = ((Changer2)changer).value; 
      if("+".equals(operation)){ 
       value += v2; 
      }else if("-".equals(operation)){ 
       value -= v2; 
      } 
     } 
     return this; 
    } 
} 

static class Changer1{ 
    String operation; 

    public Changer1(String operation) { 
     this.operation = operation; 
    } 

    @Override 
    public String toString() { 
     return "Changer1{" + "operation='" + operation + '\'' + '}'; 
    } 
} 

static class Changer2{ 
    Integer value; 

    public Changer2(Integer value) { 
     this.value = value; 
    } 

    @Override 
    public String toString() { 
     return "Changer2{" + "value=" + value + '}'; 
    } 
} 

static Observable<? extends Object> changers1 = Observable 
     .just(new Changer1("-")) 
     .delay(1, TimeUnit.SECONDS) 
     .concatWith(
       Observable.just(new Changer1("+")).delay(2, 
         TimeUnit.SECONDS)); 

static Observable<? extends Object> changers2 = Observable 
     .just(new Changer2(2)) 
     .delay(2, TimeUnit.SECONDS) 
     .concatWith(
       Observable.just(new Changer2(2)).delay(2, TimeUnit.SECONDS)); 


static Observable<Message> dynamicMessage = Observable 
     .just(new Message(1, "+")) 
     .flatMap(m -> ((Observable<Object>)changers1).mergeWith((Observable<Object>)changers2).map(c -> m.applyChange(c))); 

public static void main(String[] args) throws InterruptedException { 
    dynamicMessage.toBlocking().subscribe(v -> System.out.println(v)); 
} 

的變化:

Message具有其被稱爲Changer發射每次新方法,它改變了Message對象的值。這是因爲你只有一個Message對象,它實際上是init的值。

public Message applyChange(Object changer){ 
    if(changer instanceof Changer1){ 
     this.operation = ((Changer1)changer).operation; 
    }else if(changer instanceof Changer2){ 
     int v2 = ((Changer2)changer).value; 
     if("+".equals(operation)){ 
      value += v2; 
     }else if("-".equals(operation)){ 
      value -= v2; 
     } 
    } 
    return this; 
} 

dynamicMessage根據需要進行修改 -

static Observable<Message> dynamicMessage = Observable 
    .just(new Message(1, "+")) 
    .flatMap(m -> ((Observable<Object>)changers1) 
     .mergeWith((Observable<Object>)changers2) 
     .map(c -> m.applyChange(c))); 
+0

好的,我試過了。有用。我不得不再花5分鐘思考如何以及爲什麼?***。簡短的結論:它可以工作,但是第一個flatMap函數只接收一個值,所以嵌套的observables中的所有操作都是在同一個對象上物理地執行的。使'applyChange'功能同步將防止我的狀態不良。第二個答案似乎正在努力,所以我將標記爲連接沒有。 (但是,這個讀者會再次投票接受他們的答案)。謝謝。 – msangel

0

好,我這裏有兩個答案。其中之一正在工作,但它有設計問題,即強制手動控制併發執行。另外一個沒有被設計工作(也許有些操作與製造reduce操作員的工作量不爲所有,但每對數據的,將工作之後)。但我在這裏發佈自己的答案,因爲我發現更優雅,更正確的解決方案。

但答案之前,先來看看這個問題。它爲什麼掛起?答案是:因爲運營商last()正在阻止。所以我需要強制它是非阻塞的。基於另一個答案在這裏:https://stackoverflow.com/a/37015509/449553,我可以使用BehaviorSubject類爲。

這是我的工作代碼:

final BehaviorSubject<Message> subject = BehaviorSubject.create(new Message(1, "+")); 
    Observable<Message> observable = Observable.<Changer>merge(changers1, changers2).map(new Func1<Changer, Message>() { 
     @Override 
     public Message call(Changer changer) { 
      Message value = subject.getValue(); 
      Message v = value.applyChange(changer); 
      subject.onNext(v); 
      return v; 
     } 
    }); 

(缺少的部分代碼,你會發現在其他的答案herehere) 不過,我不會慶祝的答案是正確的,我相信,有人可能會想以更合適的方式自己解決這個難題。

編輯:正確的方式發現,看到標有答案。

相關問題