我已經寫了一個scala應用程序來做這件事,儘管它是圍繞壓力測試我正在開發的API。使用參與者執行併發請求非常容易。一般的想法是你想創建一個當給出消息時執行單個請求的actor。然後,您創建該actor的多個實例,並使用負載均衡器向其分發請求。
這裏是一個非常簡單的實現,給你一個想法(使用Akka演員):
case class MakeRequest(url: String)
class RequestActor extends Actor {
//force the actor to run in it's own thread
self.dispatcher = akka.dispatch.Dispatchers.newThreadBasedDispatcher(self)
def receive = {
case MakeRequest(url) => //perform the request
}
}
val totalRequests = 1000
val url = "http://..."
val totalConections = 4
//create one actor per connection and wrap them in a load balancer
val actors = (0 to totalConnections).map{i => actorOf[RequestActor].start}
val requestBalancer = loadBalancerActor(new CyclicIterator(actors))
//start sending requests
for (i <- 0 to totalRequests) {
requestBalancer ! MakeRequest(url)
}
//send a poison pill to stop the actors after they've finished all the requests
requestBalancer ! Broadcast(PoisonPill)
//wait for the actors to finish
while (actors.foldLeft(false){ (b, a) => b || !a.isShutdown}) {
Thread.sleep(300)
}
//stop the load balancer
requestBalancer ! PoisonPill
正如你所看到的,我送MakeRequest
消息負載平衡器,橫跨演員分發它們。每個演員都有一個消息隊列,因此一個演員會一次發出一個請求,但是演員一起會同時發出請求。
這個例子沒有提供一種計算響應的方法,並且你冒着溢出演員隊列的風險,但這些很容易解決。我用這個大概的想法來進行廣泛的壓力測試和基準測試,取得了巨大的成功。
你聽錯了:在Scala中寫* *併發應用程序要容易得多。主要原因是您不寫_multi-threaded_應用程序。相反,您使用其他併發抽象,這些併發抽象恰好通過線程實現,但您看不到線程。這在這裏很重要,因爲你想保證N個併發線程,這是從你的控制中取走的第一件事情之一。 –