2017-03-28 77 views
0

我從數據庫表或平面文件中取出4000萬行(數據行)。我正在處理groovy 評估的每行,每行創建一個工人(所以在這種情況下,我創建了4000萬工人)。 在這裏,我正在使用AKKA循環賽池。這種方法是否正確?如果不是,最好的辦法是什麼。Akka循環池

public class AkkaWay { 

public static void main(String[] args) { 
    System.out.println("************************** start *****************************"); 
    new AkkaWay().run(); 
    System.out.println("************************** END *****************************"); 
} 

private void run() { 
    ActorSystem system = ActorSystem.create("CalcSystem"); 
    ActorRef master = system.actorOf(Master.createMaster(), "master"); 
    master.tell(new Calculate(), ActorRef.noSender()); 
    while(!master.isTerminated()){ 
    try{ 
    //System.out.println("*********************************** Thread *************************************************"); 
    Thread.sleep(100); 
    }catch(Exception e){ 
    e.printStackTrace(); 
    } 
    } 
} 
} 

public class Master extends UntypedActor 
{ 
private final Time time = new Time(); 

public Master() { 
    workerRouter = this.getContext().actorOf(Worker.createWorker().withRouter(new RoundRobinRouter(4)),"workerRouter"); 
} 

@Override 
public void onReceive(Object message) { 
    if (message instanceof Calculate) { 
     time.start(); 
     processMessages(); 
    } else if (message instanceof Result) { 
     list.add(((Result) message).getFactorial()); 
     if (list.size() == messages) 
      end(); 
    } else { 
     unhandled(message); 
    } 
} 

private void processMessages() 
{ 
    //read data from file/database (40 millions rows) 
    for (int i = 0; i < rows; i++) { 

     workerRouter.tell(new Work(), getSelf());// each row send 
    } 
} 

private void end() { 
    time.end(); 
    System.out.println("Done: " + time.elapsedTimeMilliseconds()+"["+time.elapsedTimeMilliseconds()/1000+" secs]"); 
    getContext().system().shutdown(); 
} 

public static Props createMaster() { 
    return Props.create(Master.class, new ArraySeq<Object>(0)); 
} 
} 

public class Worker extends UntypedActor 
{ 

@Override 
public void onReceive(Object message) { 
    if (message instanceof Work) { 
     //evaluate Groovy expression 
     getSender().tell(new Result(bigInt), getSelf()); 
    } else 
     unhandled(message); 
} 

public static Props createWorker() { 
    return Props.create(Worker.class, new ArraySeq<Object>(0)); 
} 
} 
+1

codereview似乎更適合這個問題,不是嗎? – Quickbeam2k1

回答

1

我覺得這是不是最好的方法,因爲在最壞的情況下可能會導致你在內存中加載40個MIO行,在演員的郵箱等。

使用akka-stream可以更好地解決這類問題,其中只需要一次加載所需的數據。