2017-08-09 34 views
3

我是新來的反應編程,並有困難使用「一切都可以是流」曼陀羅。我正在考慮以下情形 - 我有definied這樣的WebSocket事件流:如何取消反應流中的事件?

Rx.Observable.create((observer) => { 

    io.on('connect', function(socket){ 

     socket.on("enroll", function(player) { 
     observer.next({ 
      event: 'enroll', 
      player, 
      socket 
     }); 
     }); 

     socket.on('resign', function(player){ 
     observer.next({ 
      event: 'resign', 
      player, 
      socket 
     }); 
     }); 

    }); 

    return { 
     dispose: io.close 
    }; 
    }); 

然後,我可以做這樣的事情

enrollmentStream = events$ 
     .filter(find({ event: "enroll" })) 
     .map(pick('player')); 

而且同樣

resignationStream = events$ 
     .filter(find({ event: "resign" })) 
     .map(pick('player')); 

我希望將註冊的玩家聚集在一個可以批量添加4的遊戲中,但顯然這隻適用於註冊用戶流,但不是在辭職StreamStream或至少最後一個事件是註冊。我該怎麼做呢?

這裏是大理石圖。

Marble diagram

有5名球員報名參加其中。當有4名玩家參加時,遊戲開始。請注意,第二個玩家(紫羅蘭色)註冊但是辭職,所以遊戲不是以藍色大理石開始,而是以下一個黃色 - 之後纔會有真正的4名玩家準備好。

也許應該有一些流操作,如「不」 ...有嗎?

+2

我不確定我是否遵循了您的要求,您是否可以添加需要處理的一系列動作的大理石或時序圖? – paulpdaniels

+0

請看看最新的問題。 – kboom

回答

2

我想在這種情況下,你可以使用combineLatest()scan()運營商,然後作出的unresigned自己的球員名單:

const bufferedEnrollment = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []); 
const bufferedResignation = enrollmentStream.scan((acc, val) => { acc.push(val); return acc; }, []); 

Observable.combineLatest(bufferedEnrollment, bufferedResignation) 
    .map(values => { 
    const enrolled = values[0]; 
    const resigned = values[1]; 

    // remove resigned players from `enrolled` array 
    return enrolled; 
    }) 
    .filter(players => players.length === 4) 
    .subscribe(...) 

scan()運營商僅用於收集玩家到一個數組。例如,如果你想要重置數組,你可以將它與另一個Observable合併。

enrollmentStream 
    .merge(resetStream) 
    .scan((acc, val) => { 
    if (!val) { 
     return []; 
    } 
    acc.push(val); 
    return acc; 
    }, []); 

(出於顯而易見的原因,我沒有測試此代碼)。