2017-09-09 66 views
0

在akka-http websocket應用程序中,我有一條Route迴應給定的消息,並且我還需要在應用程序中維護狀態。所以下面的工作正常:使用statefulMapConcat返回源代碼

override protected def routes: Route = 
    pathSuffix("echo") { 
     handleWebSocketMessages(echoMessageFlow) 
    } 

    def echoMessageFlow: Flow[Message, Message, NotUsed] = Flow[Message].statefulMapConcat {() => 
    implicit var state = new SessionState() 
    msg: Message => 
    List(msg, msg, msg) // echo the message back 3 times 
    } 

但是,我也需要扼殺回聲消息,所以只有一個每秒發生。所以我希望能夠做到這一點:

def echoMessageFlow: Flow[Message, Message, NotUsed] = Flow[Message].statefulMapConcat {() => 
    implicit var state = new SessionState() 
    msg: Message => 
    Source(List(msg, msg, msg)).throttle(1, 1 second, 1, ThrottleMode.shaping) 
    } 

然而,statefulMapConcat返回的功能要求,這是一個Iterable。有什麼方法可以返回Source嗎?

回答

1

您可以使用flatMapConcat(或flatMapMerge,如果您需要並行化)併爲其賦予每個傳入元素產生Source的函數。

每個產生的Source都可以通過追加throttle組合子來限制,就像你上面做的那樣。

最後,如果你想讓你的Source成爲有狀態,你可以使用Source.unfold來創建它。下面

實施例(使用產生的消息作爲內部狀態的計數):

def echoMessageFlow: Flow[Message, Message, NotUsed] = Flow[Message].flatMapConcat { msg: Message => 
    Source.unfold(0){ count: Int ⇒ 
     if (count < 3) 
     Some(count + 1, msg) 
     else 
     None 
    }.throttle(1, 1.second, 1, ThrottleMode.shaping) 
    } 
+0

這正是我一直在尋找。謝謝! –