好吧,所以我找到了一個體面的解決方案。它已經坐在我的鼻子下面,我只是沒有看到它。 Source.lazyEmpty
具體化爲承諾,當完成時將終止來源及其背後的流。
剩下的問題是,如何將它包含到無限數量的隨機數字中。我試過Zip
。結果是沒有隨機數字通過該流,因爲lazyEmpty
從未發出值(doh)。我試過Merge
,但該流永遠不會終止,因爲Merge
會一直持續到全部源已完成。
所以我寫了自己的合併。它轉發來自其中一個輸入端口的所有值,並在任意源完成時終止。
object StopperFlow {
private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) {
val in = newInlet[A]("in")
val stopper = newInlet[Unit]("stopper")
override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init)
}
private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
new StopperMergeShape(), Attributes.name("StopperMerge")) {
import FlexiMerge._
override def createMergeLogic(p: PortT) = new MergeLogic[In] {
override def initialState =
State[In](Read(p.in)) { (ctx, input, element) =>
ctx.emit(element)
SameState
}
override def initialCompletionHandling = eagerClose
}
}
def apply[In](): Flow[In, In, Promise[Unit]] = {
val stopperSource = Source.lazyEmpty[Unit]
Flow(stopperSource) { implicit builder =>
stopper =>
val stopperMerge = builder.add(new StopperMerge[In]())
stopper ~> stopperMerge.stopper
(stopperMerge.in, stopperMerge.out)
}
}
}
該流可以插入任何流中。實現時,它將返回一個Promise
,完成後終止流。這是我的測試。
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val startTime = System.currentTimeMillis()
def dumpToConsole(f: Float) = {
val timeSinceStart = System.currentTimeMillis() - startTime
System.out.println(s"[$timeSinceStart] - Random number: $f")
}
val randomSource = Source(() => Iterator.continually(Random.nextFloat()))
val consoleSink = Sink.foreach(dumpToConsole)
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink)
val (_, promise) = flow.run()
Thread.sleep(1000)
val _ = promise.success(())
Thread.sleep(1000)
我希望這對其他人也有用。仍然讓我感到困惑的是,爲什麼沒有內建的流式終端流。
我認爲你可以使用'Zip'階段來實現這一點:壓縮你的源與總是產生一個值,直到一定的條件得到滿足,並使用您的REST其他來源API來引導第二個來源。我不確定這是否是最好的方法,所以我寧願不回答這個問題;我從中獲得靈感:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC4/scala/stream-cookbook.html#Triggering_the_flow_of_elements_programmatically –
Akka似乎遵循類似的方法他們的Tcp服務器實現。打開套接字時,源代碼返回傳入連接並將其物化成可用於關閉套接字的ServerBinding對象。由於這一點以及缺乏更好的想法,我會接受你的答覆作爲正確答案。 – ErosC
http://doc.akka.io/docs/akka/current/scala/stream/stream-dynamic.html –