什麼可以替代動態更改運行圖?這是我的情況。我有圖表,將文章吸收到DB中。文章來自3種不同格式的插件。因此,我有幾個流動如何動態地將Source添加到現有Graph?
val converterFlow1: Flow[ImpArticle, Article, NotUsed]
val converterFlow2: Flow[NewsArticle, Article, NotUsed]
val sinkDB: Sink[Article, Future[Done]]
// These are being created every time I poll plugins
val sourceContentProvider : Source[ImpArticle, NotUsed]
val sourceNews : Source[NewsArticle, NotUsed]
val sourceCit : Source[Article, NotUsed]
val merged = Source.combine(
sourceContentProvider.via(converterFlow1),
sourceNews.via(converterFlow2),
sourceCit)(Merge(_))
val res = merged
.buffer(10, OverflowStrategy.backpressure)
.toMat(sinkDB)(Keep.both)
.run()
問題是,我從內容提供商處獲取一次數據每24個小時,從新聞每2個小時和最後一次來源可能出現在任何時間,因爲它是從人類的未來。
我意識到圖是不可改變的,但我如何定期將新實例Source
附加到我的圖中,這樣我就可以單點控制攝取過程了嗎?
更新:你可以說我的數據是流的Source
-s,在我的情況下有三個來源。但我無法改變這種情況,因爲我從外部類(所謂的插件)中獲得了Source
的實例。這些插件獨立於我的攝取類別工作。我不能把他們合併成一個巨大的班級,只有一個Source
。
這並不完全清楚你爲什麼需要附加新的來源。你說你有內容提供者的數據,從新聞和手動輸入的東西;因此,你有三個來源,不多也不少。所以你的代碼對我來說看起來很好。 –
我在任意時間定期獲取需要攝入的'Source'類的新實例。因此,當我從內容提供商處獲取10K條目時,我想避免出現這種情況,並且在它的中間,我從包含2K條目的新聞中獲得「來源」。我希望他們同時攝入並尊重我的單一節流規則。 – expert
我建議您將數據流建模爲不是「源」序列,而是作爲單個「源」來生成所有數據。那麼'合併'組合器就足夠了。我不確定你的設計如何避免你描述的情況,順便說一句。 –