1

我正在構建一個Angular2應用程序,所以我習慣了Observables和Reactive Extensions作爲一個整體。我正在使用TypeScript和rxjs。將多個Observable與不同的操作/操作結合

現在我已經有了一個可觀察的,或者如果你願意的話,一些對象的數組。讓我們說個人對象。現在,我已經有了人,畫線的其他兩個流,並希望這些,所以我得到一個流這始終是最新的組合:

var people$ = getPeople();     // Observable<Person[]> 
var personAdded$ = eventHub.personAdded; // Observable<Person>; 
var personRemoved$ = eventHub.personRemoved // Observable<Person>; 

var allwaysUpToDatePeople$ = people$.doSomeMagic(personAdded$, personRemoved$, ...); 

如果人民流發出的數組,讓我們說,5人,然後personAdded-stream發射一個人,allPeople-stream會發出一個6的數組。 如果personRemoved-stream發射一個人,那麼allPeople-stream應該發射一個Person對象的數組,而不僅僅是一個人由人移動流發射。

是否有一種方法內置到rxjs中以獲得此行爲?

回答

0

我的建議是,你將action的想法包裝成一個流,然後可以合併並直接應用到Array

的第一步是定義一些功能描述自己的行爲:

function add(people, person) { 
    return people.concat([people]); 
} 

function remove(people, person) { 
    const index = people.indexOf(person); 
    return index < 0 ? people : people.splice(index, 1); 
} 

注:我們避免突變代替陣列,因爲它可以有不可預見的副作用。純度要求我們創建一個數組副本。

現在我們可以使用這些功能,並將其提升到了流來創建一個Observable發射功能:

const added$ = eventHub.personAdded.map(person => people => add(people, person)); 
const removed$ = eventHub.personAdded.map(person => people => remove(people, person)); 

現在我們得到的形式事件:people => people了輸入和輸出將是一個數組的人(在這個例子中簡化爲一個字符串數組)。

現在我們將如何接線?嗯,我們真正關心的是後添加或刪除這些事件我們有一個數組將它們應用到:根據您的需要(即避免了功能鑽營,或使用

const currentPeople = 

    // Resets this stream if a new set of people comes in 
    people$.switchMap(peopleArray => 

    // Merge the actions together 
    Rx.Observable.merge(added$, removed$) 

     // Pass in the starting Array and apply each action as it comes in 
     .scan((current, op) => op(current), peopleArray) 

     // Always emit the starting array first 
     .startWith(people) 
) 
    // This just makes sure that every new subscription doesn't restart the stream 
    // and every subscriber always gets the latest value 
    .shareReplay(1); 

有此技術的一些優化一個二進制搜索),但我發現以上相對優雅的通用情況。

+0

很好的解釋,我要試一試,但我明白你的意思了!我會認爲這是人們想要經常做的事,而不是? – YentheO

1

你想要合併所有的流(Ghostbusters風格),然後使用掃描運算符來找出狀態。掃描運算符就像Javascript的縮減一樣。

這裏有一個演示...

const initialPeople = ['Person 1', 'Person 2', 'Person 3', 'Person 4']; 

const initialPeople$ = Rx.Observable.from(initialPeople); 

const addPeople = ['Person 5', 'Person 6', 'Person 7']; 

const addPeople$ = Rx.Observable.from(addPeople) 
      .concatMap(x => Rx.Observable.of(x).delay(1000)); // this just makes it async 

const removePeople = ['Person 2x', 'Person 4x']; 

const removePeople$ = Rx.Observable.from(removePeople) 
               .delay(5000) 
               .concatMap(x => Rx.Observable.of(x).delay(1000)); 

const mergedStream$ = Rx.Observable.merge(initialPeople$, addPeople$, removePeople$) 

mergedStream$ 
    .scan((acc, stream) => { 
     if (stream.includes('x') && acc.length > 0) { 
      const index = acc.findIndex(person => person === stream.replace('x', '')) 
      acc.splice(index, 1); 
     } else { 
      acc.push(stream); 
     } 
     return acc; 
    }, []) 
    .subscribe(x => console.log(x)) 

// In the end, ["Person 1", "Person 3", "Person 5", "Person 6", "Person 7"] 

http://jsbin.com/rozetoy/edit?js,console

你沒有提到你的數據的結構。我使用「x」作爲標誌有點(很多)笨重和有問題。但我想你會看到如何修改掃描操作符以適合您的數據。