2015-12-05 33 views
3

我目前試圖通過map來嘗試range而不是同步執行併發數據庫請求,顯然是因爲速度提升。我怎樣才能使用通道知道什麼時候所有的循環啓動的循環都完成

我的問題是我有這樣的事情:

var mainthreads = make(chan *mainthread) 
var mainthreadsFetched = make(chan struct{}) 
for containerid := range containers { 
    go func() { 
     rows, err := db.Query("SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?", containerid) 
     defer rows.Close() 
     if err != nil { 
      log.Println(err) 
     } 
     for rows.Next() { 
      mainthread := &MainThread{} 
      err := rows.Scan(&mainthread.MainThreadID, &mainthread.BelongsTo, &mainthread.ThreadName, &mainthread.AccessLevel) 
      if err != nil { 
       log.Println(err) 
      } 
      mainthreads <- mainthread 
     } 
    }() 
    mainthreadsFetched <- struct{}{} 
} 

// Get all mainthreads 
<-mainthreadsFetched 
// Do other stuff after complete 

顯然mainthreadsFetched <- struct{}{}被稱爲幾乎是瞬間,因爲循環完成的速度比你眨眼,我怎麼能爲每循環不會阻止新渠道每個新的goroutine從開始,而是讓循環開始全部goroutines,然後在每個goroutine完成時發送通道。

回答

3

使用sync.WaitGroup是一個很好的解決方案,並且是一個通常使用的解決方案。

或者,您可以在mainthreadsFetchedlen(containers)次上接收,而不僅僅是1次。這將保證所有的例行程序都已完成。您需要將發送線路移動到去程序的末尾(或者更好,延遲)。

另外,由於containerid位於for循環中,因此其值發生變化。您需要將它作爲參數傳遞給go例程關閉。

+0

OP明確表示他知道他可以使用sync.WaitGroup,但正在尋找使用渠道的替代方案,因此您根本沒有真正回答他的問題。 – evanmcdonnal

+0

看到我的答案的第二段,我給他這樣一個選擇。我引用'sync.WaitGroup'是因爲它是更好的解決方案,無論他是否想使用它。 – mjibson

+0

@mjibson我不想使用'sync.WaitGroup'的唯一原因是因爲我有一些嵌套的數據庫循環,我最終會得到所有關係數據庫的東西,所以我不想要'wg.Add(1)'和'wg.Done()'嵌套了15遍我認爲頻道應該是更好的解決方案 – Datsik

0

所以我能想出這樣做的最好方法是使用sync.WaitGroup,做這樣的事情:

var wg sync.WaitGroup 
var mainThreadFetched = make(chan MainThread) 
for containerid := range containers { 
    wg.Add(1) 
    go func(containerid int64) { 
     rows, err := db.Query("SELECT thread_id, belongs_to, thread_name, access_level FROM forum_mainthread WHERE belongs_to = ?", containerid) 
     defer rows.Close() 
     if err != nil { 
      log.Println(err) 
     } 
     for rows.Next() { 
      mainthread := MainThread{} 
      err := rows.Scan(&mainthread.MainThreadID, &mainthread.BelongsTo, &mainthread.ThreadName, &mainthread.AccessLevel) 
      if err != nil { 
       log.Println(err) 
      } 
      mainThreadFetched <- mainthread 
     } 
     wg.Done() 
    }(containerid) 
} 

go func() { 
    wg.Wait() 
    close(mainThreadFetched) 
}() 

for mainthread := range mainThreadFetched { 
    containers[mainthread.BelongsTo].MainThreads = append(containers[mainthread.BelongsTo].MainThreads, mainthread) 
} 

// Do other stuff 

現在我可以從mainThreadFetched通道讀取,然後在WaitGroup滿足它將關閉允許循環結束並繼續的通道

0

我沒有看到你正在讀取mainthreads的位置。如果它不是緩衝頻道,則需要以某種方式解決這個問題。我將提供一些解決方案 - 沒有一個比另一個更「正確」 - 這取決於您的需求。

變體A 這是最簡單的解決方案,但它假定你有一些其他的goroutine讀mainthreads(其可能已經是的情況下)

var mainthreads = make(chan *mainthread) 
var mainthreadsFetched = make(chan struct{}) 
go somethingWhichReadsMainThreads() 
for containerid := range containers { 
    go func(containerid int) { 
     // build query omitted for brevity 
     for rows.Next() { 
      // omitted for brevity 
      mainthreads <- mainthread 
     } 
     mainthreadsFetched <- struct{}{} 
    }(containerid) 
} 

for i := 0; i < len(containers); i++ { 
    <-mainThreadsFetched 
} 
close(mainthreads) 
// Do other stuff after complete 

變式B 這一個用途select聲明處理讀取線程與完成通知分開,而不需要另一個goroutine。

var mainthreads = make(chan *mainthread) 
var mainthreadsFetched = make(chan struct{}) 
for containerid := range containers { 
    go func(containerid int) { 
     // build query omitted for brevity 
     for rows.Next() { 
      // omitted for brevity 
      mainthreads <- mainthread 
     } 
     mainthreadsFetched <- struct{}{} 
    }(containerid) 
} 

numComplete := 0 
readRunning := true 
for readRunning { 
    select { 
    case thread := <-mainthreads: 
     // do something with thread, like threads = append(threads, thread) 
    case <-mainthreadsFetched: 
     numFetched++ 
     if numFetched == len(containers) { 
      readRunning = False 
     } 
    } 
} 
// Do other stuff after complete 

變式C 這一個使用了不使用「零值」(零),用於傳遞的實際數據,因此可以使用以下事實:作爲信號值,而不是一個單獨的結構通道。它具有代碼少得多的優點,但它確實感覺像遠方的鬼怪行爲。

var mainthreads = make(chan *mainthread) 
for containerid := range containers { 
    go func(containerid int) { 
     // build query omitted for brevity 
     for rows.Next() { 
      // omitted Scan for brevity 
      mainthreads <- mainthread 
     } 
     mainthreads <- nil // nil signals to us we are done 
    }(containerid) 
} 

numComplete := 0 
for thread := range mainthreads { 
    if thread != nil { 
     // do something with thread, like threads = append(threads, thread) 
    } else { 
     numFetched++ 
     if numFetched == len(containers) { 
      break 
     } 
    } 
} 
// Do other stuff after complete 
相關問題