2015-01-12 246 views
1

我對Scala的Future和Akka非常陌生,目前,我正試圖執行一個應用程序,它執行一系列獨立任務並將結果收集在一起。並行執行期貨清單

例如,我想要一個由多個任務組成的應用程序,每個任務接收一個數字,休眠幾秒鐘,然後返回「Hello」消息。

演員被實現如下:

class HelloActor extends Actor { 
    def receive = { 
    case name:Int => { 
     println("%s will sleep for %s seconds".format(name, name % 4)) 
     Thread.sleep(name % 4 * 1000) 
     sender ! "Hello %s".format(name) 
    } 
    } 
} 

主要目的是這樣實現的:

object HelloAkka extends App { 
    val system = ActorSystem("HelloSystem") 

    val helloActor = system.actorOf(Props[HelloActor], name = "helloactor") 

    implicit val timeout = Timeout(20, TimeUnit.SECONDS) 

    val futures = (1 to 10).map(num => { 
    helloActor ? num 
    }) 

    val future = Future.sequence(futures) 

    val results = Await.result(future, timeout.duration) 

    println(results) 

    system.shutdown 
} 

由於每個任務會睡眠0,1,2或3秒,我期望睡眠時間較短的任務要先執行。但是,結果是:

1 will sleep for 1 seconds 
2 will sleep for 2 seconds 
3 will sleep for 3 seconds 
4 will sleep for 0 seconds 
5 will sleep for 1 seconds 
6 will sleep for 2 seconds 
7 will sleep for 3 seconds 
8 will sleep for 0 seconds 
9 will sleep for 1 seconds 
10 will sleep for 2 seconds 
Vector(Hello 1, Hello 2, Hello 3, Hello 4, Hello 5, Hello 6, Hello 7, Hello 8, Hello 9, Hello 10) 

換句話說,所有任務都按順序執行。我想知道是否有任何方法讓我平行執行所有任務。

+5

您正在將您的所有請求發送給同一個演員。來自/到同一對演員的消息被保證按順序執行。您可以將它們發送給演員的10個不同副本,也可以在演員的睡眠周圍移動Future操作員。 –

+0

@DiegoMartinoia非常感謝,我使用多個演員時很有用。你可以發表你的評論作爲答案,以便我可以標記它嗎? –

+0

完成,感謝您的耐心等待! –

回答

3

正如在評論中提到的,您將所有任務/消息發送給一個參與者,並且確保所有這些任務/消息都將按順序處理。

要並行處理任務,您需要有多個處理器參與者實例,在這種情況下,您的情況爲HelloActor

當然,您可以創建HelloActor的多個實例,但這絕對不是最佳實踐。

對於這種類型的任務,您應該使用內置路由功能,它允許您管理員工/處理程序池並通過一個router角色與其交互,例如。

val router: ActorRef = 
    context.actorOf(RoundRobinPool(10).props(Props[HelloActor]), "router") 

... 
router ? num 
... 

請按照Akka Routing文件,以獲得更多的細節。

0

而不是按照評論和答案中的建議啓動多個參與者,我會建議執行Future中的實際任務。所以你的演員會更像是任務的協調員。例如: -

//...  

// import pipe pattern to get access to `pipeTo` method 
import akka.pattern.pipe 
import scala.concurrent.Future 

// the `Future`s will be executed on this dispatcher 
// depending on your needs, you may want to create a 
// dedicated executor for this 
class TaskCoordinatorActor extends Actor { 
    import context.dispatcher 

    def receive = { 
    case name: Int => 
     Future { 
     Thread.sleep(name % 4 * 1000) 
     "Hello %s".format(name) 
     } pipeTo sender() 
    } 
} 

上面的代碼在scala.concurrent.Future和管道執行你的任務結果到原始發件人。這樣,演員在任務完成之前不會阻止,但一旦創建了Future,則準備好接收下一條消息。

P.S .:不是發送普通整數,而是創建消息類型,這些消息類型可以明確你想讓演員做什麼。在你的情況下,它可能是例如:

case class Sleep(duration: Duration) 
0

從同一個演員發送給同一個演員的消息將按順序執行。

你有兩個選擇。

要麼創建HelloActor的新副本的每個消息,讓他們都並行執行,或修改您的HelloActor要的東西如下(可能是錯誤的進口,通過記憶去):

import akka.pattern.pipe._ 

class HelloActor extends Actor { 
    def receive = { 
    case name:Int => { 
     println("%s will sleep for %s seconds".format(name, name % 4)) 
     Future(sleepAndRespond(name)) pipeTo sender 
    } 
} 

def sleepAndRespond(name:String) = { 
    Thread.sleep(name % 4 * 1000) 
    "Hello %s".format(innerName) 
} 
} 

這樣,執行的順序部分就是未來的管道,然後對十條消息中的每條消息進行異步執行。

+0

@drexin關於使用路由器的建議是非常好的,如果你採用多種答案的方法。另外請注意,在這兩種情況下,您的性​​能都與您的線程池/調度程序的配置密切相關。查看文檔以獲取更多詳細信息! –