2016-03-06 70 views
4

這裏是設置:我希望能夠通過tcp連接將消息(jsons轉換爲bytestrings)從發佈者傳輸到遠程服務器訂閱者。
理想情況下,發佈者將是一個可以接收內部消息的隊列,然後將它們排隊,然後將它們流式傳輸到訂閱服務器,當然如果有突出需求的話。我明白,這需要擴展ActorPublisher類,以便在需要時向onNext()發送消息。
我的問題是,到目前爲止,我只能發送(接收和解碼正確)一次性消息到服務器每次打開一個新的連接。我沒有設法繞過akka文檔,並能夠使用ActorPublisher設置正確的tcp Flow
下面是來自出版商的代碼:akka streaming over tcp

def send(message: Message): Unit = { 
    val system = Akka.system() 
    implicit val sys = system 

    import system.dispatcher 

    implicit val materializer = ActorMaterializer() 

    val address =  Play.current.configuration.getString("eventservice.location").getOrElse("localhost") 
    val port = Play.current.configuration.getInt("eventservice.port").getOrElse(9000) 

    /*** Try with actorPublisher ***/ 
    //val result = Source.actorPublisher[Message] (Props[EventActor]).via(Flow[Message].map(Json.toJson(_).toString.map(ByteString(_)))) 

    /*** Try with actorRef ***/ 
    /*val source = Source.actorRef[Message](0, OverflowStrategy.fail).map(
    m => { 
    Logger.info(s"Sending message: ${m.toString}") 
    ByteString(Json.toJson(m).toString) 
    } 
) 
    val ref = Flow[ByteString].via(Tcp().outgoingConnection(address, port)).to(Sink.ignore).runWith(source)*/ 

    val result = Source(Json.toJson(message).toString.map(ByteString(_))). 
    via(Tcp().outgoingConnection(address, port)). 
    runFold(ByteString.empty) { (acc, in) ⇒ acC++ in }//Handle the future 
} 

,並從演員的代碼到底是相當標準:

import akka.actor.Actor 
import akka.stream.actor.ActorSubscriberMessage.{OnComplete, OnError} 
import akka.stream.actor.{ActorPublisherMessage, ActorPublisher} 

import models.events.Message 

import play.api.Logger 

import scala.collection.mutable 

class EventActor extends Actor with ActorPublisher[Message] { 
    import ActorPublisherMessage._ 
    var queue: mutable.Queue[Message] = mutable.Queue.empty 

    def receive = { 
     case m: Message => 
     Logger.info(s"EventActor - message received and queued: ${m.toString}") 
     queue.enqueue(m) 
     publish() 

     case Request => publish() 

     case Cancel => 
      Logger.info("EventActor - cancel message received") 
      context.stop(self) 

     case OnError(err: Exception) => 
      Logger.info("EventActor - error message received") 
      onError(err) 
      context.stop(self) 

     case OnComplete => 
      Logger.info("EventActor - onComplete message received") 
      onComplete() 
      context.stop(self) 
    } 

    def publish() = { 
    while (queue.nonEmpty && isActive && totalDemand > 0) { 
    Logger.info("EventActor - message published") 
    onNext(queue.dequeue()) 
    } 
} 

我可以從用戶在必要時提供代碼:

def connect(system: ActorSystem, address: String, port: Int): Unit = { 
implicit val sys = system 
import system.dispatcher 
implicit val materializer = ActorMaterializer() 

val handler = Sink.foreach[Tcp.IncomingConnection] { conn => 
    Logger.info("Event server connected to: " + conn.remoteAddress) 
    // Get the ByteString flow and reconstruct the msg for handling and then output it back 
    // that is how handleWith work apparently 
    conn.handleWith(
    Flow[ByteString].fold(ByteString.empty)((acc, b) => acC++ b). 
     map(b => handleIncomingMessages(system, b.utf8String)). 
     map(ByteString(_)) 
) 
} 

val connections = Tcp().bind(address, port) 
val binding = connections.to(handler).run() 

binding.onComplete { 
    case Success(b) => 
    Logger.info("Event server started, listening on: " + b.localAddress) 
    case Failure(e) => 
    Logger.info(s"Event server could not bind to $address:$port: ${e.getMessage}") 
    system.terminate() 
} 
} 

在此先感謝提示。

+0

使用Akka中指定的雙向流,並特別使用它們的tcp示例。 http://doc.akka.io/docs/akka/current/scala/io-tcp.html –

回答

2

我的第一個建議是不寫你自己的隊列邏輯。 Akka提供了這個開箱即用的功能。你也不需要編寫你自己的Actor,Akka Streams也可以提供它。

首先,我們可以創建Flow,它將通過Tcp將您的發佈者連接到您的訂閱者。在發佈商的代碼,你只需要創建一次ActorSystem,並連接到外部服務器一次:

//this code is at top level of your application 

implicit val actorSystem = ActorSystem() 
implicit val actorMaterializer = ActorMaterializer() 
import actorSystem.dispatcher 

val host = Play.current.configuration.getString("eventservice.location").getOrElse("localhost") 
val port = Play.current.configuration.getInt("eventservice.port").getOrElse(9000) 

val publishFlow = Tcp().outgoingConnection(host, port) 

publishFlow是一個Flow,將輸入您要ByteString數據發送給外部用戶輸出字節字符串數據來自訂閱者:

// data to subscriber ----> publishFlow ----> data returned from subscriber 

下一步是發佈者Source。您可以使用Source.actorRef"materialize"將Stream編碼爲ActorRef,而不必編寫自己的Actor。本質上,流將成爲ActorRef爲我們以後使用:

//these values control the buffer 
val bufferSize = 1024 
val overflowStrategy = akka.stream.OverflowStrategy.dropHead 

val messageSource = Source.actorRef[Message](bufferSize, overflowStrategy) 

我們還需要一個流程,以信息轉換爲字節串

val marshalFlow = 
    Flow[Message].map(message => ByteString(Json.toJson(message).toString)) 

最後,我們可以連接所有的作品。既然你是不是從外部用戶接收到任何數據前,我們會忽略任何數據從連接進來:

val subscriberRef : ActorRef = messageSource.via(marshalFlow) 
              .via(publishFlow) 
              .runWith(Sink.ignore)  

現在,我們可以把這個流,就好像它是一個演員:

val message1 : Message = ??? 

subscriberRef ! message1 

val message2 : Message = ??? 

subscriberRef ! message2