我有以下情況,我試圖將處理委託給角色。我想要發生的是每當我的流程處理消息時,它就會將它發送給演員,演員會將其大寫,並將其作爲響應寫入流中。Akka流和委託處理給演員
所以我應該能夠連接到端口8000,輸入「hello」,讓流發送給演員,並讓演員將其發回給流,以便它回傳給我uppercased。演員本身非常基本,來自文檔中的ActorPublisher示例。
我知道這段代碼不起作用,我清理了我的實驗以便編譯它。現在,它只是兩個獨立的流。我試圖合併源或匯,但無濟於事。
object Sample {
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("sample")
implicit val materializer = ActorMaterializer()
val connections: Source[IncomingConnection,
Future[ServerBinding]] = Tcp().bind("localhost", 8000)
val filter = Source.actorPublisher[ByteString](Props[Filter])
val filterRef = Flow[ByteString]
.to(Sink.ignore)
.runWith(filter)
connections runForeach { conn =>
val echo = Flow[ByteString] .map {
// would like to send 'p' to the actor,
// and have it publish to the stream
case p:ByteString => filterRef ! p
}
}
}
}
// this actor is supposed to simply uppercase all
// input and write it to the stream
class Filter extends ActorPublisher[ByteString] with Actor
{
var buf = Vector.empty[ByteString]
val delay = 0
def receive = {
case p: ByteString =>
if (buf.isEmpty && totalDemand > 0)
onNext(p)
else {
buf :+= ByteString(p.utf8String.toUpperCase)
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
如果你需要一個定製的演員身份參與的一部分' Flow',爲什麼不讓它成爲一個普通的'Actor'(不是發佈者或訂閱者),然後通過'ask'和'mapAsync'集成到'Flow'中?代碼非常簡單,您可以將反應流背壓處理代碼委託給框架,讓您的演員專注於做它需要做的事情。 – cmbaxter
我沒有在原始文章中指定,因爲我不想降低問題的精神,但在某些情況下,我希望演員正在處理不同的流,但能夠發佈到連接流。在這種情況下,如果目標(非源)流沒有任何活動,我將無法使用「詢問」。 –