2017-01-23 60 views
2

我有以下功能:如何進行並行通用去治療?

func myrun(entries []WhatEverType) { 
    for i := range entries { 
     dotreatment(entries[i]) 
    } 
} 

我要讓並行調用dotreatment,我試過如下:

func myrunMT(entries []WhatEverType) { 
     var wg sync.WaitGroup 
     stopped := false 
     threads := 5 //number of threads could be argument 
     com := make(chan WhatEverType, 100) //size of chan could be argument 
     wg.Add(threads) 
     for i := 0; i < threads; i++ { 
      go func() { 
       for !stopped || len(com) { 
        select { 
         case entry := <-com: 
          dotreatment(entry) //lock if necessary 
         case time.After(100*time.Millisecond): 
        } 
       } 
       wg.Done() 
      }() 
     } 
     for _, entry := range entries { 
      com <- entry 
     } 
     stopped = true 
     wg.Wait() 
    } 

有沒有更好的辦法做到這一點?特別是我想避免通過一個陳發送所有條目,並且只在共享程序之間使用共享索引。

+0

如果你有一般需要做並行工作,將控制節流並行「線程」數量的批生產,則可以像這樣使用(或複製)'code.cloudfoundry.org/workpool'包:https://play.golang.org/p/XKbT67vP6i(注意,這實際上並不在遊樂場中運行,因爲它導入了一個外部包)。 –

回答

1

首先,您的解決方案有數據競爭。您正在閱讀並修改來自多個goroutines的stopped變量。

一個簡單的解決方案可能是劃分傳遞切片的索引範圍,並讓多個goroutines處理不同的索引範圍。這是怎麼可能看起來像:

func process(ms []My) { 
    workers := 5 
    count := len(ms)/workers 
    if count*workers < len(ms) { 
     count++ 
    } 

    wg := &sync.WaitGroup{} 
    for idx := 0; idx < len(ms); { 
     wg.Add(1) 
     idx2 := idx + count 
     if idx2 > len(ms) { 
      idx2 = len(ms) 
     } 
     ms2 := ms[idx:idx2] 
     idx = idx2 
     go func() { 
      defer wg.Done() 
      for i := range ms2 { 
       handle(&ms2[i]) 
      } 
     }() 
    } 
    wg.Wait() 
} 

func handle(m *My) {} 

對於工人的數量夠程,你可以使用runtime.GOMAXPROCS(),因爲如果處理項目不涉及IO操作(或外等待的夠程的東西),沒有必要有Go運行時管理比那些更夠程可主動運行:

workers := runtime.GOMAXPROCS(0) 

注意,雖然這種解決方案並不涉及通過通道發送的條目,如果一個(一些)的goroutine較早完成後,CPU使用率可能會在下降結束時(當較少的門房有工作要做)。

生產者 - 消費者模型的優點是所有的工人程序都會一直工作到最後。但是,通信開銷可能不會忽略不計。一個人是否比另一個人更好取決於每個項目需要完成的工作量。

一個改進版本可以混合2:你可以發送更小的切片,更小的索引範圍在一個通道上,例如, 100批次的批次。與第一種解決方案相比,這可以減少空閒時間,並且還可以降低通信開銷,因爲通過信道單獨發送條目,因此發送的值僅爲總數的百分之一。

這是一示例實現的這種改善,混合版本:

func process(ms []My) { 
    workers := runtime.GOMAXPROCS(0) 
    // 100 jobs per worker average: 
    count := len(ms)/workers/100 
    if count < 1 { 
     count = 1 
    } 

    ch := make(chan []My, workers*2) // Buffer size scales with # of workers 

    wg := &sync.WaitGroup{} 

    // Start workers 
    wg.Add(workers) 
    for i := 0; i < workers; i++ { 
     go func() { 
      defer wg.Done() 
      for ms2 := range ch { 
       for j := range ms2 { 
        handle(&ms2[j]) 
       } 
      } 
     }() 
    } 

    // Send jobs: 
    for idx := 0; idx < len(ms); { 
     idx2 := idx + count 
     if idx2 > len(ms) { 
      idx2 = len(ms) 
     } 
     ch <- ms[idx:idx2] 
     idx = idx2 
    } 

    // Jobs sent, close channel: 
    close(ch) 

    // Wait workers to finish processing all jobs: 
    wg.Wait() 
} 

注意,沒有stopping變量來發送完成信號。相反,我們在每個goroutine的通道上使用了for range,因爲它覆蓋了通道,直到通道關閉,並且它對於併發使用是安全的。一旦通道關閉協程會處理在通道上發送的所有作業,它們將終止,整個處理算法也會結束(而不是更早 - 意味着將處理所有作業)。

+0

我喜歡這個,很好的想法是在工人之間拆分數據,讓他們獨立,但是你仍然在複製數據。一種改進是隻發送通道中的兩個索引(開始和停止),然後以只讀方式訪問數據(線程安全)。另一個改進是在開始時的工人之間分工。沒有渠道了,但你可以有一個沒有更多的工作,當另一個仍然很多工作要做的時候,但平均應該沒問題。最後一點,在這樣的用例中(在我的第一次運行中),不確定競爭條件可能發生在布爾值上。 –

+0

@Séb首先,數據不會被複制。 '[] My'是_slice_不是數組。如果slice vs array不清楚,請查看博文[Go Slices:usage and internals](https://blog.golang.org/go-slices-usage-and-internals)。第二,分片比發送索引稍大一些(參見['reflect.SliceHeader'](https://golang.org/pkg/reflect/#SliceHeader),它只是一個指針和2個'int'數字)。第三,數據競賽確實發生。不要猜測,用'-race'選項運行它可以看到。 – icza

+0

@Séb關於「無害」的數據爭奪,請閱讀[是否安全地讀取一個函數指針同時沒有鎖?](http://stackoverflow.com/questions/41406501/is-it-safe-to-read- a-function-pointer-concurrently-without-a-lock/41407827#41407827) – icza

0

我不會混合通道和同步原語。獨佔渠道的使用是慣用的Go。請記住,Go例程不是線程,而且開銷較低,重量更輕。啓動100萬個並不是什麼大問題。
如果結果的順序並不重要,我會做這樣的事情:

func parallelRun(input []WhateverInputType) []WhateverOutputType { 
    out := make(chan WhateverOutputType, len(input)) 
    for _, item := range input { 
     go func(i WhateverInputType) { 
      out <- process(i) 
     }(item) 
    } 

    res := make([]WhateverOutputType, len(input)) 
    for i := 0; i < len(input); i++ { 
     res[i] = <-out 
    } 

    return res 
} 

func process(input WhateverInputType) WhateverOutputType { 
    time.Sleep(50 * time.Millisecond) 
    return WhateverOutputType{} 
} 

假設「過程」花費的時間比收集結果更長的時間,我甚至會使用阻塞通道out := make(chan WhateverOutputType)
請注意,通過數組作爲參數並不理想(有複製),但我試圖保持原始代碼的精神。

+0

您正在創建與輸入列表中的元素數量相同的許多goroutines,我認爲您將以我的經驗以這種方式在性能方面造成瓶頸。 –

+0

它取決於輸入有多大,有多長/可預測的'過程',有多少核心等等。你知道性能優化的三條規則:度量,度量和度量。 –

+0

你是對的,但它是真正的度量,它告訴我重用線程而不是創建線程,幾周前我已經測試過這個問題。 –

0

搜索我得到使用一個共享的索引,沒有數據的複製以下後:

func myrunMT(entries []WhatEverType) int { 
    lastone := int32(len(entries)-1) 
    current := int32(0) 
    var wg sync.WaitGroup 
    threads := 5 
    //start threads 
    wg.Add(threads) 
    for i := 0; i < threads; i++ { 
     go func() { 
      for { 
       idx := atomic.AddInt32(&current, 1)-1 
       if Loadint32(&current) > Loadint32(&lastone) { 
        break 
       } 
       dotreatment(entries[idx]) 
      } 
      wg.Done() 
     }() 
    } 
    wg.Wait() 
} 
+0

確定所有讀數後,只有在運行期間沒有修改條目的情況下才可以,否則可能會發生數據競爭 –