2017-07-14 51 views
1

我正在使用一些示例Scala代碼來使服務器通過websocket接收文件,臨時存儲該文件,在其上運行bash腳本,然後通過TextMessage返回stdout。使用akka-http websockets上傳和處理文件

示例代碼取自this github project

我在echoService中稍微編輯了代碼,以便它運行另一個處理臨時文件的函數。

object WebServer { 
    def main(args: Array[String]) { 

    implicit val actorSystem = ActorSystem("akka-system") 
    implicit val flowMaterializer = ActorMaterializer() 

    val interface = "localhost" 
    val port = 3000 

    import Directives._ 

    val route = get { 
     pathEndOrSingleSlash { 
     complete("Welcome to websocket server") 
     } 
    } ~ 
     path("upload") { 
     handleWebSocketMessages(echoService) 
     } 

     val binding = Http().bindAndHandle(route, interface, port) 
     println(s"Server is now online at http://$interface:$port\nPress RETURN to stop...") 
     StdIn.readLine() 

     binding.flatMap(_.unbind()).onComplete(_ => actorSystem.shutdown()) 
     println("Server is down...") 

    } 

    implicit val actorSystem = ActorSystem("akka-system") 
    implicit val flowMaterializer = ActorMaterializer() 


    val echoService: Flow[Message, Message, _] = Flow[Message].mapConcat { 

     case BinaryMessage.Strict(msg) => { 
     val decoded: Array[Byte] = msg.toArray 
     val imgOutFile = new File("/tmp/" + "filename") 
     val fileOuputStream = new FileOutputStream(imgOutFile) 
     fileOuputStream.write(decoded) 
     fileOuputStream.close() 
     TextMessage(analyze(imgOutFile)) 
     } 

     case BinaryMessage.Streamed(stream) => { 

     stream 
      .limit(Int.MaxValue) // Max frames we are willing to wait for 
      .completionTimeout(50 seconds) // Max time until last frame 
      .runFold(ByteString(""))(_ ++ _) // Merges the frames 
      .flatMap { (msg: ByteString) => 

      val decoded: Array[Byte] = msg.toArray 
      val imgOutFile = new File("/tmp/" + "filename") 
      val fileOuputStream = new FileOutputStream(imgOutFile) 
      fileOuputStream.write(decoded) 
      fileOuputStream.close() 
      Future(Source.single("")) 
     } 
     TextMessage(analyze(imgOutFile)) 
     } 


     private def analyze(imgfile: File): String = { 
     val p = Runtime.getRuntime.exec(Array("./run-vision.sh", imgfile.toString)) 
     val br = new BufferedReader(new InputStreamReader(p.getInputStream, StandardCharsets.UTF_8)) 
     try { 
      val result = Stream 
      .continually(br.readLine()) 
      .takeWhile(_ ne null) 
      .mkString 
      result 

     } finally { 
      br.close() 
     } 
     } 
    } 




} 

在測試過程中使用的WebSocket黑暗終端,case BinaryMessage.Strict工作正常。

問題:然而,case BinaryMessage.Streaming沒有完成運行analyze功能,導致來自服務器的空白響應之前寫入文件。

我正試圖圍繞Akka-HTTP中的Flow如何使用期貨,但我沒有嘗試通過所有官方文檔獲得很多運氣。

目前,.mapAsync看起來很有前途,或者基本上找到了連接未來的方法。

我真的很感謝一些見解。

回答

2

是的,mapAsync會在這個場合幫助你。它是一個組合器,可以在您的流中執行Future(可能並行),並在輸出端顯示它們的結果。

在您的情況下,爲了使事物變得均勻並使類型檢查程序開心,您需要將Strict案例的結果包裝爲Future.successful

要快速解決您的代碼可能是:

val echoService: Flow[Message, Message, _] = Flow[Message].mapAsync(parallelism = 5) { 

    case BinaryMessage.Strict(msg) => { 
     val decoded: Array[Byte] = msg.toArray 
     val imgOutFile = new File("/tmp/" + "filename") 
     val fileOuputStream = new FileOutputStream(imgOutFile) 
     fileOuputStream.write(decoded) 
     fileOuputStream.close() 
     Future.successful(TextMessage(analyze(imgOutFile))) 
    } 

    case BinaryMessage.Streamed(stream) => 

     stream 
     .limit(Int.MaxValue) // Max frames we are willing to wait for 
     .completionTimeout(50 seconds) // Max time until last frame 
     .runFold(ByteString(""))(_ ++ _) // Merges the frames 
     .flatMap { (msg: ByteString) => 

     val decoded: Array[Byte] = msg.toArray 
     val imgOutFile = new File("/tmp/" + "filename") 
     val fileOuputStream = new FileOutputStream(imgOutFile) 
     fileOuputStream.write(decoded) 
     fileOuputStream.close() 
     Future.successful(TextMessage(analyze(imgOutFile))) 
     } 
    } 
+0

真棒,謝謝。 您是否也知道爲什麼ActorSystem和ActorMaterializer必須在此代碼中初始化兩次? – frozbread

+1

我相信這是一個錯誤,沒有必要這樣做 –