0
我需要在給定的毫秒內儘可能多次執行相同的任務。所以我使用scala.actors.threadpool.Executors作爲以下代碼。是否有任何Executor連接到經紀人?
import actors.threadpool.{TimeUnit, Future, Executors}
import collection.mutable.ListBuffer
object TaskRepeater {
def repeat(task: Runnable, millisecs: Long): Int = {
val exector = Executors.newCachedThreadPool
val remover = Executors.newCachedThreadPool
val queue = ListBuffer[Future]()
def removeDone() {
(queue filter (_.isDone)) foreach { f =>
queue.remove(queue indexOf f)
}
}
val lock = new EasyLock()
val maxQueueSize = scala.collection.parallel.availableProcessors
val queueAvaiable = lock.mkCondition(queue.size < maxQueueSize)
var cnt = 0
val start = System.nanoTime()
while(System.nanoTime() - start < millisecs * 1000000) {
lock {
queueAvaiable.waitUntilFulfiled()
cnt += 1
val r = exector.submit(task)
queue += r
remover.submit(runneble {
r.get()
lock {
removeDone()
queueAvaiable.signalIfFulfilled()
}
})
}
}
exector.shutdown()
remover.shutdown()
assert(exector.awaitTermination(1, TimeUnit.SECONDS))
cnt
}
def runneble(f: => Unit) = new Runnable {
def run() {
f
}
}
}
import actors.threadpool.locks.{Condition, ReentrantLock}
class EasyLock {
val lock = new ReentrantLock
def mkCondition(f: => Boolean): EasyCondition = {
new EasyCondition(lock.newCondition(), f)
}
def apply(f: => Unit) {
lock.lock()
try {
f
} finally {
lock.unlock()
}
}
}
class EasyCondition(c: Condition, condition: => Boolean) {
def waitUntilFulfiled(f: => Unit) {
while (!condition) {
c.await()
}
f
}
def signalAllIfFulfilled() {
if (condition) c.signalAll()
}
def signalIfFulfilled() {
if (condition) c.signal()
}
}
但是這有點複雜。相反,我認爲,Twitter的經紀商和一些與經紀商聯繫並將完成的任務發送給經紀商的執行商(如果存在)可以使其更容易。
下面是一個僞代碼
val b = new Broker
val e = new Executor // connects with the broker anyway
e.runTask
val o = b.recv
o.sync() // wait until the task finishes
是否有任何這樣的執行人?
謝謝阿列克謝,這就是我想要的。原諒我不喜歡,因爲我的名聲太低:) – yakamoto
@yakamoto你仍然可以接受答案,我相信(點擊選票下的選中標記)。 –
再次感謝:P – yakamoto