2015-09-10 69 views
3

當使用Akka Streams時,有什麼方法可以關閉/關閉不再需要資源清理的流嗎?關閉資源清理的Akka流

編輯:當源由無限流組成時,它可能永遠不會完成,我想在完成源之前停止它。

用法示例:

Source.from(publisher) 
     .map((p) -> p) 
     .to(Sink.ignore()) 
     .run(materializer) 

有沒有一種方法來關閉流?

+0

當上遊源不再有任何數據滿足下游需求時,流應自然停止。所以當發佈者向下遊發送完成時,這個物化流實例應該停止。或者,如果你有一個長時間運行的流程,你可以考慮使用一個單獨的物化器,然後在物化器上調用'shutdown()' – cmbaxter

+0

由於Source是一個發佈者,它可以是一個無限流(在我的情況下來自卡夫卡的一條小河),它永遠不會完成。 – aseychell

+0

通過'發佈者',你能從下游的'Flow'訪問'Subscription'嗎?這樣你就可以取消它了嗎?你的'發佈者'的impl類是什麼? – cmbaxter

回答

4

您可以運行在一個獨立的ActorMaterializerStream和一段時間後調用shutdown的ActorMaterializer:

val actorSystem = ActorSystem() 

val temporaryStream = { 

    val localMat = ActorMaterializer()(actorSystem) 

    import actorSystem.dispatcher 
    actorSystem.scheduler.scheduleOnce(10 minutes) { localMat.shutdown() } 

    Source.from(publisher) 
     .map((p) -> p) 
     .to(Sink.ignore()) 
     .run()(localMat) 
} 

同樣,你可以返回,而不是ActorMaterializer基於物化流和關閉ActorMaterializer一些外部條件,而不是時間。

+0

我發現爲每個流創建一個新的actor系統效率相當低。在我的場景中,我使用akka流來訂閱Apache Kafka主題,如果我使用您的建議,這需要多個流並需要大量資源(每個主題都有一個線程池)。 – aseychell

+0

根據您的新需求查看我的更新建議... –

+0

我正在使用'Materializer'接口而不是'ActorMaterializer',並且未在接口上找到關閉方法。謝謝! – aseychell