2014-10-08 101 views
0

我試圖在演員死亡時發送消息。使用DeathWatch殺死Akka actor時發送的消息

這是基於阿卡臨終看護文檔: http://doc.akka.io/docs/akka/2.3.6/java/untyped-actors.html#deathwatch-java

在serviceActor我等待「殺」的消息,但我從來沒有真正發送此消息。所以要在ServiceActor收到消息我用:

else if (msg instanceof Terminated) { 
     final Terminated t = (Terminated) msg; 
     if (t.getActor() == child) { 
      lastSender.tell(Msg.TERMINATED, getSelf()); 
     } 
    } else { 
     unhandled(msg); 
    } 

我已經設置持續時間10毫秒:

Duration.create(10, TimeUnit.MILLISECONDS) 

但該消息Msg.TERMINATED永遠不會在的onReceive方法獲得:

@Override 
    public void onReceive(Object msg) { 
     if (msg == ServiceActor.Msg.SUCCESS) { 
      System.out.println("Success"); 
      getContext().stop(getSelf()); 
     } else if (msg == ServiceActor.Msg.TERMINATED) { 
      System.out.println("Terminated"); 
     } else 
      unhandled(msg); 
    } 

當ServiceActor失敗時,如何向HelloWorld發送消息?

整個代碼:

package terminatetest; 
import akka.Main; 

public class Launcher { 

    public static void main(String args[]) { 

     String[] akkaArgsArray = new String[1]; 

     akkaArgsArray[0] = "terminatetest.HelloWorld"; 

     Main.main(akkaArgsArray); 

    } 

} 

package terminatetest; 


import java.util.concurrent.TimeUnit; 

import scala.concurrent.duration.Duration; 
import akka.actor.ActorRef; 
import akka.actor.PoisonPill; 
import akka.actor.Props; 
import akka.actor.UntypedActor; 

public class HelloWorld extends UntypedActor { 

    @Override 
    public void preStart() { 

     int counter = 0; 

     akka.actor.ActorSystem system = getContext().system(); 

     final ActorRef greeter = getContext().actorOf(
       Props.create(ServiceActor.class), String.valueOf(counter)); 

     system.scheduler().scheduleOnce(
       Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() { 
        public void run() { 
         greeter.tell(PoisonPill.getInstance(), getSelf()); 
        } 
       }, system.dispatcher()); 

     greeter.tell("http://www.google.com", getSelf()); 

     counter = counter + 1; 
    } 

    @Override 
    public void onReceive(Object msg) { 
     if (msg == ServiceActor.Msg.SUCCESS) { 
      System.out.println("Success"); 
      getContext().stop(getSelf()); 
     } else if (msg == ServiceActor.Msg.TERMINATED) { 
      System.out.println("Terminated"); 
     } else 
      unhandled(msg); 
    } 
} 

package terminatetest; 

import static com.utils.PrintUtils.println; 

import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.net.HttpURLConnection; 
import java.net.MalformedURLException; 
import java.net.URL; 

import akka.actor.ActorRef; 
import akka.actor.Props; 
import akka.actor.Terminated; 
import akka.actor.UntypedActor; 

public class ServiceActor extends UntypedActor { 

    final ActorRef child = this.getContext().actorOf(Props.empty(), "child"); 
    { 
     this.getContext().watch(child); 
    } 

    ActorRef lastSender = getContext().system().deadLetters(); 

    public static enum Msg { 
     SUCCESS, FAIL, TERMINATED; 
    } 

    @Override 
    public void onReceive(Object msg) { 

     if (msg instanceof String) { 
      String urlName = (String) msg; 

      try { 
       long startTime = System.currentTimeMillis(); 
       URL url = new URL(urlName); 
       HttpURLConnection conn = (HttpURLConnection) url.openConnection(); 
       conn.connect(); 

       BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream())); 
       StringBuilder out = new StringBuilder(); 
       String line; 
       while ((line = reader.readLine()) != null) { 
        out.append(line); 
       } 
       System.out.println("Connection successful to " + url); 
       System.out.println("Content is " + out); 
       long endTime = System.currentTimeMillis(); 
       System.out.println("Total Time : " + (endTime - startTime) + " milliseconds"); 

      } catch (MalformedURLException mue) { 
       println("URL Name " + urlName); 
       System.out.println("MalformedURLException"); 
       System.out.println(mue.getMessage()); 
       mue.printStackTrace(); 
       getSender().tell(Msg.FAIL, getSelf()); 
      } catch (IOException ioe) { 
       println("URL Name " + urlName); 
       System.out.println("IOException"); 
       System.out.println(ioe.getMessage()); 
       ioe.printStackTrace(); 
       System.out.println("Now exiting"); 
       getSender().tell(Msg.FAIL, getSelf()); 
      } 
     } 

     else if (msg instanceof Terminated) { 
       final Terminated t = (Terminated) msg; 
       if (t.getActor() == child) { 
        lastSender.tell(Msg.TERMINATED, getSelf()); 
       } 
      } else { 
       unhandled(msg); 
      } 
    } 

} 

更新:

更新到ServiceActor:

if (urlName.equalsIgnoreCase("poisonPill")) { 
    this.getSelf().tell(PoisonPill.getInstance(), getSelf()); 
    getSender().tell(Msg.TERMINATED, getSelf()); 
} 

更新到的HelloWorld 我現在從兒童演員本身使用啓動poisonPill:

system.scheduler().scheduleOnce(
     Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() { 
      public void run() { 
       greeter.tell("poisonPill", getSelf()); 
      } 
     }, system.dispatcher()); 

這顯示下面的輸出:經過10毫秒被髮送

startTime : 1412777375414 
Connection successful to http://www.google.com 
Content is ....... (I'veremoved the content for brevity) 
Total Time : 1268 milliseconds 
Terminated 

的poisonPill消息和用於該示例中,演員住爲1268毫秒。那麼爲什麼演員在poisonPill發送時沒有終止?這是因爲時間很短?

更新代碼:

package terminatetest; 


import java.util.concurrent.TimeUnit; 

import scala.concurrent.duration.Duration; 
import akka.actor.ActorRef; 
import akka.actor.Props; 
import akka.actor.UntypedActor; 

public class HelloWorld extends UntypedActor { 

    @Override 
    public void preStart() { 

     int counter = 0; 

     akka.actor.ActorSystem system = getContext().system(); 

     final ActorRef greeter = getContext().actorOf(
       Props.create(ServiceActor.class), String.valueOf(counter)); 

     system.scheduler().scheduleOnce(
       Duration.create(10, TimeUnit.MILLISECONDS), new Runnable() { 
        public void run() { 
         greeter.tell("poisonPill", getSelf()); 
        } 
       }, system.dispatcher()); 

     greeter.tell("http://www.google.com", getSelf()); 

     counter = counter + 1; 
    } 

    @Override 
    public void onReceive(Object msg) { 
     if (msg == ServiceActor.Msg.SUCCESS) { 
      System.out.println("Success"); 
      getContext().stop(getSelf()); 
     } else if (msg == ServiceActor.Msg.TERMINATED) { 
      System.out.println("Terminated"); 
     } else 
      unhandled(msg); 
    } 
} 


package terminatetest; 

import static com.utils.PrintUtils.println; 

import java.io.BufferedReader; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.net.HttpURLConnection; 
import java.net.MalformedURLException; 
import java.net.URL; 

import akka.actor.ActorRef; 
import akka.actor.PoisonPill; 
import akka.actor.UntypedActor; 

public class ServiceActor extends UntypedActor { 

    ActorRef lastSender = getSender(); 

    public static enum Msg { 
     SUCCESS, FAIL, TERMINATED; 
    } 

    @Override 
    public void onReceive(Object msg) { 

     if (msg instanceof String) { 
      String urlName = (String) msg; 

      if (urlName.equalsIgnoreCase("poisonPill")) { 
       this.getSelf().tell(PoisonPill.getInstance(), getSelf()); 
       getSender().tell(Msg.TERMINATED, getSelf()); 
      } 

      else { 

       try { 
        long startTime = System.currentTimeMillis(); 
        System.out.println("startTime : "+startTime); 
        URL url = new URL(urlName); 
        HttpURLConnection conn = (HttpURLConnection) url.openConnection(); 
        conn.connect(); 

        BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream())); 
        StringBuilder out = new StringBuilder(); 
        String line; 
        while ((line = reader.readLine()) != null) { 
         out.append(line); 
        } 
        System.out.println("Connection successful to " + url); 
        System.out.println("Content is " + out); 
        long endTime = System.currentTimeMillis(); 
        System.out.println("Total Time : " + (endTime - startTime) + " milliseconds"); 

       } catch (MalformedURLException mue) { 
        println("URL Name " + urlName); 
        System.out.println("MalformedURLException"); 
        System.out.println(mue.getMessage()); 
        mue.printStackTrace(); 
        getSender().tell(Msg.FAIL, getSelf()); 
       } catch (IOException ioe) { 
        println("URL Name " + urlName); 
        System.out.println("IOException"); 
        System.out.println(ioe.getMessage()); 
        ioe.printStackTrace(); 
        System.out.println("Now exiting"); 
        getSender().tell(Msg.FAIL, getSelf()); 
       } 
      } 
     } 
    } 

} 

回答

1

我覺得從這個事實,你只設置lastSender一次,施工ServiceActor的過程中,你明確地將其設置爲死信您的問題造成的。如果你想發送郵件給發給你String消息的演員,那麼你需要將lastSender設置爲sender()。如果不這樣做,您的Msg.TERMINATED將永遠失效。

編輯

現在我在這裏看到了真正的問題。在HelloWorld演員中,您將發送PoisonPillServiceActorServiceActor將因此而自行停止,因此也會停止child參考(因爲它是ServiceActor的子演員)。此時,您會認爲Terminated消息將被傳送到ServiceActor,因爲它明確地注意到child(並且它可能會傳送),但是您已經發送了PoisonPillServiceActor,因此它不會處理在該消息之後收到的任何消息(這將是Terminate),所以這就是爲什麼塊:

else if (msg instanceof Terminated) { 

永遠不會在ServiceActor擊中。

EDIT2

你的演員接收先打穀歌的請求和第二接收"poisonPill"消息(10毫秒之後)。作爲一名演員按順序處理它的郵箱,演員會完全處理請求,以便在處理消息停止自己之前觸擊Google。這就是爲什麼演員在10毫秒後不會停下來的原因。你不能阻止演員在做什麼。

+0

謝謝,我已經更新了poisonPill的發送方式 - 它現在作爲一個String消息發送,當actor收到「poisonPill」時它自行終止:this.getSelf()。tell(PoisonPill.getInstance(),getSelf )); 。但是,這似乎也不工作,我已更新問題,包括這一點,你可以看看嗎? – 2014-10-08 14:17:54

+0

@ blue-sky,增加了更多的細節...... – cmbaxter 2014-10-08 14:26:02

+0

所以通過這個推理「你不能阻止一個演員在它正在做什麼。」一個演員不能在N秒後停止? – 2014-10-08 15:00:51

相關問題