2013-10-19 68 views
1

我有奇怪的行爲與阿卡TCP IO,問題是連接復位,由於從處理器或處理器TerminatingTcpMessage.abort()顯式調用。 Peer沒有收到Tcp.ConnectionClosed事件。例如:阿卡連接中止

處理程序的onReceive

測試碼登錄後
@Override 
public void onReceive(Object msg) throws Exception { 
    if (msg instanceof Tcp.ConnectionClosed) { 
     log.info("Server ConnectionClosed: {}", msg); 
     getContext().stop(getSelf()); 
    } else if (msg instanceof Tcp.Received){ 
     log.info("Aborting connection"); 
     getSender().tell(TcpMessage.abort(), getSelf()); 
    } 
} 

new JavaTestKit(system) {{ 
    getSystem().actorOf(Props.create(Server.class, getRef()), "Server"); 
    Tcp.Bound bound = expectMsgClass(Tcp.Bound.class); 

    ActorRef client = getSystem().actorOf(
    Props.create(Client.class, bound.localAddress(), getRef()), "Client"); 
    watch(client); 
    expectMsgClass(Tcp.Connected.class); 
    client.tell(new Message.Write(), getRef()); 

    expectTerminated(client); 
}}; 

執行

[INFO] [10/19/2013 22:13:22.730] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Client Connected 
[INFO] [10/19/2013 22:13:22.736] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Sending Write to Handler 
[INFO] [10/19/2013 22:13:22.767] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Aborting connection 
[INFO] [10/19/2013 22:13:22.774] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Aborted 

java.lang.AssertionError: assertion failed: timeout (3 seconds) during expectMsg: Terminated Actor[akka://actorSystem/user/Client#423174850] 

如果我在處理程序改變TcpMessage.abort()TcpMessage.close()

[INFO] [10/19/2013 22:17:06.243] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Client Connected 
[INFO] [10/19/2013 22:17:06.249] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Sending Write to Handler 
[INFO] [10/19/2013 22:17:06.278] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$a] Aborting connection 
[INFO] [10/19/2013 22:17:06.288] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Client] Client ConnectionClosed: PeerClosed 
[INFO] [10/19/2013 22:17:06.288] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Closed 

如何服務器處理程序可以通知遠程客戶端錯誤關閉什麼聯繫呢?一切工作正常,如果我發送中止從客戶端到服務器。

編輯:

好吧,發現問題根源,中止命令,如果我給寫命令之前,我試圖中止不僅工作。在客戶端和服務器上。現在我需要弄清楚爲什麼。

EDIT2:

好像中止消息被卡住,如果創建新的客戶端然後ErrorClosed達到第一客戶端,併發送相同的寫命令到第二客戶端執行該接收由客戶端按預期中止。

new JavaTestKit(system) {{ 
    getSystem().actorOf(Props.create(Server.class, getRef()), "Server"); 
    Tcp.Bound bound = expectMsgClass(Tcp.Bound.class); 

    ActorRef client = getSystem().actorOf(
      Props.create(Client.class, bound.localAddress(), getRef()), "Client"); 
    watch(client); 
    expectMsgClass(Tcp.Connected.class); 
    client.tell(new Message.Write(), getRef()); 
    expectNoMsg(); 

    ActorRef client2 = getSystem().actorOf(
      Props.create(Client.class, bound.localAddress(), getRef()), "Client2"); 
    watch(client2); 
    expectMsgClass(Tcp.Connected.class); 

    expectTerminated(client); 

    client2.tell(new Message.Write(), getRef()); 

    expectTerminated(client2); 
    expectNoMsg(); 
}}; 

和日誌:

[INFO] [10/20/2013 00:48:05.923] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Client Connected 
[INFO] [10/20/2013 00:48:05.929] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client] Sending Write to Handler 
[INFO] [10/20/2013 00:48:05.959] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$a] Aborting connection, connection actor: [Actor[akka://actorSystem/system/IO-TCP/selectors/$a/2#1767183113]] 
[INFO] [10/20/2013 00:48:05.966] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Server/$a] Server ConnectionClosed: Aborted 
[INFO] [10/20/2013 00:48:08.930] [actorSystem-akka.actor.default-dispatcher-6] [akka://actorSystem/user/Client2] Client Connected 
[INFO] [10/20/2013 00:48:08.934] [actorSystem-akka.actor.default-dispatcher-3] [akka://actorSystem/user/Client] Client ConnectionClosed: ErrorClosed(An existing connection was forcibly closed by the remote host) 
[INFO] [10/20/2013 00:48:08.936] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Client2] Sending Write to Handler 
[INFO] [10/20/2013 00:48:08.937] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$b] Aborting connection, connection actor: [Actor[akka://actorSystem/system/IO-TCP/selectors/$a/4#-598138943]] 
[INFO] [10/20/2013 00:48:08.938] [actorSystem-akka.actor.default-dispatcher-4] [akka://actorSystem/user/Server/$b] Server ConnectionClosed: Aborted 
[INFO] [10/20/2013 00:48:08.938] [actorSystem-akka.actor.default-dispatcher-7] [akka://actorSystem/user/Client2] Client ConnectionClosed: ErrorClosed(An existing connection was forcibly closed by the remote host) 

EDIT3:

爲了使它一點點清晰,問題是:我怎麼能告訴什麼對等體墜毀我的遠程演員處理器和ErrorClosed連接(它在遠端執行)。最後ErrorClosed消息總是被卡住,它並沒有消失,它只是坐在某處選擇,只推同行我做的另一個網絡操作(寫或其他演員連接到服務器)之後。只有在中止之前執行Write命令時纔會遇到此行爲。此行爲與操作系統無關;我在Windows和Linux機器上嘗試過,結果相同。

+0

那麼真正的問題是什麼? – EJP

+0

問題是,如果我的遠程演員墜毀,我怎麼能指示同行?我發送序列寫入命令後出錯了。 – YoK

+0

這是一個錯誤,票證是https://www.assembla.com/spaces/ddEDvgVAKr3QrUeJe5aVNr/tickets/3686。抱歉耽擱了。 –

回答

1

我沒有發現這個問題固溶體,但我沒幾件事情,以儘量減少總的影響。

  1. 不明確使用RST(這包括中止命令)。始終關閉連接FIN(關閉)。即使在錯誤的情況下。
  2. 信號連接緊密,如果演員死了,因爲錯誤的,所以連接可以正常關閉。
  3. 使用異步REP-REQ爲transfering大量數據(ZMQ)數據傳輸機制。

另外心跳的演員可以實現,但它似乎不是很好的主意,特別是Akka IO選擇器自己在TCP套接字上運行心跳?