2015-05-19 110 views
2

我正在想出一個解決方案,將我收到的串流分成多個Strings。我一直在研究,看起來在Akka-Streams的早期版本中,有一個Transformer類,你可以擴展來做這種類型的轉換。在Akka-Streams中分流內流

在我使用的版本(RC2)中有Stage s,但我不確定如何實現分割模式。

Source.actorPublisher[String](MyActor.props). 
.XXXXX(_.split("\n")) 
.map(...) 
.to(Sink(...)) 

我要找的XXXXX組件,讓我輸入一個String並返回String序列,將發出每一個到流的其餘部分。

+2

您可以如用'mapConcat'結果元素總是僅依賴於單個輸入元素。如果依賴關係更復雜,則可以使用(有狀態)階段。 – jrudolph

+2

除此之外,通常'mapConcat'可以用'flatMap'來考慮。名字不同,因爲一些單子法不會成立。 – almendar

回答

3

我同意@jrudolph認爲mapConcat可能是你要找的。一個簡單的例子示出了在操作此方法:

val strings = List(
    """hello 
    world 
    test 
    this""", 
    """foo 
    bar 
    baz 
    """ 

) 

    implicit val system = ActorSystem("test") 
    implicit val mater = ActorFlowMaterializer() 
    Source(strings). 
    mapConcat(_.split("\n").map(_.trim).toList). 
    runForeach(println) 

如果運行該代碼你將看到以下打印出來:

hello 
world 
test 
this 
foo  
bar 
baz 
+0

這就是我一直在尋找的!謝謝! – hveiga

1

阿卡提供Framing輔助函數用於這種類型的問題。

假設你的字符集是UTF-8,你可以寫一個函數,在分隔String值的最大尺寸,並返回一個Flow可以進行拆分:

import akka.stream.scaladsl.Framing 
import akka.util.ByteString 

val newLineSplitter : (Int) => Flow[String, String, NotUsed] = 
    (maxLineSize) => 
    Flow[String] 
     .map(ByteString.apply) 
     .via(Framing delimiter (ByteString("\n"), maxLineSize)) 
     .via(Flow[ByteString] map (_.utf8String))