2014-09-02 83 views
10

我一直在玩Akka Streams API實驗,我有一個用例,我想看看如何實現。對於我的用例,我有一個StreamTcp基於Flow,它是通過將連接的輸入流綁定到我的服務器套接字來提供的。我擁有的Flow基於ByteString數據。進入的數據將會有一個分隔符,這意味着我應該將分隔符之前的所有內容作爲一條消息進行處理,並將所有分隔符之後的所有內容作爲下一條消息進行處理。所以,玩弄一個簡單的例子,不使用插座只是靜態的文字,這是我想出了:如何使用Akka Streams在分隔符上分割入站流

import akka.actor.ActorSystem 
import akka.stream.{ FlowMaterializer, MaterializerSettings } 
import akka.stream.scaladsl.Flow 
import scala.util.{ Failure, Success } 
import akka.util.ByteString 

object BasicTransformation { 

    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("Sys") 

    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 

    Flow(data). 
     splitWhen(c => c == '.'). 
     foreach{producer => 
     Flow(producer). 
      filter(c => c != '.'). 
      fold(new StringBuilder)((sb, c) => sb.append(c.toChar)). 
      map(_.toString). 
      filter(!_.isEmpty). 
      foreach(println(_)). 
      consume(FlowMaterializer(MaterializerSettings())) 
     }. 
     onComplete(FlowMaterializer(MaterializerSettings())) { 
     case any => 
      system.shutdown 
     } 
    } 
} 

Flow主要功能,我發現來完成我的目標是splitWhen,然後產生額外的子流程,每個符合.定界符的每個消息。然後我用另一個步驟流程處理每個子流程,最後打印單個消息。

這一切似乎有點冗長,完成我認爲是一個非常簡單和常見的用例。所以我的問題是,是否有一個更清潔,更不詳細的方式來做到這一點,或者這是一個正確和首選的方式來分隔一個流的分隔符?

回答

1

在Akka用戶組發佈相同的問題後,我從Endre Varga和Viktor Klang(https://groups.google.com/forum/#!topic/akka-user/YsnwIAjQ3EE)得到了一些建議。我結束了與Endre的Transformer的建議,然後在Flow上使用transform方法。我前面的例子稍加修改的版本載於下文:

import akka.actor.ActorSystem 
import akka.stream.{ FlowMaterializer, MaterializerSettings } 
import akka.stream.scaladsl.Flow 
import scala.util.{ Failure, Success } 
import akka.util.ByteString 
import akka.stream.Transformer 
import akka.util.ByteStringBuilder 

object BasicTransformation { 

    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("Sys")       
    implicit val mater = FlowMaterializer(MaterializerSettings()) 

    val data = List(
     ByteString("Lorem Ipsum is"), 
     ByteString(" simply.Dummy text of.The prin"), 
     ByteString("ting.And typesetting industry.") 
    ) 
    Flow(data).transform(new PeriodDelimitedTransformer).foreach(println(_)) 
    } 
} 

隨着PeriodDelimitedTransformer定義爲以下幾點:

class PeriodDelimitedTransformer extends Transformer[ByteString,String]{ 
    val buffer = new ByteStringBuilder 

    def onNext(msg:ByteString) = {  
    val msgString = msg.utf8String 
    val delimIndex = msgString.indexOf('.') 
    if (delimIndex == -1){ 
     buffer.append(msg) 
     List.empty 
    } 
    else{ 
     val parts = msgString.split("\\.") 
     val endsWithDelim = msgString.endsWith(".") 

     buffer.putBytes(parts.head.getBytes()) 
     val currentPiece = buffer.result.utf8String    
     val otherPieces = parts.tail.dropRight(1).toList 

     buffer.clear 
     val lastPart = 
     if (endsWithDelim){ 
      List(parts.last) 
     } 
     else{ 
      buffer.putBytes(parts.last.getBytes()) 
      List.empty 
     }   


     val result = currentPiece :: otherPieces ::: lastPart 
     result 
    } 

    } 
} 

所以一些我以前的解決方案的複雜性被捲成這個Transformer ,但這似乎是最好的方法。在我最初的解決方案中,流最終分裂成多個子流,這不是我想要的。

1

有一些示例代碼在的akka​​流文檔中現在發佈在Streams Cookbook中。

10

它看起來像API最近改進,包括akka.stream.scaladsl.Framing。該文檔還包含如何使用它的example。關於你的具體問題:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Framing, Source} 
import akka.util.ByteString 
import com.typesafe.config.ConfigFactory 

object TcpDelimiterBasedMessaging extends App { 
    object chunks { 
    val first = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 
    val second = ByteString("More text.delimited by.a period.") 
    } 

    implicit val system = ActorSystem("delimiter-based-messaging", ConfigFactory.defaultReference()) 
    implicit val dispatcher = system.dispatcher 
    implicit val materializer = ActorMaterializer() 

    Source(chunks.first :: chunks.second :: Nil) 
    .via(Framing.delimiter(ByteString("."), Int.MaxValue)) 
    .map(_.utf8String) 
    .runForeach(println) 
    .onComplete(_ => system.terminate()) 
} 

產生以下輸出: Lorem Ipsum is simply Dummy text of the printing And typesetting industry More text delimited by a period

+0

完美!這應該是被接受的答案。 另請注意,它將跨越大塊工作。與嘗試: 'VAL第一=字節串( 「Lorem存有是printing.And排版的simply.Dummy文本」)' 'VAL第二=字節串(」 industry.More text.delimited by.a週期。 「)' – 2017-04-04 16:46:53

0

我覺得安德烈的使用Framing是你的問題的最佳解決方案,但我也有類似的問題,發現Framing是太有限了。我使用了statefulMapConcat,它允許您使用您喜歡的任何規則將輸入的ByteString分組。下面的代碼對你的問題的情況下,它可以幫助任何人:

import akka.actor.ActorSystem 
import akka.stream.ActorMaterializer 
import akka.stream.scaladsl.{Flow, Source} 
import akka.util.ByteString 

object BasicTransformation extends App { 

    implicit val system = ActorSystem("Sys") 
    implicit val materializer = ActorMaterializer() 
    implicit val dispatcher = system.dispatcher 
    val data = ByteString("Lorem Ipsum is simply.Dummy text of the printing.And typesetting industry.") 

    val grouping = Flow[Byte].statefulMapConcat {() => 
    var bytes = ByteString() 
    byt => 
     if (byt == '.') { 
     val string = bytes.utf8String 
     bytes = ByteString() 
     List(string) 
     } else { 
     bytes :+= byt 
     Nil 
     } 
    } 

    Source(data).via(grouping).runForeach(println).onComplete(_ => system.terminate()) 
} 

主要生產: Lorem Ipsum is simply Dummy text of the printing And typesetting industry