2015-07-13 27 views
7

我正在設計一個小工具來生成CSV測試數據。我想使用Akka Streams(1.0-RC4)來實現數據流。將會有一個Source產生隨機數字,轉換成CSV字符串,一些速率限制器和一個Sink寫入文件。如何從外部正確地停止Akka流

也應該有一個乾淨的方式來停止使用小型REST接口的工具。

這是我掙扎的地方。流開始後(RunnableFlow.run())似乎沒有辦法阻止它。源和接收器是無限的(至少在磁盤運行完之後:)),所以它們不會停止流。

將控制邏輯添加到Source或Sink感覺不對。也使用ActorSystem.shutdown()。什麼是停止流的方式?

+0

我認爲你可以使用'Zip'階段來實現這一點:壓縮你的源與總是產生一個值,直到一定的條件得到滿足,並使用您的REST其他來源API來引導第二個來源。我不確定這是否是最好的方法,所以我寧願不回答這個問題;我從中獲得靈感:http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC4/scala/stream-cookbook.html#Triggering_the_flow_of_elements_programmatically –

+0

Akka似乎遵循類似的方法他們的Tcp服務器實現。打開套接字時,源代碼返回傳入連接並將其物化成可用於關閉套接字的ServerBinding對象。由於這一點以及缺乏更好的想法,我會接受你的答覆作爲正確答案。 – ErosC

+0

http://doc.akka.io/docs/akka/current/scala/stream/stream-dynamic.html –

回答

10

好吧,所以我找到了一個體面的解決方案。它已經坐在我的鼻子下面,我只是沒有看到它。 Source.lazyEmpty具體化爲承諾,當完成時將終止來源及其背後的流。

剩下的問題是,如何將它包含到無限數量的隨機數字中。我試過Zip。結果是沒有隨機數字通過該流,因爲lazyEmpty從未發出值(doh)。我試過Merge,但該流永遠不會終止,因爲Merge會一直持續到全部源已完成。

所以我寫了自己的合併。它轉發來自其中一個輸入端口的所有值,並在任意源完成時終止。

object StopperFlow { 

    private class StopperMergeShape[A](_init: Init[A] = Name("StopperFlow")) extends FanInShape[A](_init) { 
    val in = newInlet[A]("in") 
    val stopper = newInlet[Unit]("stopper") 

    override protected def construct(init: Init[A]): FanInShape[A] = new StopperMergeShape[A](init) 
    } 

    private class StopperMerge[In] extends FlexiMerge[In, StopperMergeShape[In]](
    new StopperMergeShape(), Attributes.name("StopperMerge")) { 
    import FlexiMerge._ 

    override def createMergeLogic(p: PortT) = new MergeLogic[In] { 
     override def initialState = 
     State[In](Read(p.in)) { (ctx, input, element) => 
      ctx.emit(element) 
      SameState 
     } 

     override def initialCompletionHandling = eagerClose 
    } 
    } 

    def apply[In](): Flow[In, In, Promise[Unit]] = { 
    val stopperSource = Source.lazyEmpty[Unit] 

    Flow(stopperSource) { implicit builder => 
     stopper => 
     val stopperMerge = builder.add(new StopperMerge[In]()) 

     stopper ~> stopperMerge.stopper 

     (stopperMerge.in, stopperMerge.out) 
    } 
    }  
} 

該流可以插入任何流中。實現時,它將返回一個Promise,完成後終止流。這是我的測試。

implicit val system = ActorSystem() 
implicit val materializer = ActorMaterializer() 

val startTime = System.currentTimeMillis() 

def dumpToConsole(f: Float) = { 
    val timeSinceStart = System.currentTimeMillis() - startTime 
    System.out.println(s"[$timeSinceStart] - Random number: $f") 
} 

val randomSource = Source(() => Iterator.continually(Random.nextFloat())) 
val consoleSink = Sink.foreach(dumpToConsole) 
val flow = randomSource.viaMat(StopperFlow())(Keep.both).to(consoleSink) 

val (_, promise) = flow.run() 

Thread.sleep(1000) 
val _ = promise.success(()) 
Thread.sleep(1000) 

我希望這對其他人也有用。仍然讓我感到困惑的是,爲什麼沒有內建的流式終端流。

+1

這是非常有用的。可能是一個圖書館或在阿卡流中。 –

+1

+1我有一個情況,我正在演員中運行一個流。我想在演員停止時取消流,但是,似乎並沒有內置的方式來做到這一點。 – Mullefa

+4

注意比akka 2.4.3還有KillSwitch流操作符,它允許外部完成流。 – Readren

0

不完全停止,但限制。您可以使用limittake

Streams Cookbook

例子:

val MAX_ALLOWED_SIZE = 100 

// OK. Future will fail with a `StreamLimitReachedException` 
// if the number of incoming elements is larger than max 
val limited: Future[Seq[String]] = 
    mySource.limit(MAX_ALLOWED_SIZE).runWith(Sink.seq) 

// OK. Collect up until max-th elements only, then cancel upstream 
val ignoreOverflow: Future[Seq[String]] = 
    mySource.take(MAX_ALLOWED_SIZE).runWith(Sink.seq)