2017-06-22 62 views
0

我有100個線程,每次只需要處理12個線程,但不會超過這個線程。這些線程完成後,其他12個線程必須被處理,但是它只處理前12個線程,然後在那之後終止。
這裏是我的邏輯:Akka線程調整

class AkkaProcessing extends Actor { 
    def receive = { 
case message: List[Any] => 
var meterName = message(0) // It Contains only 12 threads , it process them and terminates. Am unable to get remaining threads 

val sqlContext = message(1).asInstanceOf[SQLContext] 
val FlagDF = message(2).asInstanceOf[DataFrame] 
     { 

       All the business logic here 
      } 

     context.system.shutdown() 
    } 
    } 
} 
object Processing { 
    def main(args: Array[String]) = { 
    val rawBuff = new ArrayBuffer[Any]() 
    val actorSystem = ActorSystem("ActorSystem") // Creating ActorSystem 
    val actor = actorSystem.actorOf(Props[AkkaProcessing].withRouter(RoundRobinPool(200)), "my-Actor") 
    implicit val executionContext = actorSystem.dispatchers.lookup("akka.actor.my-dispatcher") 

    for (i <- 0 until meter_list.length) { 

    var meterName = meter_list(i)  // All 100 Meters here 

    rawBuff.append(meterName, sqlContext, FlagDF) 
    actor ! rawBuff.toList 
    } 
    } 
    } 

任何輸入的高度讚賞

回答

0

我想你可能是最好的創建2種演員類型:消費類(在並行運行)和協調(這需要12個線程任務並將它們傳遞給消費者)。協調員將等待消費者完成,然後運行下一批。

看到這個答案的代碼示例:Can Scala actors process multiple messages simultaneously?

如果做不到這一點,你可以只使用期貨以類似的方式。