0
A
回答
1
我剛剛實施它很快。不知道是否它的無憂無慮,但值得一試:) https://gist.github.com/tg44/2e75d45c234ca02d91cfdac35f41a5a2 歡迎評論下的要點!
正如我們在gitter頻道上談到的,它不能通過階段構建來實現,但是您可以使用自定義階段編寫功能。你需要兩個輸入和一個輸出(可以擴展到N輸入),所以它是一個風扇。
我將傳入的元素保存到選項,並且每當輸入就緒(即發送元素)時,我將給定元素保存到該選項。每當輸出需要一個元素(並且我們已經有兩個輸入中的一個元素)時,我將它作爲選項的值從選項中給出。這是背壓感知方法。你需要處理等待「其他」輸出元素,然後是最後一個輸入元素,並且需要處理輸入拉。我認爲我的實現仍然沒有處理速度過慢的消費者案例(我們可能會錯過一個元素,可以處理髮射),並且如果兩個輸入多次產生相同的元素(也許發射也可以處理這個)會死鎖。
如果你想延長我的代碼功能,或者想要寫的其他自定義階段閱讀:http://doc.akka.io/docs/akka/2.5/scala/stream/stream-customize.html
+0
用設計思想,可能的錯誤,提及的文檔和'before'上下文進行編輯。 (我不認爲複製整個代碼是個好主意......) – tg44
相關問題
- 1. 阿卡流 - Source.fromPublisher
- 2. Rx斯卡拉結合最新的多個流
- 3. 阿卡流+阿卡-HTTP生命週期
- 4. 阿卡流OnNext不允許
- 5. 如何關閉阿卡流?
- 6. 測試阿卡反應流
- 7. 阿帕奇卡夫卡和Spark流
- 8. 阿卡輸入流處理
- 9. 阿卡卡夫卡流監理策略不工作
- 10. 斯卡拉阿卡流合併過濾器和地圖
- 11. 阿卡流+阿卡的Http - 獲取上的錯誤
- 12. 閱讀使用阿卡流
- 13. 阿卡流+阿卡的Http傳遞參數
- 14. 阿克卡流節流閥是如何工作的?
- 15. 相結合的「IN」操作
- 16. 阿卡流TCP +阿卡流卡夫卡生產者未停止不發佈消息,而不是錯誤-ING出
- 17. 操縱序列元素在阿卡流動
- 18. 斯卡拉解析左結合的標操作
- 19. 阿卡模擬
- 20. 將多個Observable與不同的操作/操作結合
- 21. 如何結束無限的阿克卡流
- 22. 瞭解阿卡流中的背壓Source.queue
- 23. PHP中的最新操作?
- 24. 如何批量使用阿卡流Flow.batch
- 25. 與阿卡流Streaming巨大的Json
- 26. 阿卡流 - 扔掉的消息
- 27. 結合2個LINQ操作
- 28. 卡夫卡聚合流數據流
- 29. 最新操作系統/最新SDK
- 30. 調用在阿卡
你能否提供一些更多的細節?什麼是投入,什麼是消費者。在這裏我有一個互操作性基準測試,顯示了RxJava 2如何與Akka-Stream一起工作:https://github.com/akarnokd/akarnokd-misc/blob/master/src/jmh/java/hu/akarnokd/comparison/AkkaStreamsCrossMapPerf。 java#L54 – akarnokd
什麼樣的細節?我希望能夠像'zip'函數那樣完成兩個流,但是我不是使用zip語義,而是使用了鏈接的最新語義。 檢查了你的鏈接,我不確定它對我有何幫助?也許我們正在交叉目的? –
我以爲你想結合兩個阿卡流,但它缺乏運營商,所以你想重用RxJava的combineLatest運營商。 – akarnokd