當使用Akka Streams時,有什麼方法可以關閉/關閉不再需要資源清理的流嗎?關閉資源清理的Akka流
編輯:當源由無限流組成時,它可能永遠不會完成,我想在完成源之前停止它。
用法示例:
Source.from(publisher)
.map((p) -> p)
.to(Sink.ignore())
.run(materializer)
有沒有一種方法來關閉流?
當使用Akka Streams時,有什麼方法可以關閉/關閉不再需要資源清理的流嗎?關閉資源清理的Akka流
編輯:當源由無限流組成時,它可能永遠不會完成,我想在完成源之前停止它。
用法示例:
Source.from(publisher)
.map((p) -> p)
.to(Sink.ignore())
.run(materializer)
有沒有一種方法來關閉流?
您可以運行在一個獨立的ActorMaterializer
的Stream
和一段時間後調用shutdown的ActorMaterializer:
val actorSystem = ActorSystem()
val temporaryStream = {
val localMat = ActorMaterializer()(actorSystem)
import actorSystem.dispatcher
actorSystem.scheduler.scheduleOnce(10 minutes) { localMat.shutdown() }
Source.from(publisher)
.map((p) -> p)
.to(Sink.ignore())
.run()(localMat)
}
同樣,你可以返回,而不是ActorMaterializer基於物化流和關閉ActorMaterializer一些外部條件,而不是時間。
當上遊源不再有任何數據滿足下游需求時,流應自然停止。所以當發佈者向下遊發送完成時,這個物化流實例應該停止。或者,如果你有一個長時間運行的流程,你可以考慮使用一個單獨的物化器,然後在物化器上調用'shutdown()' – cmbaxter
由於Source是一個發佈者,它可以是一個無限流(在我的情況下來自卡夫卡的一條小河),它永遠不會完成。 – aseychell
通過'發佈者',你能從下游的'Flow'訪問'Subscription'嗎?這樣你就可以取消它了嗎?你的'發佈者'的impl類是什麼? – cmbaxter