2016-11-25 34 views
2

我有一個簡單的併發用例在去,它讓我瘋狂我找不出一個優雅的解決方案。任何幫助,將不勝感激。慣用的goroutine終止和錯誤處理

我想寫一個方法fetchAll並行查詢來自遠程服務器的未指定數量的資源。如果任何提取失敗,我想立即返回第一個錯誤。從閱讀https://blog.golang.org/pipelines我可以創建一個信號通道清理其他線程https://play.golang.org/p/Be93J514R5

我知道:

我最初的,幼稚的做法,漏夠程:

package main 

import (
    "fmt" 
    "math/rand" 
    "sync" 
    "time" 
) 

func fetchAll() error { 
    wg := sync.WaitGroup{} 
    errs := make(chan error) 
    leaks := make(map[int]struct{}) 
    defer fmt.Println("these goroutines leaked:", leaks) 

    // run all the http requests in parallel 
    for i := 0; i < 4; i++ { 
    leaks[i] = struct{}{} 
    wg.Add(1) 
    go func(i int) { 
     defer wg.Done() 
     defer delete(leaks, i) 

     // pretend this does an http request and returns an error 
     time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) 
     errs <- fmt.Errorf("goroutine %d's error returned", i) 
    }(i) 
    } 

    // wait until all the fetches are done and close the error 
    // channel so the loop below terminates 
    go func() { 
    wg.Wait() 
    close(errs) 
    }() 

    // return the first error 
    for err := range errs { 
    if err != nil { 
     return err 
    } 
    } 

    return nil 
} 

func main() { 
    fmt.Println(fetchAll()) 
} 

遊樂場。或者,我可以使用context來完成它。但是,似乎這樣一個簡單的用例應該有一個我錯過的更簡單的解決方案。

回答

4

除了其中一個goroutines外,其他所有人都泄露了,因爲他們仍在等待發送到errs頻道 - 您從未完成清空它的範圍。你也在泄漏那個工作是關閉錯誤通道的例程,因爲等待組從未完成。

(同時,隨着安迪指出,從地圖中刪除是不是線程安全的,這樣會需要保護,以免遭受互斥。)

不過,我不認爲地圖,互斥waitgroups,上下文等等在這裏甚至是必要的。我已經重寫了整個事情只使用基本的渠道運營,類似如下:

package main 

import (
    "fmt" 
    "math/rand" 
    "time" 
) 

func fetchAll() error { 
    var N = 4 
    quit := make(chan bool) 
    errc := make(chan error) 
    done := make(chan error) 
    for i := 0; i < N; i++ { 
     go func(i int) { 
      // dummy fetch 
      time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) 
      err := error(nil) 
      if rand.Intn(2) == 0 { 
       err = fmt.Errorf("goroutine %d's error returned", i) 
      } 
      ch := done // we'll send to done if nil error and to errc otherwise 
      if err != nil { 
       ch = errc 
      } 
      select { 
      case ch <- err: 
       return 
      case <-quit: 
       return 
      } 
     }(i) 
    } 
    count := 0 
    for { 
     select { 
     case err := <-errc: 
      close(quit) 
      return err 
     case <-done: 
      count++ 
      if count == N { 
       return nil // got all N signals, so there was no error 
      } 
     } 
    } 
} 

func main() { 
    rand.Seed(time.Now().UnixNano()) 
    fmt.Println(fetchAll()) 
} 

遊樂場鏈接:https://play.golang.org/p/mxGhSYYkOb

編輯:有確實是一個愚蠢的錯誤,謝謝指點出來。我修正了上面的代碼(我認爲...)。我還爲增加的Realism™添加了一些隨機性。

另外,我想強調的是真的有多種方法來解決這個問題,我的解決方案只是一種方法。最終歸結爲個人品味,但總體而言,您希望努力實現「慣用」代碼 - 並朝着自然且易於理解的風格邁進。

+0

'ec:= chan error(nil)'很有趣,我之前沒有看到過這種模式。我認爲'select'原因是以隨機順序執行的。在'ec <-err'之前發送'done <-true'是否有競爭條件? – gerad

+0

很好,絕對有一場比賽!我寫得很快,就像我提到的那樣,沒有測試它(你應該始終這樣做)。幸運的是,修復這個錯誤只會讓整個代碼變得更簡單,在這種情況下,不需要'chan error(nil)'技巧(當你想阻止select語句的發送時,這很有用,所以你不要不必寫多個條件選擇)。感謝您指出我的錯誤:) – Aedolon

+0

這可以進一步簡化。你不需要單獨完成和錯誤的渠道,還有其他一些事情不會改進。 https://play.golang.org/p/1a0ZXuy3Dz –

0

只要每個goroutine完成,你就不會泄漏任何東西。您應該創建緩衝區的錯誤通道,其緩衝區大小等於goroutine的數量,以便通道上的發送操作不會被阻止。每個goroutine總是應該在通道上發送一些信息,無論它是成功還是失敗。然後底部的循環可以迭代goroutine的數量,如果得到非零錯誤則返回。您不需要WaitGroup或另一個關閉通道的goroutine。

我認爲它看起來goroutines泄漏的原因是,當你得到第一個錯誤時你會返回,所以有些仍在運行。

順便說一下,地圖不是goroutine安全的。如果你在goroutines中共享一個地圖,並且其中一些正在對地圖進行更改,則需要使用互斥鎖來保護它。

+0

我同意使用緩衝通道可以工作,但我試圖避免這個解決方案,雖然提取數量不易事先知道(實際代碼比示例更復雜)。 – gerad