我的同事和我一直困惑於DistributedPubSubMediator直接或通過代理Actor訂閱/取消訂閱的不同行爲。我們彙總了一個測試以顯示下面的不同結果。DistributedPubSubMediator通過代理訂閱Actor不起作用
根據我們的理解,ActorRef.forward應該在原始發件人中傳遞,因此無論該郵件是直接發送給中介還是通過代理Actor都無關緊要。 IE瀏覽器。 http://www.scala-lang.org/api/current/index.html#scala.actors.ActorRef。
要變通,我們必須擴展DIstributedPubSubMediator類幷包含DistributedPubSubMediator對象已提供的邏輯。理想情況下,我們希望直接使用對象並還原代碼。
這看起來像一個錯誤。有誰知道這種不尋常行爲的根本原因?請幫助...
[2013年10月22日]根據Roland的答案更新測試(謝謝),並在SubscriberAck和UnsubscribeAck上添加expectMsgType。我們現在收到SubscribeAck,但奇怪的不是UnSubscribeAck。這不是一個重大問題,但我們想知道爲什麼。
另一個問題,如果我們可能會問一個問題,是通過在同一個ActorSystem中運行的代理Actor將遠程參與者訂閱到DistributedPubSubMediator是否是一種好的做法?
目前,我們有:
- 訂閱應用程序發現發佈應用程序(在非阿卡路),並獲得簇地址。
- 遠程用戶使用此地址和已知代理actor的路徑發送身份請求。
- 遠程用戶獲取ActorIdentity響應,然後通過此(遠程)代理訂閱/取消訂閱。
- 在發佈者應用程序訂閱/取消訂閱消息被轉發到DistributedPubSubMediator,它用於發佈後續業務消息。
根據Akka Reactor pubsub聊天客戶端示例(即僅使用DistributedPubSubMediator發佈),我們沒有加入羣集,因爲我們需要處理髮布者端的故障轉移。
[2013年11月5日]增加了對發送消息的測試。它似乎沒有工作,我們還沒有弄清楚。
package star.common.pubsub
import org.scalatest.{BeforeAndAfterAll, FunSuite}
import org.junit.runner.RunWith
import akka.contrib.pattern.DistributedPubSubExtension
import akka.contrib.pattern.DistributedPubSubMediator._
import akka.testkit.TestKit
import akka.actor.{Actor, ActorSystem, ActorRef, Props}
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory
object MediatorTest {
val config = ConfigFactory.parseString(s"""
akka.actor.provider="akka.cluster.ClusterActorRefProvider"
akka.remote.netty.tcp.port=0
akka.extensions = ["akka.contrib.pattern.DistributedPubSubExtension"]
""")
}
@RunWith(classOf[org.scalatest.junit.JUnitRunner])
class MediatorTest extends TestKit(ActorSystem("test", MediatorTest.config)) with FunSuite {
val mediator = DistributedPubSubExtension(system).mediator
val topic = "example"
val message = "Published Message"
// val joinAddress = Cluster(system).selfAddress
// Cluster(system).join(joinAddress)
test("Direct subscribe to mediator") {
mediator.!(Subscribe(topic, testActor))(testActor)
expectMsgType[SubscribeAck](5 seconds)
mediator.!(Publish(topic, message))(testActor)
expectMsg(2 seconds, message)
mediator.!(Unsubscribe(topic, testActor))(testActor)
expectMsgType[UnsubscribeAck](5 seconds)
mediator ! Publish(topic, message)
expectNoMsg(2 seconds)
}
test("Subscribe to mediator via proxy") {
class Proxy extends Actor {
override def receive = {
case subscribe: Subscribe =>
mediator forward subscribe
case unsubscribe: Unsubscribe =>
mediator forward unsubscribe
case publish: Publish =>
mediator.!(publish)
}
}
val proxy = system.actorOf(Props(new Proxy), "proxy")
proxy.!(Subscribe(topic,testActor))(testActor)
expectMsgType[SubscribeAck](2 seconds)
proxy ! Publish(topic, message)
expectMsg(5 seconds, message)
proxy.!(Unsubscribe(topic,testActor))(testActor)
expectMsgType[UnsubscribeAck](5 seconds)
proxy ! Publish(topic, message)
expectNoMsg(5 seconds)
}
test("Send message to address") {
val testActorAddress = testActor.path.toString
// val system2 = ActorSystem("test", MediatorTest.config)
// Cluster(system2).join(joinAddress)
mediator.!(Subscribe(topic, testActor))(testActor)
expectMsgType[SubscribeAck](5 seconds)
println(testActorAddress) // akka://test/system/testActor1
mediator.!(Publish(topic, message))(testActor)
expectMsg(2 seconds, message)
mediator ! Send(testActorAddress, message, false)
expectMsg(5 seconds, message)
}
}
謝謝羅蘭,我已經更新上面根據您的反饋測試。但收到的消息和SubscribeAck不是UnsubscribeAck。我還在我們的實施中增加了更多的信息,以反饋我們是否正確或不與代理演員合作。 – blu35ky
您是否嘗試過akka.actor.debug.unhandled或有死信? –
嗨,羅蘭,我有,但不幸的是沒有太多的記錄在DistributedPubSubMediator部分。今天我們發現DistributedPubSubMediator.Send也不適用於我們。我已經更新了測試。解決方法是傳遞ActorRef並直接發送給繞過介體的介質。我讀通過ActorRef不是一個好主意,但它似乎是我們現在唯一的選擇。 – blu35ky