2016-10-19 95 views
0

我想結合最新的阿卡流,如here所述。阿卡流 - 結合最新的操作

我不知道該怎麼做 - 請幫助!

謝謝, 瑞恩。

+0

你能否提供一些更多的細節?什麼是投入,什麼是消費者。在這裏我有一個互操作性基準測試,顯示了RxJava 2如何與Akka-Stream一起工作:https://github.com/akarnokd/akarnokd-misc/blob/master/src/jmh/java/hu/akarnokd/comparison/AkkaStreamsCrossMapPerf。 java#L54 – akarnokd

+0

什麼樣的細節?我希望能夠像'zip'函數那樣完成兩個流,但是我不是使用zip語義,而是使用了鏈接的最新語義。 檢查了你的鏈接,我不確定它對我有何幫助?也許我們正在交叉目的? –

+0

我以爲你想結合兩個阿卡流,但它缺乏運營商,所以你想重用RxJava的combineLatest運營商。 – akarnokd

回答

1

我剛剛實施它很快。不知道是否它的無憂無慮,但值得一試:) https://gist.github.com/tg44/2e75d45c234ca02d91cfdac35f41a5a2 歡迎評論下的要點!

正如我們在gitter頻道上談到的,它不能通過階段構建來實現,但是您可以使用自定義階段編寫功能。你需要兩個輸入和一個輸出(可以擴展到N輸入),所以它是一個風扇。

我將傳入的元素保存到選項,並且每當輸入就緒(即發送元素)時,我將給定元素保存到該選項。每當輸出需要一個元素(並且我們已經有兩個輸入中的一個元素)時,我將它作爲選項的值從選項中給出。這是背壓感知方法。你需要處理等待「其他」輸出元素,然後是最後一個輸入元素,並且需要處理輸入拉。我認爲我的實現仍然沒有處理速度過慢的消費者案例(我們可能會錯過一個元素,可以處理髮射),並且如果兩個輸入多次產生相同的元素(也許發射也可以處理這個)會死鎖。

如果你想延長我的代碼功能,或者想要寫的其他自定義階段閱讀:http://doc.akka.io/docs/akka/2.5/scala/stream/stream-customize.html

+0

用設計思想,可能的錯誤,提及的文檔和'before'上下文進行編輯。 (我不認爲複製整個代碼是個好主意......) – tg44