2016-06-21 69 views
4

什麼可以替代動態更改運行圖?這是我的情況。我有圖表,將文章吸收到DB中。文章來自3種不同格式的插件。因此,我有幾個流動如何動態地將Source添加到現有Graph?

val converterFlow1: Flow[ImpArticle, Article, NotUsed] 
val converterFlow2: Flow[NewsArticle, Article, NotUsed] 
val sinkDB: Sink[Article, Future[Done]] 

// These are being created every time I poll plugins  
val sourceContentProvider : Source[ImpArticle, NotUsed] 
val sourceNews : Source[NewsArticle, NotUsed] 
val sourceCit : Source[Article, NotUsed] 

val merged = Source.combine(
    sourceContentProvider.via(converterFlow1), 
    sourceNews.via(converterFlow2), 
    sourceCit)(Merge(_)) 

val res = merged 
    .buffer(10, OverflowStrategy.backpressure) 
    .toMat(sinkDB)(Keep.both) 
    .run() 

問題是,我從內容提供商處獲取一次數據每24個小時,從新聞每2個小時和最後一次來源可能出現在任何時間,因爲它是從人類的未來。

我意識到圖是不可改變的,但我如何定期將新實例Source附加到我的圖中,這樣我就可以單點控制攝取過程了嗎?

更新:你可以說我的數據是流的Source -s,在我的情況下有三個來源。但我無法改變這種情況,因爲我從外部類(所謂的插件)中獲得了Source的實例。這些插件獨立於我的攝取類別工作。我不能把他們合併成一個巨大的班級,只有一個Source

+0

這並不完全清楚你爲什麼需要附加新的來源。你說你有內容提供者的數據,從新聞和手動輸入的東西;因此,你有三個來源,不多也不少。所以你的代碼對我來說看起來很好。 –

+0

我在任意時間定期獲取需要攝入的'Source'類的新實例。因此,當我從內容提供商處獲取10K條目時,我想避免出現這種情況,並且在它的中間,我從包含2K條目的新聞中獲得「來源」。我希望他們同時攝入並尊重我的單一節流規則。 – expert

+0

我建議您將數據流建模爲不是「源」序列,​​而是作爲單個「源」來生成所有數據。那麼'合併'組合器就足夠了。我不確定你的設計如何避免你描述的情況,順便說一句。 –

回答

2

好的,一般來說,正確的方法是將源流加入單一來源,即從Source[Source[T, _], Whatever]Source[T, Whatever]。這可以通過flatMapConcatflatMapMerge完成。因此,如果您可以獲得Source[Source[Article, NotUsed], NotUsed],則可以使用flatMap*變體之一併獲得最終的Source[Article, NotUsed]。爲你的每一個來源(沒有雙關語)做,然後你的原始方法應該工作。

1

如果你不能將它建模爲Source[Source[_,_],_]那麼我會考慮使用Source.queue[Source[T,_]](queueSize, overflowStrategy)here

你有什麼要小心,雖然是如果提交失敗,會發生什麼情況。

+0

維克多,你能幫忙http://stackoverflow.com/q/38033362/226895? – expert

1

我已經實現了基於Vladimir Matveev給出的答案的代碼,並希望與其他人分享它,因爲它看起來像我常見的用例。

我知道Viktor Klang提到的Source.queue,但我不知道flatMapConcat。這是純粹的迷人。

implicit val system = ActorSystem("root") 
implicit val executor = system.dispatcher 
implicit val materializer = ActorMaterializer() 

case class ImpArticle(text: String) 
case class NewsArticle(text: String) 
case class Article(text: String) 

val converterFlow1: Flow[ImpArticle, Article, NotUsed] = Flow[ImpArticle].map(a => Article("a:" + a.text)) 
val converterFlow2: Flow[NewsArticle, Article, NotUsed] = Flow[NewsArticle].map(a => Article("a:" + a.text)) 
val sinkDB: Sink[Article, Future[Done]] = Sink.foreach { a => 
    Thread.sleep(1000) 
    println(a) 
} 

// These are being created every time I poll plugins 
val sourceContentProvider: Source[ImpArticle, NotUsed] = Source(List(ImpArticle("cp1"), ImpArticle("cp2"))) 
val sourceNews: Source[NewsArticle, NotUsed] = Source(List(NewsArticle("news1"), NewsArticle("news2"))) 
val sourceCit: Source[Article, NotUsed] = Source(List(Article("a1"), Article("a2"))) 

val (queue, completionFut) = Source 
    .queue[Source[Article, NotUsed]](10, backpressure) 
    .flatMapConcat(identity) 
    .buffer(2, OverflowStrategy.backpressure) 
    .toMat(sinkDB)(Keep.both) 
    .run() 

queue.offer(sourceContentProvider.via(converterFlow1)) 
queue.offer(sourceNews.via(converterFlow2)) 
queue.offer(sourceCit) 
queue.complete() 

completionFut.onComplete { 
    case Success(res) => 
    println(res) 
    system.terminate() 
    case Failure(ex) => 
    ex.printStackTrace() 
    system.terminate() 
} 

Await.result(system.whenTerminated, Duration.Inf) 

我還是檢查由queue.offer但在我的情況下,這些電話將是相當罕見的退換Future成功。

相關問題