對於RX我有一個小問題。我有一串符號從鍵盤進入,我需要將它們分成組。當一個';'應該開始一個新的組,符號來自流。簡單來說,我需要一個像Buffer一樣的運算符,但是當某個條件成立時會觸發,而不是經過一些時間延遲或事件計數。有沒有一種方法可以讓運營商已經出現在RX中,或者我應該自己註冊?帶反應式擴展的分區序列
回答
這是源代碼。
var source = new[] { 'a', 'b', ';', 'c', 'd', 'e', ';' }.ToObservable();
這裏是你要問什麼:
var groups = source
// Group the elements by some constant (0)
// and end the group when we see a semicolon
.GroupByUntil(x => 0, group => group.Where(x => x == ';'))
下面是使用它的方式:
groups
// Log that we're on the next group now.
.Do(x => Console.WriteLine("Group: "))
// Merge/Concat all the groups together
// {{a..b..;}..{c..d..e..;}} => {a..b..;..c..d..e..;}
.Merge()
// Ignore the semicolons? This is optional, I suppose.
.Where(x => x != ';')
// Log the characters!
.Do(x => Console.WriteLine(" {0}", x))
// Make it so, Number One!
.Subscribe();
輸出:
Group:
a
b
Group:
c
d
e
我們可以利用緩衝區覆蓋邊界可觀察到的地方邊界可觀察是我們的初始流只過濾分號入口。
//this is our incoming stream
IObservable<char> keyboardStream;
//if the observable is cold we need to do this
//in case of it being hot (as I would expect a keyboard stream to be) we do not need to do it
var persistedStream = keyboardStream.Publish().RefCount();
var bufferedBySemicolon = persistedStream.Buffer(persistedStream .Where(c=>c==';'));
這看起來簡單而有趣 –
我傾向於不贊同RefCount的大多數用法,除非沒有同步要求。涉及同步時,最好明確連接,特別是在處理熱點流時。 –
具體來說,如果您的'keyboardStream'在不同於'bufferedBySemicolon'訂閱的線程上觀察/生成,並且因此呈現爭用條件,則此處使用RefCount是非確定性的。 –
這是Nikolai的答案的非RefCount版本。這提供了訂閱和處置的更加明確的同步,並且應該消除當你的消息源在你的消費者訂閱的不同線程上觀察到的競爭狀態(當你處理UI時通常是這種情況)。
var groups = Observable.Create(o => {
var publishedSource = source.Publish();
return new CompositeDisposable(
publishedSource.Buffer(publishedSource.Where(c => c == ';')).Subscribe(o),
publishedSource.Connect()
);
});
- 1. f的反應式擴展#
- 2. 反應式擴展程序是否支持滾動緩衝區?
- 3. 有反應式擴展的書嗎?
- 4. 針對Java的反應式擴展
- 5. 反應式擴展/並行擴展中的最大線程數
- 6. 反序列化帶時區的JSON DateTime
- 7. 來自緩衝區的反應性擴展緩衝區
- 8. 反應擴展:從的IObservable
- 9. 協同/反應式擴展 - 寫入行
- 10. 使用Observable.Publish與反應式擴展
- 11. 反應式擴展Sample或Throttle?
- 12. 應該'顯示錶擴展'列出分區下的文件?
- 13. Jackson的序列化和java.lang.Number擴展的反序列化
- 14. 不能擴展分區
- 15. 如何做反應擴展
- 16. 反應性擴展文檔
- 17. 反應性擴展超時不停止序列?
- 18. 反應式Mongo擴展:如何在反應式mongo擴展中使用查詢DSL使用$ push和$ each
- 19. 反應式擴展序列化任務每個都可以取消
- 20. 反序列化一個帶有不同擴展名的xml文件
- 21. GSON不對序列化/反序列化擴展類
- 22. 反類擴展
- 23. 帶負載平衡擴展的OSGi擴展模式
- 24. 反應的擴展/ RxJS Implementatation到Node.js的
- 25. 反應性擴展...在CRUD應用程序中的示例
- 26. 擴展bootstrap創建反應的應用程序
- 27. 帶擴展卡的Chrome擴展通知
- 28. 帶多列列表的ajax自動完成擴展程序
- 29. 不區分大小寫反序列化
- 30. 擴展AWS EC2上的根分區
不會GroupByUntill創建一個新的序列,將在應用程序的整個生命週期中保持活躍狀態,還是我錯過了某些東西? –
@L.E.O,編號GroupBy將在外部訂閱的生命週期中保持對每個新的GroupedObservable的引用。 GroupByUntil將爲每個對象執行相同的操作,或者直到該GroupedObservable被「until」選擇器終止。對於這個實現,只會保留一個GroupedObservable,因爲我們正在按常量進行分組。 –
實際上,這與尼古拉的答案完全相同,除了它只使用一個操作員和一個訂閱。 –