我一直在玩Akka Streams API實驗,我有一個用例,我想看看如何實現。對於我的用例,我有一個StreamTcp
基於Flow
,它是通過將連接的輸入流綁定到我的服務器套接字來提供的。我擁有的Flow基於ByteString
數據。進入的數據將會有一個分隔符,這意味着我應該將分隔符之前的所有內容作爲一條消息進行處理,並將所有分隔符之後的所有內容作爲下一條消息進行處理。所以,玩弄一個簡單的例子,不使用插座只是靜態的文字,這是我想出了:如何使用Akka Streams在分隔符上分割入站流
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow
import scala.util.{ Failure, Success }
import akka.util.ByteString
object BasicTransformation {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("Sys")
val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.")
Flow(data).
splitWhen(c => c == '.').
foreach{producer =>
Flow(producer).
filter(c => c != '.').
fold(new StringBuilder)((sb, c) => sb.append(c.toChar)).
map(_.toString).
filter(!_.isEmpty).
foreach(println(_)).
consume(FlowMaterializer(MaterializerSettings()))
}.
onComplete(FlowMaterializer(MaterializerSettings())) {
case any =>
system.shutdown
}
}
}
在Flow
主要功能,我發現來完成我的目標是splitWhen
,然後產生額外的子流程,每個符合.
定界符的每個消息。然後我用另一個步驟流程處理每個子流程,最後打印單個消息。
這一切似乎有點冗長,完成我認爲是一個非常簡單和常見的用例。所以我的問題是,是否有一個更清潔,更不詳細的方式來做到這一點,或者這是一個正確和首選的方式來分隔一個流的分隔符?
完美!這應該是被接受的答案。 另請注意,它將跨越大塊工作。與嘗試: 'VAL第一=字節串( 「Lorem存有是printing.And排版的simply.Dummy文本」)' 'VAL第二=字節串(」 industry.More text.delimited by.a週期。 「)' – 2017-04-04 16:46:53