2016-02-05 113 views
0

我有以下情況,我試圖將處理委託給角色。我想要發生的是每當我的流程處理消息時,它就會將它發送給演員,演員會將其大寫,並將其作爲響應寫入流中。Akka流和委託處理給演員

所以我應該能夠連接到端口8000,輸入「hello」,讓流發送給演員,並讓演員將其發回給流,以便它回傳給我uppercased。演員本身非常基本,來自文檔中的ActorPublisher示例。

我知道這段代碼不起作用,我清理了我的實驗以便編譯它。現在,它只是兩個獨立的流。我試圖合併源或匯,但無濟於事。

object Sample { 
    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("sample") 
    implicit val materializer = ActorMaterializer() 

    val connections: Source[IncomingConnection, 
     Future[ServerBinding]] = Tcp().bind("localhost", 8000) 
    val filter = Source.actorPublisher[ByteString](Props[Filter]) 

    val filterRef = Flow[ByteString] 
     .to(Sink.ignore) 
     .runWith(filter) 

    connections runForeach { conn => 
     val echo = Flow[ByteString] .map { 


     // would like to send 'p' to the actor, 
     // and have it publish to the stream 
     case p:ByteString => filterRef ! p 
     } 
    } 
    } 
} 

// this actor is supposed to simply uppercase all 
// input and write it to the stream 
class Filter extends ActorPublisher[ByteString] with Actor 
{ 
    var buf = Vector.empty[ByteString] 
    val delay = 0 

    def receive = { 
    case p: ByteString => 
     if (buf.isEmpty && totalDemand > 0) 
     onNext(p) 
     else { 
     buf :+= ByteString(p.utf8String.toUpperCase) 
     deliverBuf() 
     } 
    case Request(_) => 
     deliverBuf() 
    case Cancel => 
     context.stop(self) 
    } 


    @tailrec final def deliverBuf(): Unit = 
    if (totalDemand > 0) { 
     if (totalDemand <= Int.MaxValue) { 
     val (use, keep) = buf.splitAt(totalDemand.toInt) 
     buf = keep 
     use foreach onNext 
     } else { 
     val (use, keep) = buf.splitAt(Int.MaxValue) 
     buf = keep 
     use foreach onNext 
     deliverBuf() 
     } 
    } 
} 
+0

如果你需要一個定製的演員身份參與的一部分' Flow',爲什麼不讓它成爲一個普通的'Actor'(不是發佈者或訂閱者),然後通過'ask'和'mapAsync'集成到'Flow'中?代碼非常簡單,您可以將反應流背壓處理代碼委託給框架,讓您的演員專注於做它需要做的事情。 – cmbaxter

+0

我沒有在原始文章中指定,因爲我不想降低問題的精神,但在某些情況下,我希望演員正在處理不同的流,但能夠發佈到連接流。在這種情況下,如果目標(非源)流沒有任何活動,我將無法使用「詢問」。 –

回答

1

我以前也有過這個問題,有點迂迴的解決方法,希望你用這種方式可以。從本質上講,它涉及到創建一個接收器,該接收器立即將其獲取的消息轉發給src actor。

當然,你也可以使用直接流(註釋出來),但我想這不是這個練習:)點

object Sample { 
    def main(args: Array[String]): Unit = { 
    implicit val system = ActorSystem("sample") 
    implicit val materializer = ActorMaterializer() 

    val connections: Source[IncomingConnection, 
     Future[ServerBinding]] = Tcp().bind("localhost", 8000) 

    def filterProps = Props[Filter] 

    connections runForeach { conn => 
     val actorRef = system.actorOf(filterProps) 
     val snk = Sink.foreach[ByteString]{s => actorRef ! s} 
     val src = Source.fromPublisher(ActorPublisher[ByteString](actorRef)) 
     conn.handleWith(Flow.fromSinkAndSource(snk, src)) 

//  conn.handleWith(Flow[ByteString].map(s => ByteString(s.utf8String.toUpperCase()))) 
    } 
    } 
} 

// this actor is supposed to simply uppercase all 
// input and write it to the stream 
class Filter extends ActorPublisher[ByteString] 
{ 
    import akka.stream.actor.ActorPublisherMessage._ 
    var buf = mutable.Queue.empty[String] 
    val delay = 0 

    def receive = { 
    case p: ByteString => 
     buf += p.utf8String.toUpperCase 
     deliverBuf() 
    case Request(n) => 
     deliverBuf() 
    case Cancel => 
     context.stop(self) 
    } 

    def deliverBuf(): Unit = { 
    while (totalDemand > 0 && buf.nonEmpty) { 
     val s = ByteString(buf.dequeue() + "\n") 
     onNext(s) 
    } 
    } 
+0

謝謝,這個工程。如果演員有不同的來源,但是我仍然想要輸出到連接,您是否看到任何理由不起作用?例如,演員的來源是一個內部事件泵,當事件發生時,將其發佈到連接。 –

+0

沒有理由不應該。只要有需求,Filter actor可以生成自己的數據並在需要時發佈它。我在我的代碼中這樣做。這是我不得不使用ActorPublisher的主要原因。 –

+0

謝謝。使用這種語法,其中Flow是從source和sink動態創建的,是否有辦法將控制權交還給處理連接的原始流?我迷失在這個溪流中。如何將Flow.fromSinkAndSource()連接到下一個流程? –