2015-04-04 71 views
0

我可能錯過了某些東西,或不理解Go如何處理併發(或者我的併發知識),我已經設計了一些代碼來理解多個生產者/消費者。併發與多個生產者/多個消費者

這是代碼:

package main 

import (
    "fmt" 
    "time" 
    // "math/rand" 
    "sync" 
) 

var seq uint64 = 0 
var generatorChan chan uint64 
var requestChan chan uint64 

func makeTimestamp() int64 { 
    return time.Now().UnixNano()/int64(time.Millisecond) 
} 

func generateStuff(genId int) { 
    var crap uint64 
    for { 
     crap = <-requestChan 
     // <- requestChan 
     seq = seq+1 
     fmt.Println("Gen ", genId, " - From : ", crap, " @", makeTimestamp()) 
     generatorChan <- uint64(seq) 
    } 
} 

func concurrentPrint(id int, work *sync.WaitGroup) { 
    defer work.Done() 

    for i := 0; i < 5; i++ { 
     requestChan<-uint64(id) 
     fmt.Println("Conc", id, ": ", <-generatorChan) 
    } 
} 

func main() { 
    generatorChan = make(chan uint64) 
    requestChan = make(chan uint64) 
    var wg sync.WaitGroup 
    for i := 0; i < 20; i++ { 
     go generateStuff(i) 
    } 
    maximumWorker := 200 
    wg.Add(maximumWorker) 
    for i := 0; i < maximumWorker; i++ { 
     go concurrentPrint(i, &wg) 
    } 
    wg.Wait() 
} 

當從1運行它打印(主要是按順序)的所有數字至1000(200名消費者得到一個數每5次)。 我本來預計一些消費者會打印完全相同的號碼,但看起來請求代碼正在阻止這種情況,即使有20個goroutines服務於generateStuff,它們通過增加一個全局變量。

我在Go或Concurrency中遇到了什麼問題?

我本來預料到類似於類似的兩個去程序的情況,generateStuff會一起醒來,同時增加seq,因此有兩個消費者打印相同數字兩次。

EDIT代碼上playgolang:http://play.golang.org/p/eRzNXjdxtZ

+0

注意,你有獲得潛在的數據賽跑到全局'seq'變量(除了未緩衝的'requestChan'可能會使它們分開)。 – 2015-04-04 20:25:20

+0

是的,這是我所期望的戴夫,一些非確定性的行爲給我不是所有的數字從1到1000,但有些不同。 在這個例子中,chans被緩衝了http://play.golang.org/p/tq7-6Bc0hL,仍然得到相同的結果。 – Marlon 2015-04-04 20:33:19

+0

你用'GOMAXPROCS'> 1運行了嗎? (也許使用'-race') – 2015-04-04 20:34:27

回答

1

您有多個工人,可以在同一時間都運行和所有的嘗試,並在同一時間的請求。由於requestChan沒有緩衝,所以它們都阻止等待閱讀器同步並接受他們的請求。

您有多個生成器,它們將通過requestChan與請求者同步,生成一個結果,然後阻止未緩衝的generatorChan,直到工作人員讀取結果爲止。注意它可能是一個不同的工作者。

沒有額外的同步,所以其他一切都是非確定性的。

  • 一個生成器可以發出所有的請求。
  • 發電機可以抓住一個請求,並在任何其他發電機碰巧有機會運行之前通過遞增seq 。只有一個處理器,這可能甚至是可能的。
  • 所有的生成器都可以抓取請求,並且最終都希望在同一時刻增加seq,導致各種問題。
  • 工作人員可以從他們碰巧發送給或來自完全不同的發電機的同一發電機獲得響應。

通常,如果不添加同步來強制這些行爲之一,則無法確保這些行爲中的任何一個發生。

請注意,對於數據競爭來說,這本身就是另一個非確定性事件。有可能獲得任意值,程序崩潰等。假設在競爭條件下,這個值可能只是被一個或一些這樣的相對無害的結果所關閉。

對於試驗來說,最好的辦法是振作起來GOMAXPROCS。無論是通過環境變量(例如env GOMAXPROCS=16 go run foo.goenv GOMAXPROCS=16 ./foo之後的go build)或通過從您的程序中調用runtime.GOMAXPROCS(16)。默認值爲1,這意味着數據競賽或其他「奇怪」行爲可能被隱藏。

您還可以通過在各個位置添加對runtime.Goschedtime.Sleep的呼叫來影響一些事情。

如果您使用比賽檢測器(例如go run -race foo.googo build -race),還可以看到數據競賽。程序不僅應該在退出時顯示「找到1個數據競賽」,而且應該在首次檢測到競賽時還會用堆棧跟蹤轉儲出大量細節。

這裏是你的代碼進行實驗一種「清理」版本:

package main 

import (
    "log" 
    "sync" 
    "sync/atomic" 
) 

var seq uint64 = 0 
var generatorChan = make(chan uint64) 
var requestChan = make(chan uint64) 

func generator(genID int) { 
    for reqID := range requestChan { 
     // If you want to see a data race: 
     //seq = seq + 1 
     // Else: 
     s := atomic.AddUint64(&seq, 1) 
     log.Printf("Gen: %2d, from %3d", genID, reqID) 
     generatorChan <- s 
    } 
} 

func worker(id int, work *sync.WaitGroup) { 
    defer work.Done() 

    for i := 0; i < 5; i++ { 
     requestChan <- uint64(id) 
     log.Printf("\t\t\tWorker: %3d got %4d", id, <-generatorChan) 
    } 
} 

func main() { 
    log.SetFlags(log.Lmicroseconds) 
    const (
     numGen = 20 
     numWorker = 200 
    ) 
    var wg sync.WaitGroup 
    for i := 0; i < numGen; i++ { 
     go generator(i) 
    } 
    wg.Add(numWorker) 
    for i := 0; i < numWorker; i++ { 
     go worker(i, &wg) 
    } 
    wg.Wait() 
    close(requestChan) 
} 

Playground(但要注意,在操場上的時間戳不會是有用的,並呼籲runtime.MAXPROCS可能不會做任何事情)。進一步注意,操作緩存的結果,以便重新運行完全相同的程序將始終顯示相同的輸出,您需要做一些小的更改或者只是在自己的機器上運行它。

很大程度上變化小象分流下來的發電機,使用logfmt因爲前者使得併發保證,消除數據爭,使得輸出看起來更好,等

+0

非常感謝!清理代碼) – Marlon 2015-04-05 08:07:18

0

Channel types

信道提供了一種機制用於同時執行功能 通過發送和接收一個指定的元素 類型的值進行通信。未初始化通道的值爲零。

可以使用內置的 函數make,它接受信道類型和作爲參數的可選容量 製成一個新的,初始化的信道值:

make(chan int, 100) 

容量,中元素的個數,在 通道中設置緩衝區的大小。如果容量爲零或不存在,則只有當發送方和接收方都準備好時,通道纔是無緩衝的,並且通信成功。否則,通道被緩衝,並且如果緩衝區不是滿的,則通信成功而不會阻塞 (發送)或不爲空(接收)。一個零通道永遠不會準備好通訊。

您正在通過使用無緩衝通道來節制通道通信。

例如,

generatorChan = make(chan uint64) 
requestChan = make(chan uint64) 
+0

uhmm所以如果我讓我的陳緩衝我應該期望不同的行爲,但事實並非如此。 (http://play.golang.org/p/tq7-6Bc0hL) 仍然沒有得到它對不起:( – Marlon 2015-04-04 20:31:03

+0

@peterSO除了多個工作人員可能都嘗試和發送'requestChan'在同一時間的價值和那麼應該有可能多個生成器可以在「幾乎同時」接收這些個別請求(也就是說,可能有多個生成器在'seq'上競爭,我相信這是個問題) – 2015-04-04 20:32:13

相關問題