2017-05-11 91 views
0

我正在構建一個方法,它需要x大小的方法序列並返回第一個方法的結果完成。斯卡拉 - 多線程,當任何子線程完成時完成主線程

def invokeAny(work: Seq[() => Int]): Int = ??? 

如何通過使用線程來完成此操作? (不允許期貨)

這是我已經能夠提出的最好的,但似乎不適用於所有情況。

def invokeAny(work: Seq[() => Int]): Int = { 
    @volatile var result = 0 // set to return value of any work function 
    val main = Thread.currentThread() 

    val threads: Seq[Thread] = work.map(work => new Thread(new Runnable { 
     def run { result = work(); main.interrupt(); }})) 

    threads.foreach(_.start()) 
    for(thread <- threads) { 
     try { 
     thread.join() 
     } catch { 
     // We've been interrupted: finish 
     case e: InterruptedException => return result 
    } 
    } 
    return result 
    } 
+0

我會建議,而不是使用線程 –

+0

啊,我知道期貨,但我想知道如何使用線程 – Leero11

+0

使用值爲1的'java.util.concurrent.CountDownLatch'。你的子線程會調用'latch.countDown()',主線程將使用'latch.await()' –

回答

0

不是pretiest答案,但似乎工作:

def invokeAny(work: Seq[() => Int]): Int = { 
    @volatile var result = 0 // set to return value of any work function 
    val main = Thread.currentThread() 

    var threads: Seq[Thread] = Seq() 

    //Interrupts all threads after one is interrupted 
    def interruptAll = { 
     main.interrupt() 
     for(thread <- threads) { 
     thread.interrupt() 

     } 
    } 

    threads = work.map(work => new Thread( 
     new Runnable { 
      def run { 
      result = try { 
       work() } catch { 
       case e:InterruptedException => return 
      } 
      interruptAll; 

      } 
      })) 

    threads.foreach(_.start()) 
    for(thread <- threads) { 
     try { 
     thread.join() 
     } catch { 
     // We've been interrupted: finish 
     case e: InterruptedException => return result 
    } 
    } 
    return result 
    } 
0

使用的BlockingQueue,沒有共享的可變狀態,工作線程寫入隊列,主線程等待,直到他們完成和讀取那麼隊列做的結果類似總和

def invokeAny1(work: Seq[() => Int]): Int = { 
    val queue = new ArrayBlockingQueue[Int](work.size) 

    val threads: Seq[Thread] = work.map(w => new Thread(new Runnable { 
     def run { 
     val result= w() 
     queue.put(result) }})) 

    threads.foreach(_.start()) 
    threads.foreach(_.join()) 

    var sum:Int=0 

    while(!queue.isEmpty) { 
     sum +=queue.take() 
    } 
     sum 
} 

使用的CountDownLatch。 工作線程增加一個原子變量。 當所有線程都完成的鎖被釋放,主線程可以從原子變量讀取數據

def invokeAny2(work: Seq[() => Int]): Int = { 

    val total=new AtomicInteger 
    val latch= new CountDownLatch(work.size) 

    val threads: Seq[Thread] = work.map(w => new Thread(new Runnable { 
     def run { 

     val result= w() 
     total.getAndAdd(result) 
     latch.countDown  
     }})) 

    threads.foreach(_.start()) 

    latch.await //wait till the latch is released 
    total.get 
    } 

} 
+0

等待它我測試了你的代碼,但它似乎給出了錯誤的答案,並且比我發佈的代碼長四倍:// – Leero11

+1

我的代碼與你發佈的代碼有點不同,它會等到所有代碼都完成。如果你只想要第一個線程的結果完成,那麼在第二個代碼段中使val latch = new CountDownLatch(1) –

+0

好酷;)thx參與 – Leero11