2014-06-16 215 views
6

我希望for循環使用go例程並行。我試圖使用渠道,但沒有奏效。我的主要問題是,我想在繼續之前等待所有迭代完成。這就是爲什麼簡單地在go之前無效的原因。我試圖用信道(我認爲錯誤的方式),但使我的代碼更慢並行For循環

func createPopulation(populationSize int, individualSize int) []Individual { 
    population := make([]Individual, populationSize) 

    //i want this loop to be work parallel 
    for i := 0; i < len(population); i++ { 
     population[i] = createIndividual(individualSize) 
    } 

    return population 
} 

func createIndividual(size int) Individual { 
    var individual = Individual{make([]bool, size), 0} 

    for i := 0; i < len(individual.gene); i++ { 
     if rand.Intn(2)%2 == 1 { 
      individual.gene[i] = true 
     } else { 
      individual.gene[i] = false 
     } 
    } 

    return individual 
} 

我的結構是這樣的:

type Individual struct { 
    gene []bool 
    fitness int 
} 
+0

已經嘗試了不同的版本出來時你設置環境變量GOMAXPROCS? – fabrizioM

回答

6

所以基本上夠程不應該返回一個值,但推它下降了一個通道。如果你想等所有的goroutine完成,你只需要計數goroutines的數量,或者使用WaitGroup。在這個例子中,這是一個矯枉過正的例子,因爲它的大小是已知的,但無論如何這是一個很好的做法。這裏有一個變形例:

package main 

import (
    "fmt" 
    "math/rand" 
    "sync" 
) 

type Individual struct { 
    gene []bool 
    fitness int 
} 


func createPopulation(populationSize int, individualSize int) []Individual { 

    // we create a slice with a capacity of populationSize but 0 size 
    // so we'll avoid extra unneeded allocations 
    population := make([]Individual, 0, populationSize) 

    // we create a buffered channel so writing to it won't block while we wait for the waitgroup to finish 
    ch := make(chan Individual, populationSize) 

    // we create a waitgroup - basically block until N tasks say they are done 
    wg := sync.WaitGroup{} 

    for i := 0; i < populationSize; i++ { 

     //we add 1 to the wait group - each worker will decrease it back 
     wg.Add(1) 

     //now we spawn a goroutine 
     go createIndividual(individualSize, ch, &wg) 
    } 

    // now we wait for everyone to finish - again, not a must. 
    // you can just receive from the channel N times, and use a timeout or something for safety 
    wg.Wait() 

    // we need to close the channel or the following loop will get stuck 
    close(ch) 

    // we iterate over the closed channel and receive all data from it 
    for individual := range ch { 

     population = append(population, individual) 
    } 
    return population 

} 

func createIndividual(size int, ch chan Individual, wg *sync.WaitGroup) { 

    var individual = Individual{make([]bool, size), 0} 

    for i := 0; i < len(individual.gene); i++ { 
     if rand.Intn(2)%2 == 1 { 
      individual.gene[i] = true 
     } else { 
      individual.gene[i] = false 
     } 
    } 

    // push the population object down the channel 
    ch <- individual 
    // let the wait group know we finished 
    wg.Done() 

} 
+3

很好的答案,但是在這個例子中,我認爲'population:= make([] Individual,populationSize)'應該可能是'population:= make([] Individual,0)',否則'append'語句將新的個體在長度爲'populationSize'的空片段末尾。 http://play.golang.org/p/6eYlk40Oal – Intermernet

+0

@Intermernet你是對的,我錯過了。我會解決我的答案。 –

+3

固定爲分配已知*容量*的片,但只有0個成員。 –

0

既然你知道你會事先有多少人有,我會使用渠道和避免剛分配的​​各個成員的夠程createIndividual。然後createIndividual簽名是這樣的:

func createIndividual(wg *sync.WaitGroup, individual *Individual, size int) 

和調用代碼是這樣的:

population := make([]Individual, populationSize) 
wg := &sync.WaitGroup{} 
wg.Add(len(population)) 

for i := 0; i < len(population); i++ { 
    go createIndividual(wg, &population[i], individualSize) 
} 

wg.Wait() 

因此,各走各的例程負責恰好一個個體,它分配給在​​相應的插槽:

func createIndividual(wg *sync.WaitGroup, individual *Individual, size int) { 
    defer wg.Done() 
    *individual = Individual{make([]bool, size), 0} 

    // assign other attributes to `individual` 
} 

你可以看到一個完整的代碼示例on play here

+0

這肯定會起作用,但有人可能會認爲它違背了Go的「通過溝通共享內存」習語。 –

+0

@Not_a_Golfer是的,但由於這似乎是一個非常小而孤立的問題,我會爭辯說,爲了性能和可讀性,這樣做很好。但是這當然取決於'createIndividual'最終會做什麼。結果可能會證明渠道解決方案實際上更快。 – nemo

+0

我甚至不確定,因爲所有這些東西都是100%的CPU,並且非常快(每個goroutine),並行處理會比單循環更快。取決於我猜測的「個體大小」。沒有渠道有很大的性能影響,但是你知道,這就是Heartbleed這樣的東西誕生的原因:) –

1

向這樣的循環添加受控並行性的一種常見方式是生成許多將從通道讀取任務的工作程序。 runtime.NumCPU功能可能有助於決定產生多少工人有意義(請確保您設置GOMAXPROCS以利用這些CPU)。然後,您只需將這些工作寫入該渠道,並由工人處理。

在這種情況下,任務是初始化總體切片的元素,因此使用*Individual指針的通道可能是有意義的。事情是這樣的:

ch := make(chan *Individual) 
for i := 0; i < nworkers; i++ { 
    go initIndividuals(individualSize, ch) 
} 

population := make([]Individual, populationSize) 
for i := 0; i < len(population); i++ { 
    ch <- &population[i] 
} 
close(ch) 

工人夠程會是這個樣子:

func initIndividuals(size int, ch <-chan *Individual) { 
    for individual := range ch { 
     // Or alternatively inline the createIndividual() code here if it is the only call 
     *individual = createIndividual(size) 
    } 
} 

由於任務沒有分部分的提前出局,這並不重要,如果createIndividual花費可變的時間量:每個工作人員只有在最後一個工作完成時纔會執行一項新任務,並且在沒有任務離開時(因爲此時通道關閉)而退出。

但我們怎麼知道工作何時完成? sync.WaitGroup類型可以在這裏幫助。產卵工人夠程的代碼可以被修改,如下所示:

ch := make(chan *Individual) 
var wg sync.WaitGroup 
wg.Add(nworkers) 
for i := 0; i < nworkers; i++ { 
    go initIndividuals(individualSize, ch, &wg) 
} 

initIndividuals功能也被修改採取的其他參數,並添加defer wg.Done()作爲第一個發言。現在撥打wg.Wait()的電話將會阻止,直到所有工作人員的衛生間完成。然後您可以返回完整構建的​​切片。

+0

'for i:= 9;我

+0

這是一個錯字。感謝您指出。 –

+0

和什麼nworkers? GOMAXPROCS或人口規模? –

2

對於您的具體問題,您根本不需要使用通道。

但是除非您的createIndividual花費了一些時間進行計算,否則在並行運行時,goroutine之間的上下文切換總是會慢得多。

type Individual struct { 
    gene []bool 
    fitness int 
} 

func createPopulation(populationSize int, individualSize int) (population []*Individual) { 
    var wg sync.WaitGroup 
    population = make([]*Individual, populationSize) 

    wg.Add(populationSize) 
    for i := 0; i < populationSize; i++ { 
     go func(i int) { 
      population[i] = createIndividual(individualSize) 
      wg.Done() 
     }(i) 
    } 
    wg.Wait() 
    return 
} 

func createIndividual(size int) *Individual { 
    individual := &Individual{make([]bool, size), 0} 

    for i := 0; i < size; i++ { 
     individual.gene[i] = rand.Intn(2)%2 == 1 
    } 

    return individual 
} 

func main() { 
    numcpu := flag.Int("cpu", runtime.NumCPU(), "") 
    flag.Parse() 
    runtime.GOMAXPROCS(*numcpu) 
    pop := createPopulation(1e2, 21e3) 
    fmt.Println(len(pop)) 
} 

輸出:

┌─ [email protected] [/tmp]                            
└──➜ go build blah.go; xtime ./blah -cpu 1 
100 
0.13u 0.00s 0.13r 4556kB ./blah -cpu 1 
┌─ [email protected] [/tmp]                            
└──➜ go build blah.go; xtime ./blah -cpu 4 
100 
2.10u 0.12s 0.60r 4724kB ./blah -cpu 4 
0

如果你想避免混合併發邏輯與商業邏輯,我寫了這個庫https://github.com/shomali11/parallelizer來幫你。它封裝了併發邏輯,所以你不必擔心它。

所以在你的例子:

package main 

import (
    "github.com/shomali11/parallelizer" 
    "fmt" 
) 

func main() { 
    populationSize := 100 
    results = make([]*Individual, populationSize) 

    options := &Options{ Timeout: time.Second } 
    group := parallelizer.NewGroup(options) 
    for i := 0; i < populationSize; i++ { 
     group.Add(func(index int, results *[]*Individual) { 
      return func() { 
       ... 

       results[index] = &Individual{...} 
      } 
     }(i, &results)) 
    } 

    err := group.Run() 

    fmt.Println("Done") 
    fmt.Println(fmt.Sprintf("Results: %v", results)) 
    fmt.Printf("Error: %v", err) // nil if it completed, err if timed out 
}