2016-11-23 57 views
6

說,我有兩個來源:一個人如何控制的阿卡流的基礎上另一個流流

val ticks = Source(1 to 10) 
val values = Source[Int](Seq(3,4,4,7,8,8,8,8,9).to[collection.immutable.Iterable]) 

我想創建一個基於當前值在阿卡流的Graph[...]處理步驟ticks數據流儘可能在數據流中消耗。因此,例如,當值匹配我想返回所有匹配的第二源的元素,否則保持滴答導致類似的輸出:

(1, None) 
(2, None) 
(3, Some(Seq(3))) 
(4, Some(Seq(4, 4))) 
(5, None) 
(6, None) 
(7, Some(Seq(7))) 
(8, Some(Seq(8,8,8,8))) 
(9, Some(Seq(9))) 
(10, None) 

你將如何實現這一行爲?

回答

1

我建議你看看關於這個問題的阿卡流文檔:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M2/scala/stream-graphs.html

根據該網站,就可以實現這樣的GraphStage:

final class AccumulateWhileUnchanged[E] extends GraphStage[FlowShape[E, immutable.Seq[E]]] { 

val in = Inlet[E]("AccumulateWhileUnchanged.in") 
val out = Outlet[immutable.Seq[E]]("AccumulateWhileUnchanged.out") 

override def shape = FlowShape(in, out) 
} 

還有一個博客文章在這個問題上:http://blog.kunicki.org/blog/2016/07/20/implementing-a-custom-akka-streams-graph-stage/

希望這會有所幫助:)

+0

您能否給出具體細節?你的答案只是表明可以寫一個自定義的階段,它不提供解決給定問題的階段...... –

相關問題