2012-04-16 30 views
5

我正在使用通道來實現隊列的練習。具體來說,我試圖使用通道的大小來限制同步goroutines的數量。要曉得,我寫了下面的代碼:需要幫助瞭解選擇{}不會永久阻止的原因

package main 

import "fmt" 
import "time" 
import "math/rand" 

func runTask (t string, ch *chan bool) { 
     start := time.Now() 
     fmt.Println("starting task", t) 
     time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time 
     fmt.Println("done running task", t, "in", time.Since(start)) 
     <- *ch 
} 

func main() { 
     numWorkers := 3 
     files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"} 

     activeWorkers := make(chan bool, numWorkers) 

     for _, f := range files { 
       activeWorkers <- true 
       fmt.Printf("activeWorkers is %d long.\n", len(activeWorkers)) 
       go runTask(f, &activeWorkers) 
     } 
     select{} 
} 

眼下,代碼崩潰與:

throw: all goroutines are asleep - deadlock! 

我的期望是,調用選擇將永遠阻止並讓夠程終止無僵局。

所以我有一個雙重的問題:爲什麼不選擇永遠阻塞,並在短時間內投入。調用for循環後,如何避免死鎖?

乾杯,

-mtw

回答

6

Arlen Cuss已經寫了一個很好的答案。我只是想爲你的工作隊列提供另一種設計。您不僅可以限制您的頻道可以緩衝的條目數量,還可以生成有限數量的工作程序,這些工作程序感覺更自然。類似的東西:

package main 

import "fmt" 
import "time" 
import "math/rand" 

func runTask(t string) string { 
    start := time.Now() 
    fmt.Println("starting task", t) 
    time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time 
    fmt.Println("done running task", t, "in", time.Since(start)) 
    return t 
} 

func worker(in chan string, out chan string) { 
    for t := range in { 
     out <- runTask(t) 
    } 
} 

func main() { 
    numWorkers := 3 

    // spawn workers 
    in, out := make(chan string), make(chan string) 
    for i := 0; i < numWorkers; i++ { 
     go worker(in, out) 
    } 

    files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"} 

    // schedule tasks 
    go func() { 
     for _, f := range files { 
      in <- f 
     } 
    }() 

    // get results 
    for _ = range files { 
     <-out 
    } 
} 

你也可以使用一個sync.WaitGroup如果你只是想等到所有任務已執行,但使用out通道有以後可以彙總結果的優勢。例如,如果每個任務都返回該文件中的單詞數量,則可以使用最後一個循環來總結所有單個單詞計數。

4

首先,你並不需要通過一個指向通道;頻道,如地圖等, are references,表示底層數據不被複制,只有指向實際數據的指針。如果你需要一個指向chan本身的指針,你會知道什麼時候到了。

發生崩潰的原因是程序進入每個goroutine被阻止的狀態。這應該是不可能的;如果每個門廳都被阻塞,那麼沒有可能的進程來喚醒另一個門廳(因此你的程序將被掛起)。

主要圍欄在select {} - 沒有等待任何人,只是掛。一旦最後的runTask goroutine結束,只剩下主要的goroutine了,而且它正在等待沒有人。

你需要添加一些方法來知道每個goroutine何時完成;也許另一個頻道可以接收完成事件。

這有點難看,但可能有些啓發。

package main 

import "fmt" 
import "time" 
import "math/rand" 

func runTask(t string, ch chan bool, finishedCh chan bool) { 
    start := time.Now() 
    fmt.Println("starting task", t) 
    time.Sleep(time.Millisecond * time.Duration(rand.Int31n(1500))) // fake processing time 
    fmt.Println("done running task", t, "in", time.Since(start)) 
    <-ch 
    finishedCh <- true 
} 

func main() { 
    numWorkers := 3 
    files := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"} 

    activeWorkers := make(chan bool, numWorkers) 
    finishedWorkers := make(chan bool) 
    done := make(chan bool) 

    go func() { 
     remaining := len(files) 
     for remaining > 0 { 
      <-finishedWorkers 
      remaining -= 1 
     } 

     done <- true 
    }() 

    for _, f := range files { 
     activeWorkers <- true 
     fmt.Printf("activeWorkers is %d long.\n", len(activeWorkers)) 
     go runTask(f, activeWorkers, finishedWorkers) 
    } 

    <-done 
} 
+0

我會小心的;一切都以價值傳遞。地圖和切片確實被複制,但是複製數據的一部分包含指向底層數據的指針。 http://golang.org/doc/faq#pass_by_value。這是一個微妙而重要的區別;切片具有不是指針的其他數據,也不共享。 – Greg 2014-06-10 04:24:02

+0

@Greg:「地圖和切片確實被複制」。我想,分裂頭髮。根據您鏈接的文檔:「地圖和切片值的行爲與指針類似......複製地圖或切片值不會複製它指向的數據。」我的術語與Go文檔現在使用的不一致,但語義在很大程度上是等價的。 – Ashe 2014-06-10 04:46:16

+0

@Greg:已經把那裏的散文清理了一下。 – Ashe 2014-06-10 04:49:02

1

tux21b已經發布了一個更習慣的解決方案,但我想以不同的方式回答你的問題。選擇{}會永遠阻止,是的。當所有goroutines被阻塞時發生死鎖。如果你所有其他的goroutine都完成了,那麼你只剩下主要的goroutine了,這是一個僵局。

通常情況下,在所有其他人完成後,您想要在主要的例程中執行某些操作,或者使用結果,或者僅僅是清理,並且要做tux21b建議的操作。如果你真的只想完成並離開其他的goroutines來完成他們的工作,把defer runtime.Goexit()放在你的主要功能的頂部。這會導致它退出而不退出程序。