2016-02-21 84 views
0

由於Go沒有泛型,所有預製解決方案都使用我不太喜歡的類型轉換。我也想自己實現它,並嘗試下面的代碼。但是,有時它不會等待所有的goroutines,我是否過早地關閉了工作渠道?我沒有任何東西可以從他們那裏獲取。我可能也使用了一個僞輸出通道,並等待從中獲取確切的數量,但我相信下面的代碼也可以工作。我錯過了什麼?在Go中實現工作人員池

func jobWorker(id int, jobs <-chan string, wg sync.WaitGroup) { 
    wg.Add(1) 
    defer wg.Done() 

    for job := range jobs { 
     item := ParseItem(job) 
     item.SaveItem() 
     MarkJobCompleted(item.ID) 
     log.Println("Saved", item.Title) 
    } 
} 

// ProcessJobs processes the jobs from the list and deletes them 
func ProcessJobs() { 

    jobs := make(chan string) 

    list := GetJobs() 
    // Start workers 
    var wg sync.WaitGroup 
    for w := 0; w < 10; w++ { 
     go jobWorker(w, jobs, wg) 
    } 

    for _, url := range list { 
     jobs <- url 
    } 

    close(jobs) 
    wg.Wait() 
} 

回答

2

在goroutine之外調用wg.Add並將指針傳遞給等待組。

如果從goroutine中調用Add,主goroutine有可能在goroutine有機會運行之前調用Wait。如果Add尚未被調用,那麼Wait將立即返回。

傳遞指向goroutine的指針。否則,goroutines使用他們自己的等待組副本。

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) { 

    defer wg.Done() 

    for job := range jobs { 
     item := ParseItem(job) 
     item.SaveItem() 
     MarkJobCompleted(item.ID) 
     log.Println("Saved", item.Title) 
    } 
} 

// ProcessJobs processes the jobs from the list and deletes them 
func ProcessJobs() { 

    jobs := make(chan string) 

    list := GetJobs() 
    // Start workers 
    var wg sync.WaitGroup 
    for w := 0; w < 10; w++ { 
     wg.Add(1) 
     go jobWorker(w, jobs, &wg) 
    } 

    for _, url := range list { 
     jobs <- url 
    } 

    close(jobs) 
    wg.Wait() 
} 
+0

是的,你是絕對正確的,並且將'wg.Add(1)'分隔到那裏實際上更有意義和更正確。 – Mustafa

+0

實際上,用'wg.Add(10)'調用它一次更有意義;) – fl0cke

1

您需要將指針傳遞給waitgroup,否則每個作業都會收到它自己的副本。

func jobWorker(id int, jobs <-chan string, wg *sync.WaitGroup) { 
    wg.Add(1) 
    defer wg.Done() 

    for job := range jobs { 
     item := ParseItem(job) 
     item.SaveItem() 
     MarkJobCompleted(item.ID) 
     log.Println("Saved", item.Title) 
    } 
} 

// ProcessJobs processes the jobs from the list and deletes them 
func ProcessJobs() { 

    jobs := make(chan string) 

    list := GetJobs() 
    // Start workers 
    var wg sync.WaitGroup 
    for w := 0; w < 10; w++ { 
     go jobWorker(w, jobs, &wg) 
    } 

    for _, url := range list { 
     jobs <- url 
    } 

    close(jobs) 
    wg.Wait() 
} 

看到這裏的區別:without pointerwith pointer

+0

你是對的,非常愚蠢的錯誤! :) – Mustafa

相關問題