2017-07-19 79 views
3

我對AKKA比較陌生,所以想要學習基礎知識。在AKKA-Streams中使用動態水槽目的地?

我的用例是不斷從JMS隊列讀取消息並將每個消息輸出到新文件。

我已經得到了基本設置與工作:

Source<String, NotUsed> jmsSource = JmsSource 
.textSource(JmsSourceSettings 
    .create(connectionFactory) 
    .withQueue("myQueue") 
    .withBufferSize(10)); 

Sink<ByteString, CompletionStage<IOResult>> fileSink = FileIO.toFile(new 
File("random.txt")); 

final Flow<String, ByteString, NotUsed> flow = Flow.fromFunction((String n) -> ByteString.fromString(n)); 

final RunnableGraph<NotUsed> runnable = jmsSource.via(flow).to(fileSink); 

runnable.run(materializer); 

但我想要的文件名是動態的(而不是硬編碼「random.txt」),它應該取決於被改變隊列中每條消息的內容(我當然可以在隊列中選擇,但是如何在fileSink中設置該名稱?)。如何最好地設置?

+0

每當我回到阿卡流,我似乎總是想要這個。我一旦過去就解決了它,但我不記得如何。我想我從Akka Http獲得了一些啓發。 – Steiny

+0

您是否曾經能夠在此找出解決方案? – Scalahansolo

+1

可悲的是。我很驚訝沒有簡單的方法來做這件事,因爲它似乎是這樣一個基本特徵。我幾乎可以肯定,有一些我錯過了......但對這個問題的迴應表明我可能不得不爲此編寫一些自定義處理,並且幾乎告訴我這不是這種類型使用的正確實現 - 案件(希望我錯了)。 –

回答

2

我根據akka.stream.impl.LazySink創建了一個簡單的接收器。我只在成功案例中使用單個元素對其進行了測試,因此請隨時對此處或GitHub Gist發表評論。

import akka.NotUsed 
import akka.stream.{Attributes, Inlet, SinkShape} 
import akka.stream.scaladsl.{Sink, Source} 
import akka.stream.stage._ 

class OneToOneOnDemandSink[T, +M](sink: T => Sink[T, M]) extends GraphStage[SinkShape[T]] { 

    val in: Inlet[T] = Inlet("OneToOneOnDemandSink.in") 
    override val shape = SinkShape(in) 

    override def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) { 

    override def preStart(): Unit = pull(in) 

    val awaitingElementHandler = new InHandler { 
     override def onPush(): Unit = { 
     val element = grab(in) 
     val innerSource = createInnerSource(element) 
     val innerSink = sink(element) 
     Source.fromGraph(innerSource.source).runWith(innerSink)(subFusingMaterializer) 
     } 

     override def onUpstreamFinish(): Unit = completeStage() 

     override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) 
    } 
    setHandler(in, awaitingElementHandler) 

    def createInnerSource(element: T): SubSourceOutlet[T] = { 
     val innerSource = new SubSourceOutlet[T]("OneToOneOnDemandSink.innerSource") 

     innerSource.setHandler(new OutHandler { 
     override def onPull(): Unit = { 
      innerSource.push(element) 
      innerSource.complete() 
      if (isClosed(in)) { 
      completeStage() 
      } else { 
      pull(in) 
      setHandler(in, awaitingElementHandler) 
      } 
     } 

     override def onDownstreamFinish(): Unit = { 
      innerSource.complete() 
      if (isClosed(in)) { 
      completeStage() 
      } 
     } 
     }) 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val illegalStateException = new IllegalStateException("Got a push that we weren't expecting") 
      innerSource.fail(illegalStateException) 
      failStage(illegalStateException) 
     } 

     override def onUpstreamFinish(): Unit = { 
      // We don't stop until the inner stream stops. 
      setKeepGoing(true) 
     } 

     override def onUpstreamFailure(ex: Throwable): Unit = { 
      innerSource.fail(ex) 
      failStage(ex) 
     } 
     }) 

     innerSource 
    } 
    } 
} 

object OneToOneOnDemandSink { 

    def apply[T, M](sink: T => Sink[T, M]): Sink[T, NotUsed] = Sink.fromGraph(new OneToOneOnDemandSink(sink)) 
} 

所以它避免了一大堆複雜的是LazySink已經並且也沒有合理的物化價值迴歸這將創建一個爲每個元素的新下沉。

0

PartitionHub應服務器您的目的。作爲動態的,您可以按需創建並附加新的接收器。