2013-12-15 33 views
4

刪除鏈接我有阿卡演員K的鏈 - > L - >男,其中K的消息發送至L,L發送消息爲M,並且每個可使用回覆到它的前身接收到的消息的sender如何幹淨地/安全地從異步通信鏈

我的問題是:如何才能安全地從鏈斷開鏈接L,使得操作k之後將消息直接發送到M和M認爲K作爲其sender

如果每個演員都是穩定和長壽的,我可以研究如何告訴K和M彼此交談。 L可以掛起足夠長的時間來轉發已經在她的郵箱中的郵件,直到獲得K和M中的每一個不再與L對話的信號。

然而,K和M也可能考慮將自己解除鏈接,這就是一切都變得多毛。

是否有一個熟知的協議使安全工作?到目前爲止,我還沒有找到正確的搜索條件。

我知道一個選擇是凍結系統並將整個事物減去L拷貝到一個新鏈中。但是,我想要實現更實時,更少中斷的解決方案。

我一直在思考一些類型的鎖交換,其中K和M保證不取消鏈接自己,直到L具有完成斷開鏈接和轉發的所有消息。但是,即使演員不會完全鎖定(只是延遲他們自己的取消鏈接操作,直到方便的時間),任何帶有「鎖定」一詞的解決方案在異步解決方案中似乎都很尷尬。

+1

你想達到什麼目的?您想通過從異步通信鏈中刪除鏈接來解決這個大問題? –

+1

會,如果你有一個有狀態的代理代替L所有第一委託給你的電流I處理它的工作,但在某些時候,直接忘掉該委託,簡單地將消息傳遞K和M.你們之間仍然有3個演員,但你是有效的在某個時候將其中一個變成了無操作。 – erickson

+1

如果您想將L與M交換,您的演員K可以向L發送消息Z,之後可以停止發送消息並等待確認消息X.如果消息Z(如果您使用基於優先級的郵箱,不要忘記設置爲低於Z的常規優先級),則您的演員L可以向K發送確認消息X(並可以選擇自行停止)。接收到X K後,可以安全地將消息直接發送給M.您可以使用[FSM](http://doc.akka.io/docs/akka/snapshot/scala/fsm.html)來實現狀態邏輯。 –

回答

1

一個解決方案一個粗略的想法將是爲前面和後面的每個節點中保持狀態,然後支持消息從鏈unkinking一個節點,並告訴它的下一個節點發生變化的一個節點。當下一個節點發生變化時,發送一個毒藥丸給節點,告訴你這個節點(假設它是一個未鏈接的節點)來正常停止它。當鏈接斷開,前停了下來,作爲所有可能出現在更多的消息純直通把所有的一起,代碼會是這個樣子:

object ChainNode { 
    case object Unlink 
    case class Link(prev:Option[ActorRef], next:Option[ActorRef]) 
    case class ChangeNextNode(node:Option[ActorRef]) 
} 

trait ChainNode extends Actor{ 
    import ChainNode._ 
    import context._ 

    override def postStop{ 
    println(s"${self.path} has been stopped") 
    } 

    def receive = chainReceive() 

    def chainReceive(prevNode:Option[ActorRef] = None, nextNode:Option[ActorRef] = None):Receive = { 
    case Unlink => 
     prevNode foreach{ node => 
     println(s"unlinking node ${self.path} from sender ${node.path}") 
     node ! ChangeNextNode(nextNode) 
     } 
     become(unlinked(nextNode)) 

    case Link(prev, next) => 
     println(s"${self.path} is linking to $prev and $next") 
     become(chainReceive(prev, next)) 

    case ChangeNextNode(newNext) => 
     println(s"${self.path} is changing next node to $newNext") 
     become(chainReceive(prevNode, newNext)) 
     sender ! PoisonPill 

    case other =>  
     println(s"${self.path} received message $other") 
     val msg = processOther(other) 
     nextNode foreach{ node => 
     println(s"${self.path} forwarding on to ${node.path}") 
     node ! msg 
     } 
    } 

    def unlinked(nextNode:Option[ActorRef]):Receive = { 
    case any => 
     println(s"${self.path} has been unlinked, just forwarding w/o processing...") 
     nextNode foreach (_ ! any)  
    } 

    def processOther(msg:Any):Any 
} 

class NodeA extends ChainNode{ 
    def processOther(msg:Any) = "foo" 
} 

class NodeB extends ChainNode{ 
    def processOther(msg:Any) = "bar" 
} 

class NodeC extends ChainNode{ 
    def processOther(msg:Any) = "baz" 
} 

然後,一個簡單的測試場景,其中,該鏈接爲改變中途:

object ChainTest{ 
    import ChainNode._ 

    def main(args: Array[String]) { 
    val system = ActorSystem("chain") 
    val a = system.actorOf(Props[NodeA]) 
    val b = system.actorOf(Props[NodeA]) 
    val c = system.actorOf(Props[NodeA]) 

    a ! Link(None, Some(b)) 
    b ! Link(Some(a), Some(c)) 
    c ! Link(Some(b), None) 

    import system.dispatcher 
    Future{ 
     for(i <- 1 until 10){ 
     a ! "hello" 
     Thread.sleep(200) 
     } 
    } 

    Future{ 
     Thread.sleep(300) 
     b ! Unlink 
    } 
    } 
} 

這並不完美,但它可以作爲一個很好的起點。一個缺點是像UnlinkChangeNextNode這樣的消息仍然會按照他們收到的順序進行處理。如果郵箱中的郵件前面有一堆郵件,那麼必須在更改之前先處理這些郵件(例如取消鏈接),然後才能生效。這可能會導致更改發生不必要的延遲。如果這是一個問題,那麼你可能想尋找到一個基於優先級的郵箱在那裏同樣UnlinkChangeNextNode消息具有更高的優先級則消息的其餘部分進行處理。

+0

你提到這個問題時,你幾乎描述了我得到的算法。我遇到的問題是,當在數百到數千條消息的負載下進行測試時,非活動/死亡節點有時在獲得PoisonPill後會收到一兩條消息。 (我讓它們變成不活動而不是死的,所以如果他們得到一條消息,我可能會失敗)。我懷疑當兩個連續的節點都試圖同時解除鏈接並將衝突的消息發送到它們的上游時和下游節點。也許我已經過度設計了它。 –

+0

謝謝,順便說一句,寫代碼。我從閱讀中學到了一些東西。例如,我沒有注意到postStop,並且你成爲相同狀態但具有不同參數的技術很好地證明了我剛纔在別處學到的東西。謝謝! –