2013-10-21 12 views
0

我的同事和我一直困惑於DistributedPubSubMediator直接或通過代理Actor訂閱/取消訂閱的不同行爲。我們彙總了一個測試以顯示下面的不同結果。DistributedPubSubMediator通過代理訂閱Actor不起作用

根據我們的理解,ActorRef.forward應該在原始發件人中傳遞,因此無論該郵件是直接發送給中介還是通過代理Actor都無關緊要。 IE瀏覽器。 http://www.scala-lang.org/api/current/index.html#scala.actors.ActorRef

要變通,我們必須擴展DIstributedPubSubMediator類幷包含DistributedPubSubMediator對象已提供的邏輯。理想情況下,我們希望直接使用對象並還原代碼。

這看起來像一個錯誤。有誰知道這種不尋常行爲的根本原因?請幫助...

[2013年10月22日]根據Roland的答案更新測試(謝謝),並在SubscriberAck和UnsubscribeAck上添加expectMsgType。我們現在收到SubscribeAck,但奇怪的不是UnSubscribeAck。這不是一個重大問題,但我們想知道爲什麼。

另一個問題,如果我們可能會問一個問題,是通過在同一個ActorSystem中運行的代理Actor將遠程參與者訂閱到DistributedPubSubMediator是否是一種好的做法?

目前,我們有:

  1. 訂閱應用程序發現發佈應用程序(在非阿卡路),並獲得簇地址。
  2. 遠程用戶使用此地址和已知代理actor的路徑發送身份請求。
  3. 遠程用戶獲取ActorIdentity響應,然後通過此(遠程)代理訂閱/取消訂閱。
  4. 在發佈者應用程序訂閱/取消訂閱消息被轉發到DistributedPubSubMediator,它用於發佈後續業務消息。

根據Akka Reactor pubsub聊天客戶端示例(即僅使用DistributedPubSubMediator發佈),我們沒有加入羣集,因爲我們需要處理髮布者端的故障轉移。

[2013年11月5日]增加了對發送消息的測試。它似乎沒有工作,我們還沒有弄清楚。

package star.common.pubsub 

import org.scalatest.{BeforeAndAfterAll, FunSuite} 

import org.junit.runner.RunWith 

import akka.contrib.pattern.DistributedPubSubExtension 
import akka.contrib.pattern.DistributedPubSubMediator._ 
import akka.testkit.TestKit 
import akka.actor.{Actor, ActorSystem, ActorRef, Props} 
import scala.concurrent.duration._ 

import com.typesafe.config.ConfigFactory 

object MediatorTest { 
    val config = ConfigFactory.parseString(s""" 
           akka.actor.provider="akka.cluster.ClusterActorRefProvider" 
           akka.remote.netty.tcp.port=0 
           akka.extensions = ["akka.contrib.pattern.DistributedPubSubExtension"] 
           """) 
} 

@RunWith(classOf[org.scalatest.junit.JUnitRunner]) 
class MediatorTest extends TestKit(ActorSystem("test", MediatorTest.config)) with FunSuite { 

    val mediator = DistributedPubSubExtension(system).mediator 
    val topic = "example" 
    val message = "Published Message" 
    // val joinAddress = Cluster(system).selfAddress 
    // Cluster(system).join(joinAddress) 

    test("Direct subscribe to mediator") { 
    mediator.!(Subscribe(topic, testActor))(testActor) 
    expectMsgType[SubscribeAck](5 seconds) 

    mediator.!(Publish(topic, message))(testActor) 
    expectMsg(2 seconds, message) 

    mediator.!(Unsubscribe(topic, testActor))(testActor) 
    expectMsgType[UnsubscribeAck](5 seconds) 

    mediator ! Publish(topic, message) 
    expectNoMsg(2 seconds) 
    } 


    test("Subscribe to mediator via proxy") { 
    class Proxy extends Actor { 
     override def receive = { 
     case subscribe: Subscribe => 
      mediator forward subscribe 

     case unsubscribe: Unsubscribe => 
      mediator forward unsubscribe 

     case publish: Publish => 
      mediator.!(publish) 
     } 
    } 

    val proxy = system.actorOf(Props(new Proxy), "proxy") 

    proxy.!(Subscribe(topic,testActor))(testActor) 
    expectMsgType[SubscribeAck](2 seconds) 

    proxy ! Publish(topic, message) 
    expectMsg(5 seconds, message) 

    proxy.!(Unsubscribe(topic,testActor))(testActor) 
    expectMsgType[UnsubscribeAck](5 seconds) 

    proxy ! Publish(topic, message) 
    expectNoMsg(5 seconds) 
    } 

    test("Send message to address") { 

    val testActorAddress = testActor.path.toString 
    // val system2 = ActorSystem("test", MediatorTest.config) 
    // Cluster(system2).join(joinAddress) 

    mediator.!(Subscribe(topic, testActor))(testActor) 
    expectMsgType[SubscribeAck](5 seconds) 

    println(testActorAddress) // akka://test/system/testActor1 

    mediator.!(Publish(topic, message))(testActor) 
    expectMsg(2 seconds, message) 

    mediator ! Send(testActorAddress, message, false) 

    expectMsg(5 seconds, message) 
    } 
} 

回答

1

兩件事情:

  • 無論是否使用forward沒有多大意義,因爲你沒有在測試過程中範圍有用的發送者(你是不是在ImplicitSender混合);但這不是問題
  • 你不轉發Publish消息,這就是爲什麼它不發佈消息
+0

謝謝羅蘭,我已經更新上面根據您的反饋測試。但收到的消息和SubscribeAck不是UnsubscribeAck。我還在我們的實施中增加了更多的信息,以反饋我們是否正確或不與代理演員合作。 – blu35ky

+0

您是否嘗試過akka.actor.debug.unhandled或有死信? –

+0

嗨,羅蘭,我有,但不幸的是沒有太多的記錄在DistributedPubSubMediator部分。今天我們發現DistributedPubSubMediator.Send也不適用於我們。我已經更新了測試。解決方法是傳遞ActorRef並直接發送給繞過介體的介質。我讀通過ActorRef不是一個好主意,但它似乎是我們現在唯一的選擇。 – blu35ky