2012-10-30 71 views
0

我需要在給定的毫秒內儘可能多次執行相同的任務。所以我使用scala.actors.threadpool.Executors作爲以下代碼。是否有任何Executor連接到經紀人?

import actors.threadpool.{TimeUnit, Future, Executors} 
import collection.mutable.ListBuffer 

object TaskRepeater { 

    def repeat(task: Runnable, millisecs: Long): Int = { 
    val exector = Executors.newCachedThreadPool 
    val remover = Executors.newCachedThreadPool 

    val queue = ListBuffer[Future]() 
    def removeDone() { 
     (queue filter (_.isDone)) foreach { f => 
     queue.remove(queue indexOf f) 
     } 
    } 

    val lock = new EasyLock() 

    val maxQueueSize = scala.collection.parallel.availableProcessors 

    val queueAvaiable = lock.mkCondition(queue.size < maxQueueSize) 

    var cnt = 0 

    val start = System.nanoTime() 

    while(System.nanoTime() - start < millisecs * 1000000) { 
     lock { 
     queueAvaiable.waitUntilFulfiled() 

     cnt += 1 
     val r = exector.submit(task) 
     queue += r 

     remover.submit(runneble { 
      r.get() 
      lock { 
      removeDone() 
      queueAvaiable.signalIfFulfilled() 
      } 
     }) 
     } 
    } 

    exector.shutdown() 
    remover.shutdown() 

    assert(exector.awaitTermination(1, TimeUnit.SECONDS)) 

    cnt 
    } 

    def runneble(f: => Unit) = new Runnable { 
    def run() { 
     f 
    } 
    } 
} 

import actors.threadpool.locks.{Condition, ReentrantLock} 

class EasyLock { 
    val lock = new ReentrantLock 

    def mkCondition(f: => Boolean): EasyCondition = { 
    new EasyCondition(lock.newCondition(), f) 
    } 

    def apply(f: => Unit) { 
    lock.lock() 
    try { 
     f 
    } finally { 
     lock.unlock() 
    } 
    } 
} 

class EasyCondition(c: Condition, condition: => Boolean) { 

    def waitUntilFulfiled(f: => Unit) { 
    while (!condition) { 
     c.await() 
    } 
    f 
    } 

    def signalAllIfFulfilled() { 
    if (condition) c.signalAll() 
    } 

    def signalIfFulfilled() { 
    if (condition) c.signal() 
    } 
} 

但是這有點複雜。相反,我認爲,Twitter的經紀商和一些與經紀商聯繫並將完成的任務發送給經紀商的執行商(如果存在)可以使其更容易。

下面是一個僞代碼

val b = new Broker 
val e = new Executor // connects with the broker anyway 
e.runTask 
val o = b.recv 
o.sync() // wait until the task finishes 

是否有任何這樣的執行人?

回答

0

如果我理解正確,你可能能夠使用FuturePool這樣的:

val b = new Broker[T] 
val futurePool = FuturePool.defaultPool // or some other pool 
futurePool(task).onSuccess { b ! _ } 
+0

謝謝阿列克謝,這就是我想要的。原諒我不喜歡,因爲我的名聲太低:) – yakamoto

+0

@yakamoto你仍然可以接受答案,我相信(點擊選票下的選中標記)。 –

+0

再次感謝:P – yakamoto